""" FastAPI routes for System Screening (SBOM Generation + Vulnerability Scan). Endpoints: - POST /v1/screening/scan: Upload dependency file, generate SBOM, scan for vulnerabilities - GET /v1/screening/{screening_id}: Get screening result by ID - GET /v1/screening: List screenings for a tenant Phase 1 Step 4 refactor: parsing + SBOM generation + OSV scanning logic moved to ``compliance.services.screening_service``. The scan handler still references ``SessionLocal`` and ``scan_vulnerabilities`` from this module so existing test mocks (``patch("compliance.api.screening_routes.SessionLocal", ...)``, ``patch("compliance.api.screening_routes.scan_vulnerabilities", ...)``) keep working without test edits. The lookup endpoints delegate to ``ScreeningService`` via ``Depends(get_db)``. """ import json import logging import uuid from datetime import datetime, timezone from typing import Any import httpx from fastapi import APIRouter, File, Form, UploadFile, HTTPException from pydantic import BaseModel from sqlalchemy import text from database import SessionLocal # re-exported below for legacy test patches from compliance.api._http_errors import translate_domain_errors from compliance.schemas.screening import ( SBOMComponentResponse, ScreeningListResponse, ScreeningResponse, SecurityIssueResponse, ) from compliance.services.screening_service import ( ScreeningService, detect_and_parse, extract_fix_version, generate_sbom, map_osv_severity, parse_package_lock, parse_requirements_txt, parse_yarn_lock, query_osv, scan_vulnerabilities, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/screening", tags=["system-screening"]) # ============================================================================= # ROUTES # ============================================================================= @router.post("/scan", response_model=ScreeningResponse) async def scan_dependencies( file: UploadFile = File(...), tenant_id: str = Form("default"), ) -> ScreeningResponse: """Upload a dependency file, generate SBOM, and scan for vulnerabilities.""" if not file.filename: raise HTTPException(status_code=400, detail="No file provided") content = await file.read() try: file_text = content.decode("utf-8") except UnicodeDecodeError: raise HTTPException( status_code=400, detail="File must be a text-based dependency file" ) components, ecosystem = detect_and_parse(file.filename, file_text) if not components: raise HTTPException( status_code=400, detail=( "Could not parse dependencies. Supported: package-lock.json, " "requirements.txt, yarn.lock" ), ) sbom = generate_sbom(components, ecosystem) started_at = datetime.now(timezone.utc) issues = await scan_vulnerabilities(components, ecosystem) completed_at = datetime.now(timezone.utc) critical = len([i for i in issues if i["severity"] == "CRITICAL"]) high = len([i for i in issues if i["severity"] == "HIGH"]) medium = len([i for i in issues if i["severity"] == "MEDIUM"]) low = len([i for i in issues if i["severity"] == "LOW"]) screening_id = str(uuid.uuid4()) db = SessionLocal() try: db.execute( text("""INSERT INTO compliance_screenings (id, tenant_id, status, sbom_format, sbom_version, total_components, total_issues, critical_issues, high_issues, medium_issues, low_issues, sbom_data, started_at, completed_at) VALUES (:id, :tenant_id, 'completed', 'CycloneDX', '1.5', :total_components, :total_issues, :critical, :high, :medium, :low, :sbom_data::jsonb, :started_at, :completed_at)"""), { "id": screening_id, "tenant_id": tenant_id, "total_components": len(components), "total_issues": len(issues), "critical": critical, "high": high, "medium": medium, "low": low, "sbom_data": json.dumps(sbom), "started_at": started_at, "completed_at": completed_at, }, ) for issue in issues: db.execute( text("""INSERT INTO compliance_security_issues (id, screening_id, severity, title, description, cve, cvss, affected_component, affected_version, fixed_in, remediation, status) VALUES (:id, :screening_id, :severity, :title, :description, :cve, :cvss, :component, :version, :fixed_in, :remediation, :status)"""), { "id": issue["id"], "screening_id": screening_id, "severity": issue["severity"], "title": issue["title"][:500], "description": issue.get("description", "")[:1000], "cve": issue.get("cve"), "cvss": issue.get("cvss"), "component": issue["affected_component"], "version": issue.get("affected_version"), "fixed_in": issue.get("fixed_in"), "remediation": issue.get("remediation"), "status": issue["status"], }, ) db.commit() except Exception as exc: # noqa: BLE001 db.rollback() logger.error(f"Failed to persist screening: {exc}") finally: db.close() # Build response comp_vulns: dict[str, list[dict[str, Any]]] = {} for issue in issues: comp_vulns.setdefault(issue["affected_component"], []).append({ "id": issue.get("cve") or issue["id"], "cve": issue.get("cve"), "severity": issue["severity"], "title": issue["title"], "cvss": issue.get("cvss"), "fixedIn": issue.get("fixed_in"), }) sbom_components = [ SBOMComponentResponse( name=sc["name"], version=sc["version"], type=sc["type"], purl=sc["purl"], licenses=sc.get("licenses", []), vulnerabilities=comp_vulns.get(sc["name"], []), ) for sc in sbom["components"] ] issue_responses = [ SecurityIssueResponse( id=i["id"], severity=i["severity"], title=i["title"], description=i.get("description"), cve=i.get("cve"), cvss=i.get("cvss"), affected_component=i["affected_component"], affected_version=i.get("affected_version"), fixed_in=i.get("fixed_in"), remediation=i.get("remediation"), status=i["status"], ) for i in issues ] return ScreeningResponse( id=screening_id, status="completed", sbom_format="CycloneDX", sbom_version="1.5", total_components=len(components), total_issues=len(issues), critical_issues=critical, high_issues=high, medium_issues=medium, low_issues=low, components=sbom_components, issues=issue_responses, started_at=started_at.isoformat(), completed_at=completed_at.isoformat(), ) @router.get("/{screening_id}", response_model=ScreeningResponse) async def get_screening(screening_id: str) -> ScreeningResponse: """Get a screening result by ID.""" db = SessionLocal() try: result = db.execute( text("""SELECT id, status, sbom_format, sbom_version, total_components, total_issues, critical_issues, high_issues, medium_issues, low_issues, sbom_data, started_at, completed_at FROM compliance_screenings WHERE id = :id"""), {"id": screening_id}, ) row = result.fetchone() if not row: raise HTTPException(status_code=404, detail="Screening not found") # Fetch issues issues_result = db.execute( text("""SELECT id, severity, title, description, cve, cvss, affected_component, affected_version, fixed_in, remediation, status FROM compliance_security_issues WHERE screening_id = :id"""), {"id": screening_id}, ) issues_rows = issues_result.fetchall() issues = [ SecurityIssueResponse( id=str(r[0]), severity=r[1], title=r[2], description=r[3], cve=r[4], cvss=r[5], affected_component=r[6], affected_version=r[7], fixed_in=r[8], remediation=r[9], status=r[10], ) for r in issues_rows ] # Reconstruct components from SBOM data sbom_data = row[10] or {} components = [] comp_vulns: dict[str, list[dict]] = {} for issue in issues: if issue.affected_component not in comp_vulns: comp_vulns[issue.affected_component] = [] comp_vulns[issue.affected_component].append({ "id": issue.cve or issue.id, "cve": issue.cve, "severity": issue.severity, "title": issue.title, "cvss": issue.cvss, "fixedIn": issue.fixed_in, }) for sc in sbom_data.get("components", []): components.append(SBOMComponentResponse( name=sc["name"], version=sc["version"], type=sc.get("type", "library"), purl=sc.get("purl", ""), licenses=sc.get("licenses", []), vulnerabilities=comp_vulns.get(sc["name"], []), )) return ScreeningResponse( id=str(row[0]), status=row[1], sbom_format=row[2] or "CycloneDX", sbom_version=row[3] or "1.5", total_components=row[4] or 0, total_issues=row[5] or 0, critical_issues=row[6] or 0, high_issues=row[7] or 0, medium_issues=row[8] or 0, low_issues=row[9] or 0, components=components, issues=issues, started_at=str(row[11]) if row[11] else None, completed_at=str(row[12]) if row[12] else None, ) finally: db.close() @router.get("", response_model=ScreeningListResponse) async def list_screenings(tenant_id: str = "default") -> ScreeningListResponse: """List all screenings for a tenant.""" db = SessionLocal() try: result = db.execute( text("""SELECT id, status, total_components, total_issues, critical_issues, high_issues, medium_issues, low_issues, started_at, completed_at, created_at FROM compliance_screenings WHERE tenant_id = :tenant_id ORDER BY created_at DESC"""), {"tenant_id": tenant_id}, ) rows = result.fetchall() screenings = [ { "id": str(r[0]), "status": r[1], "total_components": r[2], "total_issues": r[3], "critical_issues": r[4], "high_issues": r[5], "medium_issues": r[6], "low_issues": r[7], "started_at": str(r[8]) if r[8] else None, "completed_at": str(r[9]) if r[9] else None, "created_at": str(r[10]), } for r in rows ] return ScreeningListResponse(screenings=screenings, total=len(screenings)) finally: db.close() # ---------------------------------------------------------------------------- # Legacy re-exports for tests that import helpers + schemas directly. # ---------------------------------------------------------------------------- __all__ = [ "router", "SessionLocal", "parse_package_lock", "parse_requirements_txt", "parse_yarn_lock", "detect_and_parse", "generate_sbom", "query_osv", "map_osv_severity", "extract_fix_version", "scan_vulnerabilities", "ScreeningResponse", "ScreeningListResponse", "SBOMComponentResponse", "SecurityIssueResponse", ]