""" FastAPI routes for CRA (Cyber Resilience Act) Compliance Projects. Endpoints: - GET /v1/cra/projects List all CRA projects for a tenant - POST /v1/cra/projects Create a new CRA project - GET /v1/cra/projects/{id} Get a single CRA project - PATCH /v1/cra/projects/{id} Update intake fields / status - DELETE /v1/cra/projects/{id} Archive project - POST /v1/cra/projects/{id}/scope-check Run deterministic Annex III/IV match - POST /v1/cra/projects/{id}/path-select Set conformity path (validated against classification) Tenant scoping via X-Tenant-ID header (see tenant_utils.get_tenant_id). """ import json import logging import socket import ssl from datetime import date from typing import Optional from uuid import UUID import httpx from fastapi import APIRouter, Depends, HTTPException, UploadFile, File from pydantic import BaseModel from sqlalchemy import text from database import SessionLocal from .cra_annex_i_data import ( ANNEX_I_REQUIREMENTS, MEASURES, DEADLINES, SEVERITY_WEIGHT, ) from .tenant_utils import get_tenant_id logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/cra/projects", tags=["cra"]) # ============================================================================= # CONSTANTS — Annex III/IV mapping (source: migration 059_wiki_cra_annex_i_detail.sql) # ============================================================================= ANNEX_IV_CRITICAL = [ "hardware-security-module", "hardware security module", "hsm", "smartcard-chip", "smartcard chip", "smart-meter-gateway", "smart meter gateway", "secure-element", "secure element", ] ANNEX_III_CLASS_II = [ "betriebssystem", "operating system", "hypervisor", "container-runtime", "container runtime", "public-key-infrastruktur", "public key infrastructure", "pki", "industrial-control", "industrial control", "ics", "scada", ] ANNEX_III_CLASS_I = [ "passwort-manager", "password manager", "passwort manager", "vpn-software", "vpn software", "firewall", "router", "smart-home", "smart home", "iot-sensor", "iot sensor", "iot-geraet", "siem", "siem-system", ] CLASSIFICATIONS = {"NOT_IN_SCOPE", "STANDARD", "IMPORTANT_I", "IMPORTANT_II", "CRITICAL"} CONFORMITY_PATHS = {"self_assessment", "harmonized_standard", "eucc", "notified_body"} # Allowed paths per classification (CRITICAL must use notified_body) ALLOWED_PATHS = { "NOT_IN_SCOPE": set(), "STANDARD": {"self_assessment", "harmonized_standard", "eucc", "notified_body"}, "IMPORTANT_I": {"self_assessment", "harmonized_standard", "eucc", "notified_body"}, "IMPORTANT_II": {"harmonized_standard", "eucc", "notified_body"}, "CRITICAL": {"notified_body"}, } STATUS_WHITELIST = { "draft", "scoped", "classified", "path_selected", "requirements_mapped", "evidence_pending", "gaps_open", "remediation", "ready_for_review", "declaration_ready", "post_market", "archived", } # ============================================================================= # REQUEST / RESPONSE MODELS # ============================================================================= class CreateProjectRequest(BaseModel): name: str description: str = "" gap_project_id: Optional[str] = None class UpdateProjectRequest(BaseModel): name: Optional[str] = None description: Optional[str] = None repo_url: Optional[str] = None primary_language: Optional[str] = None has_firmware: Optional[bool] = None connected_to_internet: Optional[bool] = None has_software_updates: Optional[bool] = None processes_personal_data: Optional[bool] = None is_critical_infra_supplier: Optional[bool] = None intended_use: Optional[str] = None status: Optional[str] = None class PathSelectRequest(BaseModel): conformity_path: str # ============================================================================= # HELPERS # ============================================================================= def _row_to_response(row) -> dict: """Convert a DB row to dict.""" return { "id": str(row.id), "tenant_id": row.tenant_id, "name": row.name, "description": row.description or "", "gap_project_id": str(row.gap_project_id) if row.gap_project_id else None, "repo_url": row.repo_url, "primary_language": row.primary_language, "has_firmware": bool(row.has_firmware), "connected_to_internet": bool(row.connected_to_internet), "has_software_updates": bool(row.has_software_updates), "processes_personal_data": bool(row.processes_personal_data), "is_critical_infra_supplier": bool(row.is_critical_infra_supplier), "intended_use": row.intended_use or "", "cra_classification": row.cra_classification, "classification_rationale": ( row.classification_rationale if isinstance(row.classification_rationale, list) else json.loads(row.classification_rationale or "[]") ), "conformity_path": row.conformity_path, "status": row.status, "created_at": row.created_at.isoformat() if row.created_at else "", "updated_at": row.updated_at.isoformat() if row.updated_at else "", } def _classify(intake: dict) -> tuple[str, list[str]]: """Deterministic Annex III/IV match. Returns (classification, rationale_list).""" haystack = " ".join( str(intake.get(f) or "").lower() for f in ("intended_use", "primary_language", "name", "description") ) def _match(buckets: list[str]) -> Optional[str]: for token in buckets: if token.lower() in haystack: return token return None rationale: list[str] = [] hit = _match(ANNEX_IV_CRITICAL) if hit: rationale.append(f"Annex IV (Critical): Treffer auf '{hit}'") return "CRITICAL", rationale hit = _match(ANNEX_III_CLASS_II) if hit: rationale.append(f"Annex III Klasse II (Important_II): Treffer auf '{hit}'") return "IMPORTANT_II", rationale hit = _match(ANNEX_III_CLASS_I) if hit: rationale.append(f"Annex III Klasse I (Important_I): Treffer auf '{hit}'") return "IMPORTANT_I", rationale flags = [ ("connected_to_internet", "Produkt ist mit dem Internet verbunden"), ("has_software_updates", "Produkt hat Software-Updates (digitales Element)"), ("processes_personal_data", "Produkt verarbeitet personenbezogene Daten"), ("is_critical_infra_supplier", "Produkt wird in kritischer Infrastruktur eingesetzt"), ] for field, msg in flags: if intake.get(field): rationale.append(msg) if rationale: return "STANDARD", rationale rationale.append("Kein digitales Element und keine Annex III/IV-Kategorie erkannt") return "NOT_IN_SCOPE", rationale # ============================================================================= # ENDPOINTS # ============================================================================= @router.get("") async def list_projects( tenant_id: str = Depends(get_tenant_id), include_archived: bool = False, ): """List all CRA projects for the tenant.""" db = SessionLocal() try: if include_archived: sql = """ SELECT * FROM compliance_cra_projects WHERE tenant_id = :tenant_id ORDER BY created_at DESC """ else: sql = """ SELECT * FROM compliance_cra_projects WHERE tenant_id = :tenant_id AND status != 'archived' ORDER BY created_at DESC """ rows = db.execute(text(sql), {"tenant_id": tenant_id}).fetchall() return { "projects": [_row_to_response(row) for row in rows], "total": len(rows), } finally: db.close() @router.post("", status_code=201) async def create_project( body: CreateProjectRequest, tenant_id: str = Depends(get_tenant_id), ): """Create a new CRA project. Optionally pre-populate intake from a gap_project.""" db = SessionLocal() try: gap_intake: dict = {} if body.gap_project_id: try: gap_uuid = UUID(body.gap_project_id) except ValueError: raise HTTPException(status_code=400, detail="Invalid gap_project_id (must be UUID)") gap_row = db.execute( text(""" SELECT connected_to_internet, has_software_updates, processes_personal_data, is_critical_infra_supplier, description FROM gap_projects WHERE id = :gid AND tenant_id = CAST(:tid AS uuid) """), {"gid": str(gap_uuid), "tid": tenant_id}, ).fetchone() if gap_row: gap_intake = { "connected_to_internet": bool(gap_row.connected_to_internet), "has_software_updates": bool(gap_row.has_software_updates), "processes_personal_data": bool(gap_row.processes_personal_data), "is_critical_infra_supplier": bool(gap_row.is_critical_infra_supplier), "intended_use": gap_row.description or "", } result = db.execute( text(""" INSERT INTO compliance_cra_projects (tenant_id, name, description, gap_project_id, connected_to_internet, has_software_updates, processes_personal_data, is_critical_infra_supplier, intended_use, status) VALUES (:tenant_id, :name, :description, :gap_project_id, :connected_to_internet, :has_software_updates, :processes_personal_data, :is_critical_infra_supplier, :intended_use, 'draft') RETURNING * """), { "tenant_id": tenant_id, "name": body.name, "description": body.description, "gap_project_id": body.gap_project_id, "connected_to_internet": gap_intake.get("connected_to_internet", False), "has_software_updates": gap_intake.get("has_software_updates", False), "processes_personal_data": gap_intake.get("processes_personal_data", False), "is_critical_infra_supplier": gap_intake.get("is_critical_infra_supplier", False), "intended_use": gap_intake.get("intended_use", ""), }, ) row = result.fetchone() db.commit() logger.info("Created CRA project %s for tenant %s", row.id, tenant_id) return _row_to_response(row) except HTTPException: raise except Exception: db.rollback() raise finally: db.close() @router.get("/{project_id}") async def get_project( project_id: str, tenant_id: str = Depends(get_tenant_id), ): """Get a single CRA project (tenant-scoped).""" db = SessionLocal() try: row = db.execute( text(""" SELECT * FROM compliance_cra_projects WHERE id = :pid AND tenant_id = :tid """), {"pid": project_id, "tid": tenant_id}, ).fetchone() if not row: raise HTTPException(status_code=404, detail="CRA project not found") return _row_to_response(row) finally: db.close() @router.patch("/{project_id}") async def update_project( project_id: str, body: UpdateProjectRequest, tenant_id: str = Depends(get_tenant_id), ): """Update intake / status fields. Status is whitelist-validated (no transitions).""" db = SessionLocal() try: updates: dict = {"pid": project_id, "tid": tenant_id} set_parts = ["updated_at = NOW()"] for field in ( "name", "description", "repo_url", "primary_language", "has_firmware", "connected_to_internet", "has_software_updates", "processes_personal_data", "is_critical_infra_supplier", "intended_use", "status", ): val = getattr(body, field) if val is None: continue if field == "status" and val not in STATUS_WHITELIST: raise HTTPException( status_code=400, detail=f"Invalid status '{val}'. Allowed: {sorted(STATUS_WHITELIST)}", ) set_parts.append(f"{field} = :{field}") updates[field] = val if len(set_parts) == 1: raise HTTPException(status_code=400, detail="No fields to update") result = db.execute( text(f""" UPDATE compliance_cra_projects SET {', '.join(set_parts)} WHERE id = :pid AND tenant_id = :tid RETURNING * """), updates, ) row = result.fetchone() if not row: raise HTTPException(status_code=404, detail="CRA project not found") db.commit() return _row_to_response(row) except HTTPException: raise except Exception: db.rollback() raise finally: db.close() @router.delete("/{project_id}") async def archive_project( project_id: str, tenant_id: str = Depends(get_tenant_id), ): """Soft-archive a CRA project.""" db = SessionLocal() try: result = db.execute( text(""" UPDATE compliance_cra_projects SET status = 'archived', updated_at = NOW() WHERE id = :pid AND tenant_id = :tid AND status != 'archived' RETURNING id """), {"pid": project_id, "tid": tenant_id}, ) row = result.fetchone() if not row: raise HTTPException(status_code=404, detail="Project not found or already archived") db.commit() return {"success": True, "id": str(row.id), "status": "archived"} except HTTPException: raise except Exception: db.rollback() raise finally: db.close() @router.post("/{project_id}/scope-check") async def scope_check( project_id: str, tenant_id: str = Depends(get_tenant_id), ): """Run deterministic Annex III/IV classification on the project's intake fields. Writes cra_classification + classification_rationale + status update. """ db = SessionLocal() try: row = db.execute( text(""" SELECT * FROM compliance_cra_projects WHERE id = :pid AND tenant_id = :tid """), {"pid": project_id, "tid": tenant_id}, ).fetchone() if not row: raise HTTPException(status_code=404, detail="CRA project not found") intake = { "name": row.name, "description": row.description, "intended_use": row.intended_use, "primary_language": row.primary_language, "connected_to_internet": row.connected_to_internet, "has_software_updates": row.has_software_updates, "processes_personal_data": row.processes_personal_data, "is_critical_infra_supplier": row.is_critical_infra_supplier, } classification, rationale = _classify(intake) new_status = "scoped" if classification == "NOT_IN_SCOPE" else "classified" updated = db.execute( text(""" UPDATE compliance_cra_projects SET cra_classification = :cls, classification_rationale = CAST(:rat AS jsonb), status = :status, updated_at = NOW() WHERE id = :pid AND tenant_id = :tid RETURNING * """), { "pid": project_id, "tid": tenant_id, "cls": classification, "rat": json.dumps(rationale, ensure_ascii=False), "status": new_status, }, ).fetchone() db.commit() logger.info( "Scope-check %s for tenant %s -> %s", project_id, tenant_id, classification, ) return _row_to_response(updated) except HTTPException: raise except Exception: db.rollback() raise finally: db.close() @router.get("/{project_id}/requirements") async def list_requirements( project_id: str, tenant_id: str = Depends(get_tenant_id), ): """Liste der 40 Annex-I-Requirements mit per-Projekt-Status. Phase-2-Logik: Status default 'unbewertet' (kein evidence_check linked). Phase-3 wird per-req Status aus compliance_evidence_check_results berechnen. """ db = SessionLocal() try: row = db.execute( text(""" SELECT id, cra_classification FROM compliance_cra_projects WHERE id = :pid AND tenant_id = :tid """), {"pid": project_id, "tid": tenant_id}, ).fetchone() if not row: raise HTTPException(status_code=404, detail="CRA project not found") items = [] for req in ANNEX_I_REQUIREMENTS: items.append({ **req, "status": "unbewertet", "mapped_measure_names": [ {"id": m, "name": MEASURES.get(m, m)} for m in req["mapped_measures"] ], }) return { "project_id": project_id, "classification": row.cra_classification, "total": len(items), "items": items, } finally: db.close() @router.get("/{project_id}/backlog") async def get_backlog( project_id: str, tenant_id: str = Depends(get_tenant_id), ): """Priorisierte TODO-Liste — gleiche Daten wie /requirements, aber sortiert. Sortierung: severity_weight (desc), effort_days (asc), n (asc). Deadline-Druck: Tage bis 11.12.2027 (CRA-CE-Pflicht). """ db = SessionLocal() try: row = db.execute( text(""" SELECT id, cra_classification FROM compliance_cra_projects WHERE id = :pid AND tenant_id = :tid """), {"pid": project_id, "tid": tenant_id}, ).fetchone() if not row: raise HTTPException(status_code=404, detail="CRA project not found") ce_deadline = date(2027, 12, 11) today = date.today() days_to_ce = max(0, (ce_deadline - today).days) max_days = 365 * 3 # 3 Jahre als Skalierungsbasis items = [] for req in ANNEX_I_REQUIREMENTS: severity_w = SEVERITY_WEIGHT.get(req["severity"], 0) deadline_pressure = 1.0 - min(days_to_ce / max_days, 1.0) # 0..1 effort_factor = 1.0 / max(req["effort_days"], 1) priority_score = ( severity_w * (1.0 + 0.5 * deadline_pressure) * (1.0 + 0.1 * effort_factor) ) items.append({ "req_id": req["req_id"], "title": req["title"], "category": req["category"], "severity": req["severity"], "annex_anchor": req["annex_anchor"], "description": req["description"], "effort_days": req["effort_days"], "mapped_measure_names": [ {"id": m, "name": MEASURES.get(m, m)} for m in req["mapped_measures"] ], "status": "unbewertet", "priority_score": round(priority_score, 1), }) items.sort(key=lambda x: (-x["priority_score"], x["effort_days"])) for idx, it in enumerate(items, 1): it["rank"] = idx return { "project_id": project_id, "classification": row.cra_classification, "days_to_ce_deadline": days_to_ce, "deadlines": DEADLINES, "total": len(items), "items": items, } finally: db.close() @router.post("/{project_id}/path-select") async def path_select( project_id: str, body: PathSelectRequest, tenant_id: str = Depends(get_tenant_id), ): """Set conformity_path. Validates path is allowed for the project's classification.""" if body.conformity_path not in CONFORMITY_PATHS: raise HTTPException( status_code=400, detail=f"Invalid conformity_path. Allowed: {sorted(CONFORMITY_PATHS)}", ) db = SessionLocal() try: row = db.execute( text(""" SELECT cra_classification FROM compliance_cra_projects WHERE id = :pid AND tenant_id = :tid """), {"pid": project_id, "tid": tenant_id}, ).fetchone() if not row: raise HTTPException(status_code=404, detail="CRA project not found") if not row.cra_classification: raise HTTPException( status_code=400, detail="Run scope-check before selecting a conformity path", ) allowed = ALLOWED_PATHS.get(row.cra_classification, set()) if body.conformity_path not in allowed: raise HTTPException( status_code=400, detail=( f"Path '{body.conformity_path}' not allowed for classification " f"'{row.cra_classification}'. Allowed: {sorted(allowed)}" ), ) updated = db.execute( text(""" UPDATE compliance_cra_projects SET conformity_path = :path, status = 'path_selected', updated_at = NOW() WHERE id = :pid AND tenant_id = :tid RETURNING * """), {"pid": project_id, "tid": tenant_id, "path": body.conformity_path}, ).fetchone() db.commit() return _row_to_response(updated) except HTTPException: raise except Exception: db.rollback() raise finally: db.close() # ============================================================================= # PHASE 3: SBOM Upload + Automated Checks # ============================================================================= DEFAULT_CHECKS = [ { "check_code": "cra_security_txt", "title": "security.txt (RFC 9116)", "description": "Pruefung ob /.well-known/security.txt existiert und Contact-Eintrag enthaelt", "check_type": "url_probe", "linked_req_ids": ["CRA-AI-35"], # CVD }, { "check_code": "cra_vuln_disclosure_url", "title": "Vulnerability-Disclosure-Policy-URL", "description": "Manuelle Pflege: URL zur CVD-Policy. Wird in DoC referenziert.", "check_type": "manual_review", "linked_req_ids": ["CRA-AI-35"], }, { "check_code": "cra_update_policy_url", "title": "Update-Policy-URL", "description": "URL zur dokumentierten Patch-/Update-Policy (Lifecycle, SLAs).", "check_type": "manual_review", "linked_req_ids": ["CRA-AI-28", "CRA-AI-31", "CRA-AI-34"], }, { "check_code": "cra_signed_updates", "title": "Signierte Updates", "description": "Bestaetigung dass Updates digital signiert ausgeliefert werden.", "check_type": "manual_review", "linked_req_ids": ["CRA-AI-29", "CRA-AI-30"], }, { "check_code": "cra_default_password_check", "title": "Keine Default-Passwoerter", "description": "Bestaetigung dass das Produkt keine Default-Passwoerter ausliefert.", "check_type": "manual_review", "linked_req_ids": ["CRA-AI-8"], }, { "check_code": "cra_tls_cert_check", "title": "TLS-Zertifikat des Produkt-Endpoints", "description": "Pruefung ob TLS 1.2+ und gueltiges Zertifikat.", "check_type": "tls_probe", "linked_req_ids": ["CRA-AI-15"], }, ] class CheckRunRequest(BaseModel): target_url: Optional[str] = None def _cra_project_exists(db, project_id: str, tenant_id: str) -> bool: row = db.execute( text(""" SELECT 1 FROM compliance_cra_projects WHERE id = :pid AND tenant_id = :tid """), {"pid": project_id, "tid": tenant_id}, ).fetchone() return bool(row) @router.post("/{project_id}/sbom/upload") async def upload_sbom( project_id: str, file: UploadFile = File(...), tenant_id: str = Depends(get_tenant_id), ): """Upload CycloneDX or SPDX SBOM (JSON). Persists in compliance_cra_sboms.""" db = SessionLocal() try: if not _cra_project_exists(db, project_id, tenant_id): raise HTTPException(status_code=404, detail="CRA project not found") raw = await file.read() try: data = json.loads(raw) except json.JSONDecodeError as e: raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}") # Format detection fmt = None spec_version = None component_count = 0 summary: dict = {} if isinstance(data, dict) and data.get("bomFormat") == "CycloneDX": fmt = "cyclonedx" spec_version = data.get("specVersion") or data.get("version") components = data.get("components") or [] component_count = len(components) summary = { "bomFormat": "CycloneDX", "specVersion": spec_version, "serialNumber": data.get("serialNumber"), "metadata_component": (data.get("metadata") or {}).get("component", {}), "sample_components": components[:5], } elif isinstance(data, dict) and data.get("spdxVersion"): fmt = "spdx" spec_version = data["spdxVersion"] packages = data.get("packages") or [] component_count = len(packages) summary = { "spdxVersion": spec_version, "name": data.get("name"), "sample_packages": [{"name": p.get("name"), "version": p.get("versionInfo")} for p in packages[:5]], } else: raise HTTPException( status_code=400, detail="Unrecognized SBOM format. Expected CycloneDX (bomFormat='CycloneDX') or SPDX (spdxVersion present).", ) result = db.execute( text(""" INSERT INTO compliance_cra_sboms (cra_project_id, tenant_id, filename, format, spec_version, component_count, raw_content, summary, scan_status) VALUES (:pid, :tid, :fn, :fmt, :ver, :cnt, CAST(:raw AS jsonb), CAST(:sum AS jsonb), 'pending') RETURNING id, uploaded_at """), { "pid": project_id, "tid": tenant_id, "fn": file.filename or "unknown.json", "fmt": fmt, "ver": spec_version, "cnt": component_count, "raw": json.dumps(data), "sum": json.dumps(summary), }, ).fetchone() db.commit() return { "id": str(result.id), "uploaded_at": result.uploaded_at.isoformat(), "filename": file.filename, "format": fmt, "spec_version": spec_version, "component_count": component_count, "summary": summary, "scan_status": "pending", "note": "osv.dev scan wird durch separates Tool im Team durchgefuehrt.", } except HTTPException: raise except Exception: db.rollback() raise finally: db.close() @router.get("/{project_id}/sboms") async def list_sboms( project_id: str, tenant_id: str = Depends(get_tenant_id), ): """List all SBOM uploads for this project, newest first.""" db = SessionLocal() try: if not _cra_project_exists(db, project_id, tenant_id): raise HTTPException(status_code=404, detail="CRA project not found") rows = db.execute( text(""" SELECT id, filename, format, spec_version, component_count, summary, scan_status, scan_summary, uploaded_at, scanned_at FROM compliance_cra_sboms WHERE cra_project_id = :pid AND tenant_id = :tid ORDER BY uploaded_at DESC """), {"pid": project_id, "tid": tenant_id}, ).fetchall() items = [] for r in rows: items.append({ "id": str(r.id), "filename": r.filename, "format": r.format, "spec_version": r.spec_version, "component_count": r.component_count, "summary": r.summary if isinstance(r.summary, dict) else json.loads(r.summary or "{}"), "scan_status": r.scan_status, "scan_summary": r.scan_summary if isinstance(r.scan_summary, dict) else json.loads(r.scan_summary or "{}"), "uploaded_at": r.uploaded_at.isoformat() if r.uploaded_at else None, "scanned_at": r.scanned_at.isoformat() if r.scanned_at else None, }) return {"project_id": project_id, "total": len(items), "items": items} finally: db.close() @router.get("/{project_id}/checks") async def list_checks( project_id: str, tenant_id: str = Depends(get_tenant_id), ): """List configured checks + their latest run result.""" db = SessionLocal() try: if not _cra_project_exists(db, project_id, tenant_id): raise HTTPException(status_code=404, detail="CRA project not found") rows = db.execute( text(""" SELECT id, check_code, title, description, check_type, target_url, linked_control_ids, last_run_at, is_active FROM compliance_evidence_checks WHERE tenant_id = CAST(:tid AS uuid) AND project_id = :pid_uuid AND check_code LIKE 'cra_%' ORDER BY check_code """), {"tid": tenant_id, "pid_uuid": project_id}, ).fetchall() # Latest result per check results: dict = {} if rows: ids = [str(r.id) for r in rows] for r_row in db.execute( text(""" SELECT check_id, status, message, ran_at FROM compliance_evidence_check_results WHERE check_id::text = ANY(:ids) ORDER BY ran_at DESC """), {"ids": ids}, ).fetchall(): cid = str(r_row.check_id) if cid not in results: results[cid] = { "status": r_row.status, "message": r_row.message, "ran_at": r_row.ran_at.isoformat() if r_row.ran_at else None, } items = [] for r in rows: linked = r.linked_control_ids if isinstance(linked, str): try: linked = json.loads(linked) except Exception: linked = [] items.append({ "id": str(r.id), "check_code": r.check_code, "title": r.title, "description": r.description, "check_type": r.check_type, "target_url": r.target_url, "linked_req_ids": linked or [], "last_run_at": r.last_run_at.isoformat() if r.last_run_at else None, "is_active": bool(r.is_active), "latest_result": results.get(str(r.id)), }) return {"project_id": project_id, "total": len(items), "items": items} finally: db.close() @router.post("/{project_id}/checks/init") async def init_checks( project_id: str, tenant_id: str = Depends(get_tenant_id), ): """Create the default set of CRA checks for this project (idempotent).""" db = SessionLocal() try: if not _cra_project_exists(db, project_id, tenant_id): raise HTTPException(status_code=404, detail="CRA project not found") existing = db.execute( text(""" SELECT check_code FROM compliance_evidence_checks WHERE tenant_id = CAST(:tid AS uuid) AND project_id = :pid AND check_code LIKE 'cra_%' """), {"tid": tenant_id, "pid": project_id}, ).fetchall() existing_codes = {r.check_code for r in existing} created = 0 for c in DEFAULT_CHECKS: if c["check_code"] in existing_codes: continue db.execute( text(""" INSERT INTO compliance_evidence_checks (tenant_id, project_id, check_code, title, description, check_type, linked_control_ids, frequency, is_active) VALUES (CAST(:tid AS uuid), :pid, :code, :title, :desc, :ctype, CAST(:linked AS jsonb), 'monthly', true) """), { "tid": tenant_id, "pid": project_id, "code": c["check_code"], "title": c["title"], "desc": c["description"], "ctype": c["check_type"], "linked": json.dumps(c["linked_req_ids"]), }, ) created += 1 db.commit() return {"created": created, "already_present": len(existing_codes)} except HTTPException: raise except Exception: db.rollback() raise finally: db.close() @router.post("/checks/{check_id}/run") async def run_check( check_id: str, body: CheckRunRequest, tenant_id: str = Depends(get_tenant_id), ): """Manually run a single check. Currently implements: - cra_security_txt: HTTP GET, looks for 'Contact:' field - cra_tls_cert_check: TLS handshake + cert info - others: returns 'manual_review_required' status """ db = SessionLocal() try: check = db.execute( text(""" SELECT id, check_code, target_url FROM compliance_evidence_checks WHERE id = CAST(:cid AS uuid) AND tenant_id = CAST(:tid AS uuid) """), {"cid": check_id, "tid": tenant_id}, ).fetchone() if not check: raise HTTPException(status_code=404, detail="Check not found") url = body.target_url or check.target_url if url and url != check.target_url: db.execute( text("UPDATE compliance_evidence_checks SET target_url = :u WHERE id = CAST(:cid AS uuid)"), {"u": url, "cid": check_id}, ) status = "manual_review_required" message = "Dieser Check-Typ erfordert manuelle Pruefung." if check.check_code == "cra_security_txt" and url: base = url.rstrip("/") probe_url = f"{base}/.well-known/security.txt" try: async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client: resp = await client.get(probe_url) if resp.status_code != 200: status = "fail" message = f"GET {probe_url} -> HTTP {resp.status_code}" else: has_contact = any( line.strip().lower().startswith("contact:") for line in resp.text.splitlines() ) if has_contact: status = "pass" message = f"security.txt vorhanden mit Contact-Eintrag ({len(resp.text)} bytes)" else: status = "fail" message = "security.txt vorhanden, aber KEIN 'Contact:'-Eintrag" except Exception as e: status = "fail" message = f"Fetch error: {e}" elif check.check_code == "cra_tls_cert_check" and url: try: host = url.replace("https://", "").replace("http://", "").split("/")[0].split(":")[0] ctx = ssl.create_default_context() with socket.create_connection((host, 443), timeout=10) as sock: with ctx.wrap_socket(sock, server_hostname=host) as ssock: cert = ssock.getpeercert() proto = ssock.version() if proto and proto >= "TLSv1.2": status = "pass" message = f"TLS {proto} ok. Issuer: {dict(x[0] for x in cert.get('issuer', [])).get('organizationName', 'unknown')}" else: status = "fail" message = f"TLS-Version zu alt: {proto}" except Exception as e: status = "fail" message = f"TLS error: {e}" # Persist result db.execute( text(""" INSERT INTO compliance_evidence_check_results (check_id, tenant_id, project_id, status, message, ran_at) VALUES (CAST(:cid AS uuid), CAST(:tid AS uuid), (SELECT project_id FROM compliance_evidence_checks WHERE id = CAST(:cid AS uuid)), :status, :msg, NOW()) """), {"cid": check_id, "tid": tenant_id, "status": status, "msg": message}, ) db.execute( text(""" UPDATE compliance_evidence_checks SET last_run_at = NOW() WHERE id = CAST(:cid AS uuid) """), {"cid": check_id}, ) db.commit() return { "check_id": check_id, "check_code": check.check_code, "status": status, "message": message, "target_url": url, } except HTTPException: raise except Exception: db.rollback() raise finally: db.close()