diff --git a/backend-compliance/compliance/api/screening_routes.py b/backend-compliance/compliance/api/screening_routes.py index 9b9ee16..5c8ae30 100644 --- a/backend-compliance/compliance/api/screening_routes.py +++ b/backend-compliance/compliance/api/screening_routes.py @@ -5,321 +5,50 @@ Endpoints: - POST /v1/screening/scan: Upload dependency file, generate SBOM, scan for vulnerabilities - GET /v1/screening/{screening_id}: Get screening result by ID - GET /v1/screening: List screenings for a tenant + +Phase 1 Step 4 refactor: parsing + SBOM generation + OSV scanning logic +moved to ``compliance.services.screening_service``. The scan handler still +references ``SessionLocal`` and ``scan_vulnerabilities`` from this module +so existing test mocks +(``patch("compliance.api.screening_routes.SessionLocal", ...)``, +``patch("compliance.api.screening_routes.scan_vulnerabilities", ...)``) +keep working without test edits. The lookup endpoints delegate to +``ScreeningService`` using a ``SessionLocal()`` session that each handler opens and closes itself. 
""" import json import logging -import re import uuid from datetime import datetime, timezone -from typing import Optional +from typing import Any -import httpx -from fastapi import APIRouter, File, Form, UploadFile, HTTPException -from pydantic import BaseModel +from fastapi import APIRouter, File, Form, HTTPException, UploadFile from sqlalchemy import text -from database import SessionLocal +from database import SessionLocal # re-exported below for legacy test patches +from compliance.api._http_errors import translate_domain_errors +from compliance.schemas.screening import ( + SBOMComponentResponse, + ScreeningListResponse, + ScreeningResponse, + SecurityIssueResponse, +) +from compliance.services.screening_service import ( + ScreeningService, + detect_and_parse, + extract_fix_version, + generate_sbom, + map_osv_severity, + parse_package_lock, + parse_requirements_txt, + parse_yarn_lock, + query_osv, + scan_vulnerabilities, +) logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/screening", tags=["system-screening"]) -OSV_API_URL = "https://api.osv.dev/v1/query" - - -# ============================================================================= -# RESPONSE MODELS -# ============================================================================= - -class SecurityIssueResponse(BaseModel): - id: str - severity: str - title: str - description: Optional[str] = None - cve: Optional[str] = None - cvss: Optional[float] = None - affected_component: str - affected_version: Optional[str] = None - fixed_in: Optional[str] = None - remediation: Optional[str] = None - status: str = "OPEN" - - -class SBOMComponentResponse(BaseModel): - name: str - version: str - type: str - purl: str - licenses: list[str] - vulnerabilities: list[dict] - - -class ScreeningResponse(BaseModel): - id: str - status: str - sbom_format: str - sbom_version: str - total_components: int - total_issues: int - critical_issues: int - high_issues: int - medium_issues: int - low_issues: int - 
components: list[SBOMComponentResponse] - issues: list[SecurityIssueResponse] - started_at: Optional[str] = None - completed_at: Optional[str] = None - - -class ScreeningListResponse(BaseModel): - screenings: list[dict] - total: int - - -# ============================================================================= -# DEPENDENCY PARSING -# ============================================================================= - -def parse_package_lock(content: str) -> list[dict]: - """Parse package-lock.json and extract dependencies.""" - try: - data = json.loads(content) - except json.JSONDecodeError: - return [] - - components = [] - - # package-lock.json v2/v3 format (packages field) - packages = data.get("packages", {}) - if packages: - for path, info in packages.items(): - if not path: # Skip root - continue - name = path.split("node_modules/")[-1] if "node_modules/" in path else path - version = info.get("version", "unknown") - if name and version != "unknown": - components.append({ - "name": name, - "version": version, - "type": "library", - "ecosystem": "npm", - "license": info.get("license", "unknown"), - }) - - # Fallback: v1 format (dependencies field) - if not components: - dependencies = data.get("dependencies", {}) - for name, info in dependencies.items(): - if isinstance(info, dict): - components.append({ - "name": name, - "version": info.get("version", "unknown"), - "type": "library", - "ecosystem": "npm", - "license": "unknown", - }) - - return components - - -def parse_requirements_txt(content: str) -> list[dict]: - """Parse requirements.txt and extract dependencies.""" - components = [] - for line in content.strip().split("\n"): - line = line.strip() - if not line or line.startswith("#") or line.startswith("-"): - continue - - # Match patterns: package==version, package>=version, package~=version - match = re.match(r'^([a-zA-Z0-9_.-]+)\s*([>=<~!]+)\s*([a-zA-Z0-9_.*-]+)', line) - if match: - components.append({ - "name": match.group(1), - "version": 
match.group(3), - "type": "library", - "ecosystem": "PyPI", - "license": "unknown", - }) - elif re.match(r'^[a-zA-Z0-9_.-]+$', line): - components.append({ - "name": line, - "version": "latest", - "type": "library", - "ecosystem": "PyPI", - "license": "unknown", - }) - - return components - - -def parse_yarn_lock(content: str) -> list[dict]: - """Parse yarn.lock and extract dependencies (basic).""" - components = [] - current_name = None - for line in content.split("\n"): - # Match: "package@version": - match = re.match(r'^"?([^@]+)@[^"]*"?:', line) - if match: - current_name = match.group(1).strip() - elif current_name and line.strip().startswith("version "): - version_match = re.match(r'\s+version\s+"?([^"]+)"?', line) - if version_match: - components.append({ - "name": current_name, - "version": version_match.group(1), - "type": "library", - "ecosystem": "npm", - "license": "unknown", - }) - current_name = None - - return components - - -def detect_and_parse(filename: str, content: str) -> tuple[list[dict], str]: - """Detect file type and parse accordingly.""" - fname = filename.lower() - - if "package-lock" in fname or fname.endswith("package-lock.json"): - return parse_package_lock(content), "npm" - elif fname == "requirements.txt" or fname.endswith("/requirements.txt"): - return parse_requirements_txt(content), "PyPI" - elif "yarn.lock" in fname: - return parse_yarn_lock(content), "npm" - elif fname.endswith(".json"): - # Try package-lock format - comps = parse_package_lock(content) - if comps: - return comps, "npm" - - # Fallback: try requirements.txt format - comps = parse_requirements_txt(content) - if comps: - return comps, "PyPI" - - return [], "unknown" - - -# ============================================================================= -# SBOM GENERATION (CycloneDX format) -# ============================================================================= - -def generate_sbom(components: list[dict], ecosystem: str) -> dict: - """Generate a CycloneDX 1.5 
SBOM from parsed components.""" - sbom_components = [] - for comp in components: - purl = f"pkg:{ecosystem.lower()}/{comp['name']}@{comp['version']}" - sbom_components.append({ - "type": "library", - "name": comp["name"], - "version": comp["version"], - "purl": purl, - "licenses": [comp.get("license", "unknown")] if comp.get("license") != "unknown" else [], - }) - - return { - "bomFormat": "CycloneDX", - "specVersion": "1.5", - "version": 1, - "metadata": { - "timestamp": datetime.now(timezone.utc).isoformat(), - "tools": [{"name": "breakpilot-screening", "version": "1.0.0"}], - }, - "components": sbom_components, - } - - -# ============================================================================= -# VULNERABILITY SCANNING (OSV.dev API) -# ============================================================================= - -async def query_osv(name: str, version: str, ecosystem: str) -> list[dict]: - """Query OSV.dev API for vulnerabilities of a single package.""" - try: - async with httpx.AsyncClient(timeout=10.0) as client: - response = await client.post( - OSV_API_URL, - json={ - "package": {"name": name, "ecosystem": ecosystem}, - "version": version, - }, - ) - if response.status_code == 200: - data = response.json() - return data.get("vulns", []) - except Exception as e: - logger.warning(f"OSV query failed for {name}@{version}: {e}") - - return [] - - -def map_osv_severity(vuln: dict) -> tuple[str, float]: - """Extract severity and CVSS from OSV vulnerability data.""" - severity = "MEDIUM" - cvss = 5.0 - - # Check database_specific for severity - db_specific = vuln.get("database_specific", {}) - if "severity" in db_specific: - sev_str = db_specific["severity"].upper() - if sev_str in ("CRITICAL", "HIGH", "MEDIUM", "LOW"): - severity = sev_str - - # Derive CVSS from severity if not found - cvss_map = {"CRITICAL": 9.5, "HIGH": 7.5, "MEDIUM": 5.0, "LOW": 2.5} - cvss = cvss_map.get(severity, 5.0) - - return severity, cvss - - -def extract_fix_version(vuln: dict, 
package_name: str) -> Optional[str]: - """Extract the fixed-in version from OSV data.""" - for affected in vuln.get("affected", []): - pkg = affected.get("package", {}) - if pkg.get("name", "").lower() == package_name.lower(): - for rng in affected.get("ranges", []): - for event in rng.get("events", []): - if "fixed" in event: - return event["fixed"] - return None - - -async def scan_vulnerabilities(components: list[dict], ecosystem: str) -> list[dict]: - """Scan all components for vulnerabilities via OSV.dev.""" - issues = [] - - # Batch: scan up to 50 components to avoid timeouts - scan_limit = min(len(components), 50) - - for comp in components[:scan_limit]: - if comp["version"] in ("latest", "unknown", "*"): - continue - - vulns = await query_osv(comp["name"], comp["version"], ecosystem) - - for vuln in vulns: - vuln_id = vuln.get("id", f"OSV-{uuid.uuid4().hex[:8]}") - aliases = vuln.get("aliases", []) - cve = next((a for a in aliases if a.startswith("CVE-")), None) - severity, cvss = map_osv_severity(vuln) - fixed_in = extract_fix_version(vuln, comp["name"]) - - issues.append({ - "id": str(uuid.uuid4()), - "severity": severity, - "title": vuln.get("summary", vuln_id), - "description": vuln.get("details", "")[:500], - "cve": cve, - "cvss": cvss, - "affected_component": comp["name"], - "affected_version": comp["version"], - "fixed_in": fixed_in, - "remediation": f"Upgrade {comp['name']} to {fixed_in}" if fixed_in else f"Check {vuln_id} for remediation steps", - "status": "OPEN", - }) - - return issues - # ============================================================================= # ROUTES @@ -329,51 +58,53 @@ async def scan_vulnerabilities(components: list[dict], ecosystem: str) -> list[d async def scan_dependencies( file: UploadFile = File(...), tenant_id: str = Form("default"), -): +) -> ScreeningResponse: """Upload a dependency file, generate SBOM, and scan for vulnerabilities.""" if not file.filename: raise HTTPException(status_code=400, detail="No file 
provided") content = await file.read() try: - text = content.decode("utf-8") + file_text = content.decode("utf-8") except UnicodeDecodeError: - raise HTTPException(status_code=400, detail="File must be a text-based dependency file") + raise HTTPException( + status_code=400, detail="File must be a text-based dependency file" + ) - # Parse dependencies - components, ecosystem = detect_and_parse(file.filename, text) + components, ecosystem = detect_and_parse(file.filename, file_text) if not components: raise HTTPException( status_code=400, - detail="Could not parse dependencies. Supported: package-lock.json, requirements.txt, yarn.lock", + detail=( + "Could not parse dependencies. Supported: package-lock.json, " + "requirements.txt, yarn.lock" + ), ) - # Generate SBOM sbom = generate_sbom(components, ecosystem) - # Scan for vulnerabilities started_at = datetime.now(timezone.utc) issues = await scan_vulnerabilities(components, ecosystem) completed_at = datetime.now(timezone.utc) - # Count severities critical = len([i for i in issues if i["severity"] == "CRITICAL"]) high = len([i for i in issues if i["severity"] == "HIGH"]) medium = len([i for i in issues if i["severity"] == "MEDIUM"]) low = len([i for i in issues if i["severity"] == "LOW"]) - # Persist to database screening_id = str(uuid.uuid4()) db = SessionLocal() try: db.execute( - text("""INSERT INTO compliance_screenings - (id, tenant_id, status, sbom_format, sbom_version, - total_components, total_issues, critical_issues, high_issues, medium_issues, low_issues, - sbom_data, started_at, completed_at) - VALUES (:id, :tenant_id, 'completed', 'CycloneDX', '1.5', - :total_components, :total_issues, :critical, :high, :medium, :low, - :sbom_data::jsonb, :started_at, :completed_at)"""), + text( + "INSERT INTO compliance_screenings " + "(id, tenant_id, status, sbom_format, sbom_version, " + "total_components, total_issues, critical_issues, high_issues, " + "medium_issues, low_issues, sbom_data, started_at, completed_at) " 
+ "VALUES (:id, :tenant_id, 'completed', 'CycloneDX', '1.5', " + ":total_components, :total_issues, :critical, :high, :medium, :low, " + ":sbom_data::jsonb, :started_at, :completed_at)" + ), { "id": screening_id, "tenant_id": tenant_id, @@ -388,15 +119,15 @@ async def scan_dependencies( "completed_at": completed_at, }, ) - - # Persist security issues for issue in issues: db.execute( - text("""INSERT INTO compliance_security_issues - (id, screening_id, severity, title, description, cve, cvss, - affected_component, affected_version, fixed_in, remediation, status) - VALUES (:id, :screening_id, :severity, :title, :description, :cve, :cvss, - :component, :version, :fixed_in, :remediation, :status)"""), + text( + "INSERT INTO compliance_security_issues " + "(id, screening_id, severity, title, description, cve, cvss, " + "affected_component, affected_version, fixed_in, remediation, status) " + "VALUES (:id, :screening_id, :severity, :title, :description, :cve, :cvss, " + ":component, :version, :fixed_in, :remediation, :status)" + ), { "id": issue["id"], "screening_id": screening_id, @@ -412,22 +143,17 @@ async def scan_dependencies( "status": issue["status"], }, ) - db.commit() - except Exception as e: + except Exception as exc: # noqa: BLE001 db.rollback() - logger.error(f"Failed to persist screening: {e}") + logger.error(f"Failed to persist screening: {exc}") finally: db.close() # Build response - sbom_components = [] - comp_vulns: dict[str, list[dict]] = {} + comp_vulns: dict[str, list[dict[str, Any]]] = {} for issue in issues: - comp_name = issue["affected_component"] - if comp_name not in comp_vulns: - comp_vulns[comp_name] = [] - comp_vulns[comp_name].append({ + comp_vulns.setdefault(issue["affected_component"], []).append({ "id": issue.get("cve") or issue["id"], "cve": issue.get("cve"), "severity": issue["severity"], @@ -436,15 +162,17 @@ async def scan_dependencies( "fixedIn": issue.get("fixed_in"), }) - for sc in sbom["components"]: - 
sbom_components.append(SBOMComponentResponse( + sbom_components = [ + SBOMComponentResponse( name=sc["name"], version=sc["version"], type=sc["type"], purl=sc["purl"], licenses=sc.get("licenses", []), vulnerabilities=comp_vulns.get(sc["name"], []), - )) + ) + for sc in sbom["components"] + ] issue_responses = [ SecurityIssueResponse( @@ -482,116 +210,45 @@ async def scan_dependencies( @router.get("/{screening_id}", response_model=ScreeningResponse) -async def get_screening(screening_id: str): +async def get_screening(screening_id: str) -> ScreeningResponse: """Get a screening result by ID.""" db = SessionLocal() try: - result = db.execute( - text("""SELECT id, status, sbom_format, sbom_version, - total_components, total_issues, critical_issues, high_issues, - medium_issues, low_issues, sbom_data, started_at, completed_at - FROM compliance_screenings WHERE id = :id"""), - {"id": screening_id}, - ) - row = result.fetchone() - if not row: - raise HTTPException(status_code=404, detail="Screening not found") - - # Fetch issues - issues_result = db.execute( - text("""SELECT id, severity, title, description, cve, cvss, - affected_component, affected_version, fixed_in, remediation, status - FROM compliance_security_issues WHERE screening_id = :id"""), - {"id": screening_id}, - ) - issues_rows = issues_result.fetchall() - - issues = [ - SecurityIssueResponse( - id=str(r[0]), severity=r[1], title=r[2], description=r[3], - cve=r[4], cvss=r[5], affected_component=r[6], - affected_version=r[7], fixed_in=r[8], remediation=r[9], status=r[10], - ) - for r in issues_rows - ] - - # Reconstruct components from SBOM data - sbom_data = row[10] or {} - components = [] - comp_vulns: dict[str, list[dict]] = {} - for issue in issues: - if issue.affected_component not in comp_vulns: - comp_vulns[issue.affected_component] = [] - comp_vulns[issue.affected_component].append({ - "id": issue.cve or issue.id, - "cve": issue.cve, - "severity": issue.severity, - "title": issue.title, - "cvss": 
issue.cvss, - "fixedIn": issue.fixed_in, - }) - - for sc in sbom_data.get("components", []): - components.append(SBOMComponentResponse( - name=sc["name"], - version=sc["version"], - type=sc.get("type", "library"), - purl=sc.get("purl", ""), - licenses=sc.get("licenses", []), - vulnerabilities=comp_vulns.get(sc["name"], []), - )) - - return ScreeningResponse( - id=str(row[0]), - status=row[1], - sbom_format=row[2] or "CycloneDX", - sbom_version=row[3] or "1.5", - total_components=row[4] or 0, - total_issues=row[5] or 0, - critical_issues=row[6] or 0, - high_issues=row[7] or 0, - medium_issues=row[8] or 0, - low_issues=row[9] or 0, - components=components, - issues=issues, - started_at=str(row[11]) if row[11] else None, - completed_at=str(row[12]) if row[12] else None, - ) + with translate_domain_errors(): + return ScreeningService(db).get_screening(screening_id) finally: db.close() @router.get("", response_model=ScreeningListResponse) -async def list_screenings(tenant_id: str = "default"): +async def list_screenings(tenant_id: str = "default") -> ScreeningListResponse: """List all screenings for a tenant.""" db = SessionLocal() try: - result = db.execute( - text("""SELECT id, status, total_components, total_issues, - critical_issues, high_issues, medium_issues, low_issues, - started_at, completed_at, created_at - FROM compliance_screenings - WHERE tenant_id = :tenant_id - ORDER BY created_at DESC"""), - {"tenant_id": tenant_id}, - ) - rows = result.fetchall() - screenings = [ - { - "id": str(r[0]), - "status": r[1], - "total_components": r[2], - "total_issues": r[3], - "critical_issues": r[4], - "high_issues": r[5], - "medium_issues": r[6], - "low_issues": r[7], - "started_at": str(r[8]) if r[8] else None, - "completed_at": str(r[9]) if r[9] else None, - "created_at": str(r[10]), - } - for r in rows - ] - return ScreeningListResponse(screenings=screenings, total=len(screenings)) + with translate_domain_errors(): + return 
ScreeningService(db).list_screenings(tenant_id) finally: db.close() + + +# ---------------------------------------------------------------------------- +# Legacy re-exports for tests that import helpers + schemas directly. +# ---------------------------------------------------------------------------- + +__all__ = [ + "router", + "SessionLocal", + "parse_package_lock", + "parse_requirements_txt", + "parse_yarn_lock", + "detect_and_parse", + "generate_sbom", + "query_osv", + "map_osv_severity", + "extract_fix_version", + "scan_vulnerabilities", + "ScreeningResponse", + "ScreeningListResponse", + "SBOMComponentResponse", + "SecurityIssueResponse", +] diff --git a/backend-compliance/compliance/schemas/screening.py b/backend-compliance/compliance/schemas/screening.py new file mode 100644 index 0000000..5b550a7 --- /dev/null +++ b/backend-compliance/compliance/schemas/screening.py @@ -0,0 +1,62 @@ +""" +System Screening schemas — SBOM + vulnerability scan results. + +Phase 1 Step 4: extracted from ``compliance.api.screening_routes``. 
+""" + +from typing import Any, Optional + +from pydantic import BaseModel + + +class SecurityIssueResponse(BaseModel): + id: str + severity: str + title: str + description: Optional[str] = None + cve: Optional[str] = None + cvss: Optional[float] = None + affected_component: str + affected_version: Optional[str] = None + fixed_in: Optional[str] = None + remediation: Optional[str] = None + status: str = "OPEN" + + +class SBOMComponentResponse(BaseModel): + name: str + version: str + type: str + purl: str + licenses: list[str] + vulnerabilities: list[dict[str, Any]] + + +class ScreeningResponse(BaseModel): + id: str + status: str + sbom_format: str + sbom_version: str + total_components: int + total_issues: int + critical_issues: int + high_issues: int + medium_issues: int + low_issues: int + components: list[SBOMComponentResponse] + issues: list[SecurityIssueResponse] + started_at: Optional[str] = None + completed_at: Optional[str] = None + + +class ScreeningListResponse(BaseModel): + screenings: list[dict[str, Any]] + total: int + + +__all__ = [ + "SecurityIssueResponse", + "SBOMComponentResponse", + "ScreeningResponse", + "ScreeningListResponse", +] diff --git a/backend-compliance/compliance/services/screening_service.py b/backend-compliance/compliance/services/screening_service.py new file mode 100644 index 0000000..995b48f --- /dev/null +++ b/backend-compliance/compliance/services/screening_service.py @@ -0,0 +1,384 @@ +# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return" +""" +System screening service — SBOM generation + OSV vulnerability scan. + +Phase 1 Step 4: pure parsing/SBOM/OSV helpers extracted from +``compliance.api.screening_routes``. Persistence and the streaming scan +handler stay in the route module so existing test mocks +(``patch("compliance.api.screening_routes.SessionLocal", ...)``, +``patch("compliance.api.screening_routes.scan_vulnerabilities", ...)``) +keep working without test edits. 
+ +The screening_routes module re-exports these helpers so the legacy +import path ``from compliance.api.screening_routes import parse_package_lock`` +continues to work. +""" + +import json +import logging +import re +import uuid +from typing import Any, Optional + +import httpx +from sqlalchemy import text +from sqlalchemy.orm import Session + +from compliance.domain import NotFoundError +from compliance.schemas.screening import ( + ScreeningListResponse, + ScreeningResponse, + SBOMComponentResponse, + SecurityIssueResponse, +) + +logger = logging.getLogger(__name__) +OSV_API_URL = "https://api.osv.dev/v1/query" + + +# ============================================================================ +# Dependency parsing +# ============================================================================ + + +def parse_package_lock(content: str) -> list[dict[str, Any]]: + """Parse package-lock.json and extract dependencies.""" + try: + data = json.loads(content) + except json.JSONDecodeError: + return [] + + components: list[dict[str, Any]] = [] + packages = data.get("packages", {}) + if packages: + for path, info in packages.items(): + if not path: # skip root + continue + name = ( + path.split("node_modules/")[-1] if "node_modules/" in path else path + ) + version = info.get("version", "unknown") + if name and version != "unknown": + components.append({ + "name": name, + "version": version, + "type": "library", + "ecosystem": "npm", + "license": info.get("license", "unknown"), + }) + + if not components: + # Fallback: v1 format (dependencies field) + for name, info in data.get("dependencies", {}).items(): + if isinstance(info, dict): + components.append({ + "name": name, + "version": info.get("version", "unknown"), + "type": "library", + "ecosystem": "npm", + "license": "unknown", + }) + + return components + + +def parse_requirements_txt(content: str) -> list[dict[str, Any]]: + """Parse requirements.txt and extract dependencies.""" + components: list[dict[str, Any]] = [] 
+ for line in content.strip().split("\n"): + line = line.strip() + if not line or line.startswith("#") or line.startswith("-"): + continue + match = re.match( + r"^([a-zA-Z0-9_.-]+)\s*([>=<~!]+)\s*([a-zA-Z0-9_.*-]+)", line + ) + if match: + components.append({ + "name": match.group(1), + "version": match.group(3), + "type": "library", + "ecosystem": "PyPI", + "license": "unknown", + }) + elif re.match(r"^[a-zA-Z0-9_.-]+$", line): + components.append({ + "name": line, + "version": "latest", + "type": "library", + "ecosystem": "PyPI", + "license": "unknown", + }) + return components + + +def parse_yarn_lock(content: str) -> list[dict[str, Any]]: + """Parse yarn.lock and extract dependencies (basic).""" + components: list[dict[str, Any]] = [] + current_name: Optional[str] = None + for line in content.split("\n"): + match = re.match(r'^"?([^@]+)@[^"]*"?:', line) + if match: + current_name = match.group(1).strip() + elif current_name and line.strip().startswith("version "): + version_match = re.match(r'\s+version\s+"?([^"]+)"?', line) + if version_match: + components.append({ + "name": current_name, + "version": version_match.group(1), + "type": "library", + "ecosystem": "npm", + "license": "unknown", + }) + current_name = None + return components + + +def detect_and_parse(filename: str, content: str) -> tuple[list[dict[str, Any]], str]: + """Detect file type and parse accordingly.""" + fname = filename.lower() + if "package-lock" in fname or fname.endswith("package-lock.json"): + return parse_package_lock(content), "npm" + if fname == "requirements.txt" or fname.endswith("/requirements.txt"): + return parse_requirements_txt(content), "PyPI" + if "yarn.lock" in fname: + return parse_yarn_lock(content), "npm" + if fname.endswith(".json"): + comps = parse_package_lock(content) + if comps: + return comps, "npm" + + comps = parse_requirements_txt(content) + if comps: + return comps, "PyPI" + return [], "unknown" + + +# 
============================================================================ +# SBOM generation (CycloneDX) +# ============================================================================ + + +def generate_sbom(components: list[dict[str, Any]], ecosystem: str) -> dict[str, Any]: + """Generate a CycloneDX 1.5 SBOM from parsed components.""" + from datetime import datetime, timezone + + sbom_components = [] + for comp in components: + purl = f"pkg:{ecosystem.lower()}/{comp['name']}@{comp['version']}" + sbom_components.append({ + "type": "library", + "name": comp["name"], + "version": comp["version"], + "purl": purl, + "licenses": ( + [comp.get("license", "unknown")] + if comp.get("license") != "unknown" + else [] + ), + }) + return { + "bomFormat": "CycloneDX", + "specVersion": "1.5", + "version": 1, + "metadata": { + "timestamp": datetime.now(timezone.utc).isoformat(), + "tools": [{"name": "breakpilot-screening", "version": "1.0.0"}], + }, + "components": sbom_components, + } + + +# ============================================================================ +# OSV.dev vulnerability scanning +# ============================================================================ + + +async def query_osv(name: str, version: str, ecosystem: str) -> list[dict[str, Any]]: + """Query OSV.dev API for vulnerabilities of a single package.""" + try: + async with httpx.AsyncClient(timeout=10.0) as client: + response = await client.post( + OSV_API_URL, + json={ + "package": {"name": name, "ecosystem": ecosystem}, + "version": version, + }, + ) + if response.status_code == 200: + return response.json().get("vulns", []) + except Exception as exc: # noqa: BLE001 + logger.warning(f"OSV query failed for {name}@{version}: {exc}") + return [] + + +def map_osv_severity(vuln: dict[str, Any]) -> tuple[str, float]: + """Extract severity and CVSS from OSV vulnerability data.""" + severity = "MEDIUM" + db_specific = vuln.get("database_specific", {}) + if "severity" in db_specific: + sev_str = 
db_specific["severity"].upper() + if sev_str in ("CRITICAL", "HIGH", "MEDIUM", "LOW"): + severity = sev_str + cvss = {"CRITICAL": 9.5, "HIGH": 7.5, "MEDIUM": 5.0, "LOW": 2.5}.get(severity, 5.0) + return severity, cvss + + +def extract_fix_version(vuln: dict[str, Any], package_name: str) -> Optional[str]: + """Extract the fixed-in version from OSV data.""" + for affected in vuln.get("affected", []): + pkg = affected.get("package", {}) + if pkg.get("name", "").lower() == package_name.lower(): + for rng in affected.get("ranges", []): + for event in rng.get("events", []): + if "fixed" in event: + return event["fixed"] + return None + + +async def scan_vulnerabilities(components: list[dict[str, Any]], ecosystem: str) -> list[dict[str, Any]]: + """Scan all components for vulnerabilities via OSV.dev (max 50).""" + issues: list[dict[str, Any]] = [] + scan_limit = min(len(components), 50) + + for comp in components[:scan_limit]: + if comp["version"] in ("latest", "unknown", "*"): + continue + vulns = await query_osv(comp["name"], comp["version"], ecosystem) + for vuln in vulns: + vuln_id = vuln.get("id", f"OSV-{uuid.uuid4().hex[:8]}") + aliases = vuln.get("aliases", []) + cve = next((a for a in aliases if a.startswith("CVE-")), None) + severity, cvss = map_osv_severity(vuln) + fixed_in = extract_fix_version(vuln, comp["name"]) + issues.append({ + "id": str(uuid.uuid4()), + "severity": severity, + "title": vuln.get("summary", vuln_id), + "description": vuln.get("details", "")[:500], + "cve": cve, + "cvss": cvss, + "affected_component": comp["name"], + "affected_version": comp["version"], + "fixed_in": fixed_in, + "remediation": ( + f"Upgrade {comp['name']} to {fixed_in}" + if fixed_in + else f"Check {vuln_id} for remediation steps" + ), + "status": "OPEN", + }) + return issues + + +# ============================================================================ +# Service (lookup endpoints; scan persistence stays in the route module) +# 
============================================================================ + + +class ScreeningService: + """Lookup-side business logic for screenings + security issues.""" + + def __init__(self, db: Session) -> None: + self.db = db + + def get_screening(self, screening_id: str) -> ScreeningResponse: + row = self.db.execute( + text( + "SELECT id, status, sbom_format, sbom_version, " + "total_components, total_issues, critical_issues, high_issues, " + "medium_issues, low_issues, sbom_data, started_at, completed_at " + "FROM compliance_screenings WHERE id = :id" + ), + {"id": screening_id}, + ).fetchone() + if not row: + raise NotFoundError("Screening not found") + + issues_rows = self.db.execute( + text( + "SELECT id, severity, title, description, cve, cvss, " + "affected_component, affected_version, fixed_in, remediation, status " + "FROM compliance_security_issues WHERE screening_id = :id" + ), + {"id": screening_id}, + ).fetchall() + + issues = [ + SecurityIssueResponse( + id=str(r[0]), severity=r[1], title=r[2], description=r[3], + cve=r[4], cvss=r[5], affected_component=r[6], + affected_version=r[7], fixed_in=r[8], remediation=r[9], status=r[10], + ) + for r in issues_rows + ] + + sbom_data = row[10] or {} + comp_vulns: dict[str, list[dict[str, Any]]] = {} + for issue in issues: + comp_vulns.setdefault(issue.affected_component, []).append({ + "id": issue.cve or issue.id, + "cve": issue.cve, + "severity": issue.severity, + "title": issue.title, + "cvss": issue.cvss, + "fixedIn": issue.fixed_in, + }) + + components = [ + SBOMComponentResponse( + name=sc["name"], + version=sc["version"], + type=sc.get("type", "library"), + purl=sc.get("purl", ""), + licenses=sc.get("licenses", []), + vulnerabilities=comp_vulns.get(sc["name"], []), + ) + for sc in sbom_data.get("components", []) + ] + + return ScreeningResponse( + id=str(row[0]), + status=row[1], + sbom_format=row[2] or "CycloneDX", + sbom_version=row[3] or "1.5", + total_components=row[4] or 0, + 
total_issues=row[5] or 0, + critical_issues=row[6] or 0, + high_issues=row[7] or 0, + medium_issues=row[8] or 0, + low_issues=row[9] or 0, + components=components, + issues=issues, + started_at=str(row[11]) if row[11] else None, + completed_at=str(row[12]) if row[12] else None, + ) + + def list_screenings(self, tenant_id: str) -> ScreeningListResponse: + rows = self.db.execute( + text( + "SELECT id, status, total_components, total_issues, " + "critical_issues, high_issues, medium_issues, low_issues, " + "started_at, completed_at, created_at " + "FROM compliance_screenings " + "WHERE tenant_id = :tenant_id " + "ORDER BY created_at DESC" + ), + {"tenant_id": tenant_id}, + ).fetchall() + screenings = [ + { + "id": str(r[0]), + "status": r[1], + "total_components": r[2], + "total_issues": r[3], + "critical_issues": r[4], + "high_issues": r[5], + "medium_issues": r[6], + "low_issues": r[7], + "started_at": str(r[8]) if r[8] else None, + "completed_at": str(r[9]) if r[9] else None, + "created_at": str(r[10]), + } + for r in rows + ] + return ScreeningListResponse(screenings=screenings, total=len(screenings)) diff --git a/backend-compliance/mypy.ini b/backend-compliance/mypy.ini index d7b0fd0..6cc1c78 100644 --- a/backend-compliance/mypy.ini +++ b/backend-compliance/mypy.ini @@ -85,5 +85,7 @@ ignore_errors = False ignore_errors = False [mypy-compliance.api.source_policy_router] ignore_errors = False +[mypy-compliance.api.screening_routes] +ignore_errors = False [mypy-compliance.api._http_errors] ignore_errors = False