breakpilot-compliance/backend-compliance/compliance/api/vendor_assessment_routes.py
feat(vendor-assessment): Pruefprotokoll + Frontend + Sidebar
Phase 4-5: Professional Pruefprotokoll report builder with styled HTML
output (Kopfdaten, Kategorie-Scores, L1/L2 Check-Hierarchie, Findings,
Freigabe-Block). Frontend at /sdk/vendor-assessment with 3-step flow:
DocumentUploader → AssessmentProgress → PruefprotokollView.

Sidebar: renamed "Use-Case Audits" → "Vertragspruefung".

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-12 23:24:12 +02:00


"""
Vendor Contract Assessment Routes — Automated vendor document analysis.
Uploads vendor contracts (AVV, SCC, TOM annex, sub-processor list),
runs them through the Doc-Check L1/L2 engine + LLM verification,
and produces a professional Pruefprotokoll.
POST /vendor-compliance/assessments — Start assessment (async)
GET /vendor-compliance/assessments — List assessments
GET /vendor-compliance/assessments/{id} — Poll status / get result
POST /vendor-compliance/assessments/{id}/approve — DSB approval
"""
import asyncio
import logging
import uuid as _uuid
from datetime import datetime, timezone
from typing import Optional

from fastapi import APIRouter
from pydantic import BaseModel

from compliance.services.dsi_document_checker import (
    check_document_completeness,
)
from compliance.services.vendor_assessment_cross_check import (
    cross_check_documents,
)
from compliance.services.vendor_assessment_report import (
    build_pruefprotokoll,
)

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/vendor-compliance", tags=["vendor-assessment"])

# ── Request / Response Models ───────────────────────────────────────

class DocumentEntry(BaseModel):
    doc_type: str = "auto"  # avv, scc, tom_annex, sub_processor_list, agb, auto
    label: str = ""
    url: str


class AssessmentRequest(BaseModel):
    vendor_name: str
    documents: list[DocumentEntry]
    recipient: str = ""


class AssessmentStartResponse(BaseModel):
    assessment_id: str
    status: str = "running"


class FindingItem(BaseModel):
    id: str
    category: str
    severity: str
    type: str  # OK, GAP, RISK
    title: str
    description: str = ""
    recommendation: str = ""
    document_label: str = ""
    document_type: str = ""
    check_id: str = ""
    citations: list[str] = []


class DocumentResult(BaseModel):
    label: str
    url: str
    doc_type: str
    word_count: int = 0
    completeness_pct: int = 0
    correctness_pct: int = 0
    checks: list[dict] = []
    findings_count: int = 0
    error: str = ""


class AssessmentResult(BaseModel):
    vendor_name: str
    documents: list[DocumentResult]
    findings: list[FindingItem]
    overall_score: int = 0
    category_scores: dict[str, int] = {}
    cross_check_findings: list[dict] = []
    report_html: str = ""
    checked_at: str = ""


class AssessmentStatusResponse(BaseModel):
    assessment_id: str
    status: str
    progress: str = ""
    result: Optional[AssessmentResult] = None
    error: str = ""

# ── In-memory job store ─────────────────────────────────────────────

_assessment_jobs: dict[str, dict] = {}
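
# Note: this store is process-local; results vanish on restart and are not
# shared across workers. A persistent backend would be needed for that.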

# ── Endpoints ───────────────────────────────────────────────────────

# Hold strong references to in-flight tasks: asyncio.create_task() keeps only
# a weak reference, so an otherwise unreferenced task can be garbage-collected
# mid-run.
_background_tasks: set[asyncio.Task] = set()


@router.post("/assessments", response_model=AssessmentStartResponse)
async def start_assessment(req: AssessmentRequest):
    """Start an async vendor contract assessment."""
    assessment_id = str(_uuid.uuid4())
    _assessment_jobs[assessment_id] = {
        "status": "running",
        "progress": "Initialisierung...",
        "result": None,
        "error": "",
    }
    task = asyncio.create_task(_run_assessment(assessment_id, req))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return AssessmentStartResponse(assessment_id=assessment_id)


@router.get("/assessments/{assessment_id}", response_model=AssessmentStatusResponse)
async def get_assessment_status(assessment_id: str):
    """Poll assessment status or retrieve the completed result."""
    job = _assessment_jobs.get(assessment_id)
    if not job:
        return AssessmentStatusResponse(
            assessment_id=assessment_id,
            status="not_found",
            error="Assessment nicht gefunden",
        )
    return AssessmentStatusResponse(
        assessment_id=assessment_id,
        status=job["status"],
        progress=job.get("progress", ""),
        result=job.get("result"),
        error=job.get("error", ""),
    )


@router.get("/assessments")
async def list_assessments():
    """List all assessments (from the in-memory store)."""
    items = []
    for aid, job in _assessment_jobs.items():
        r = job.get("result")
        items.append({
            "assessment_id": aid,
            "status": job["status"],
            "vendor_name": r.vendor_name if r else "",
            "overall_score": r.overall_score if r else 0,
            "document_count": len(r.documents) if r else 0,
            "findings_count": len(r.findings) if r else 0,
        })
    return {"assessments": items}


@router.post("/assessments/{assessment_id}/approve")
async def approve_assessment(assessment_id: str):
    """Mark an assessment as approved by the DSB."""
    job = _assessment_jobs.get(assessment_id)
    if not job or job["status"] != "completed":
        return {"error": "Assessment nicht abgeschlossen"}
    job["status"] = "approved"
    return {"status": "approved", "assessment_id": assessment_id}

# ── Background Processing ──────────────────────────────────────────

CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"

# Doc-type auto-detection keywords
_DOC_TYPE_KEYWORDS = {
    "avv": ["auftragsverarbeit", "auftrags-verarbeit", "data processing agreement",
            "dpa ", "art. 28", "art.28", "artikel 28"],
    "scc": ["standardvertragsklausel", "standard contractual clauses",
            "2021/914", "klausel 14", "module 2", "modul 2"],
    "tom_annex": ["technische und organisatorische", "tom-anlage",
                  "art. 32", "zutrittskontrolle", "zugangskontrolle",
                  "zugriffskontrolle", "verfuegbarkeitskontrolle"],
    "sub_processor_list": ["unterauftragnehmer", "sub-processor",
                           "subprocessor", "unterauftragsverarbeiter"],
    "agb": ["allgemeine geschaeftsbedingungen", "nutzungsbedingungen",
            "terms of service", "terms and conditions"],
}

def _detect_doc_type(text: str, label: str) -> str:
    """Auto-detect document type from content and label."""
    combined = (text[:3000] + " " + label).lower()
    scores: dict[str, int] = {}
    for dtype, keywords in _DOC_TYPE_KEYWORDS.items():
        scores[dtype] = sum(1 for kw in keywords if kw in combined)
    if not scores or max(scores.values()) == 0:
        return "agb"  # fallback
    return max(scores, key=scores.get)
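
# Worked example (hypothetical inputs): for a text containing
# "Auftragsverarbeitung gemaess Art. 28 DSGVO" and the label "Vertrag",
# "avv" scores 2 hits ("auftragsverarbeit", "art. 28") while every other
# type scores 0, so the function returns "avv".
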
async def _extract_text(url: str) -> tuple[str, int]:
    """Extract text from a URL via consent-tester or direct fetch."""
    import httpx

    # Try consent-tester first (handles JS-rendered pages)
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": url, "max_documents": 1},
            )
            if resp.status_code == 200:
                data = resp.json()
                docs = data.get("documents", [])
                if docs:
                    text = docs[0].get("full_text", "")
                    wc = docs[0].get("word_count", 0)
                    if len(text) > 50:
                        return text, wc
                # Fallback to the full rendered page
                fp = data.get("html_full_page", "")
                if len(fp) > 50:
                    return fp, len(fp.split())
    except Exception as e:
        logger.warning("consent-tester failed for %s: %s", url, e)

    # Direct fetch fallback
    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(url)
            text = resp.text
            return text, len(text.split())
    except Exception as e:
        logger.error("Direct fetch failed for %s: %s", url, e)
    return "", 0

async def _run_assessment(assessment_id: str, req: AssessmentRequest):
    """Background task: analyze all documents and produce the Pruefprotokoll."""
    job = _assessment_jobs[assessment_id]
    doc_results: list[DocumentResult] = []
    all_findings: list[FindingItem] = []
    doc_texts: dict[str, str] = {}  # doc_type → text (for cross-check)
    try:
        total = len(req.documents)
        for i, entry in enumerate(req.documents):
            job["progress"] = f"Dokument {i+1}/{total}: {entry.label or entry.url[:40]}..."

            # 1. Extract text
            text, word_count = await _extract_text(entry.url)
            if not text or len(text) < 50:
                doc_results.append(DocumentResult(
                    label=entry.label or entry.url,
                    url=entry.url,
                    doc_type=entry.doc_type,
                    error="Text konnte nicht extrahiert werden",
                ))
                continue

            # 2. Detect doc_type if auto
            doc_type = entry.doc_type
            if doc_type == "auto":
                doc_type = _detect_doc_type(text, entry.label)
                logger.info("Auto-detected doc_type=%s for %s", doc_type, entry.label)
            doc_texts[doc_type] = text

            # 3. Run checklist
            label = entry.label or f"{doc_type.upper()}: {entry.url[:50]}"
            check_result = check_document_completeness(text, doc_type, label, entry.url)
            checks = check_result.get("checks", [])
            completeness = check_result.get("completeness_pct", 0)
            correctness = check_result.get("correctness_pct", 0)

            # 4. Extract findings from failed checks
            failed_checks = [c for c in checks if not c.get("passed") and not c.get("skipped")]
            for fc in failed_checks:
                severity = fc.get("severity", "MEDIUM")
                ftype = "GAP" if severity in ("CRITICAL", "HIGH") else "RISK"
                all_findings.append(FindingItem(
                    id=f"{assessment_id[:8]}-{fc['id']}",
                    category=_check_to_category(fc["id"], doc_type),
                    severity=severity,
                    type=ftype,
                    title=fc.get("label", ""),
                    description=fc.get("hint", ""),
                    recommendation=fc.get("hint", ""),
                    document_label=label,
                    document_type=doc_type,
                    check_id=fc["id"],
                    citations=[fc.get("matched_text", "")] if fc.get("matched_text") else [],
                ))

            doc_results.append(DocumentResult(
                label=label,
                url=entry.url,
                doc_type=doc_type,
                word_count=word_count,
                completeness_pct=completeness,
                correctness_pct=correctness,
                checks=checks,
                findings_count=len(failed_checks),
            ))

        # 5. Cross-check between documents
        job["progress"] = "Cross-Check zwischen Dokumenten..."
        cross_findings = cross_check_documents(doc_texts, req.vendor_name)

        # 6. Calculate scores
        category_scores = _calculate_category_scores(doc_results)
        overall = _calculate_overall_score(category_scores, all_findings, cross_findings)

        # 7. Build result
        result = AssessmentResult(
            vendor_name=req.vendor_name,
            documents=doc_results,
            findings=all_findings,
            overall_score=overall,
            category_scores=category_scores,
            cross_check_findings=cross_findings,
            checked_at=datetime.now(timezone.utc).isoformat(),
        )

        # 8. Generate Pruefprotokoll HTML
        try:
            result.report_html = build_pruefprotokoll(result.model_dump())
        except Exception as e:
            logger.warning("Report generation failed: %s", e)

        job["status"] = "completed"
        job["progress"] = ""
        job["result"] = result
        logger.info("Assessment %s completed: %d docs, %d findings, score=%d%%",
                    assessment_id, len(doc_results), len(all_findings), overall)
    except Exception as e:
        logger.exception("Assessment %s failed", assessment_id)
        job["status"] = "failed"
        job["error"] = str(e)

# ── Helpers ─────────────────────────────────────────────────────────

def _check_to_category(check_id: str, doc_type: str) -> str:
    """Map a check ID to a finding category."""
    prefix_map = {
        "avv_instruction": "INSTRUCTION",
        "avv_confidentiality": "CONFIDENTIALITY",
        "avv_tom": "TOM",
        "avv_subprocessor": "SUBPROCESSOR",
        "avv_data_subject": "DATA_SUBJECT_RIGHTS",
        "avv_dpia": "GENERAL",
        "avv_deletion": "DELETION",
        "avv_audit": "AUDIT_RIGHTS",
        "avv_breach": "INCIDENT",
        "avv_liability": "LIABILITY",
        "avv_subject": "AVV_CONTENT",
        "scc_": "TRANSFER",
        "tom_": "TOM",
        "sub_": "SUBPROCESSOR",
    }
    for prefix, cat in prefix_map.items():
        if check_id.startswith(prefix):
            return cat
    return doc_type.upper()

def _calculate_category_scores(docs: list[DocumentResult]) -> dict[str, int]:
    """Calculate per-category compliance scores from document results."""
    cat_totals: dict[str, int] = {}
    cat_passed: dict[str, int] = {}
    for doc in docs:
        for check in doc.checks:
            if check.get("skipped"):
                continue
            cat = _check_to_category(check.get("id", ""), doc.doc_type)
            cat_totals[cat] = cat_totals.get(cat, 0) + 1
            if check.get("passed"):
                cat_passed[cat] = cat_passed.get(cat, 0) + 1
    scores = {}
    for cat, total in cat_totals.items():
        passed = cat_passed.get(cat, 0)
        scores[cat] = round(passed / total * 100) if total > 0 else 0
    return scores
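
# Worked example (hypothetical check IDs): a single document whose checks are
# avv_tom_1 (passed), avv_tom_2 (failed), avv_audit_1 (passed) yields
# TOM = round(1/2 * 100) = 50 and AUDIT_RIGHTS = round(1/1 * 100) = 100.
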
def _calculate_overall_score(
    category_scores: dict[str, int],
    findings: list[FindingItem],
    cross_findings: list[dict],
) -> int:
    """Calculate the overall compliance score."""
    if not category_scores:
        return 0
    # Weighted average: CRITICAL categories count double
    critical_cats = {"INSTRUCTION", "TOM", "SUBPROCESSOR", "DELETION", "INCIDENT", "TRANSFER"}
    total_weight = 0
    weighted_sum = 0
    for cat, score in category_scores.items():
        weight = 2 if cat in critical_cats else 1
        weighted_sum += score * weight
        total_weight += weight
    base = round(weighted_sum / total_weight) if total_weight > 0 else 0
    # Penalty for critical findings
    critical_count = sum(1 for f in findings if f.severity == "CRITICAL")
    cross_critical = sum(1 for f in cross_findings if f.get("severity") == "CRITICAL")
    penalty = (critical_count + cross_critical) * 5
    return max(0, min(100, base - penalty))
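
# Worked example (hypothetical values): category_scores = {"TOM": 80, "GENERAL": 100}
# gives base = round((80*2 + 100*1) / 3) = 87; one CRITICAL finding then
# subtracts 5, so the overall score is 82.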