4f29e5ff3c
Build + Deploy / build-admin-compliance (push) Successful in 1m49s
Build + Deploy / build-backend-compliance (push) Successful in 9s
Build + Deploy / build-ai-sdk (push) Successful in 8s
Build + Deploy / build-developer-portal (push) Successful in 8s
Build + Deploy / build-tts (push) Successful in 9s
Build + Deploy / build-document-crawler (push) Successful in 8s
Build + Deploy / build-dsms-gateway (push) Successful in 7s
Build + Deploy / build-dsms-node (push) Successful in 8s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 15s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m55s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Failing after 45s
CI / test-python-backend (push) Successful in 42s
CI / test-python-document-crawler (push) Successful in 27s
CI / test-python-dsms-gateway (push) Successful in 26s
CI / validate-canonical-controls (push) Successful in 15s
Build + Deploy / trigger-orca (push) Successful in 2m13s
Path to 100% correctness: Regex finds 80%, LLM catches the rest.
1. LLM verification (llm_verify.py):
- Every regex FAIL is re-checked by Qwen (qwen3:32b)
- Binary YES/NO question with evidence extraction
- Overturned checks marked with [LLM] prefix in matched_text
- Graceful fallback if LLM unavailable
2. Section splitter hardening:
- Short lines (<16 chars) only treated as headings if preceded
by blank line — prevents table column headers ("Funktion",
"Speicherdauer") from splitting cookie sections
- Fixes IHK cookie section: 288 words → full section
3. DSFA documentation patterns expanded:
- Recognizes "4.) Ergebnis:" numbered result sections
- Matches risk assessment conclusions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
417 lines
15 KiB
Python
417 lines
15 KiB
Python
"""
Agent Document Check Routes — Multi-URL document verification.

The user provides explicit URLs + document types. No crawling needed.
Each document is loaded, expanded (accordions/tabs), text extracted,
and checked against its type-specific legal checklist.

POST /api/compliance/agent/doc-check
GET  /api/compliance/agent/doc-check/{check_id}
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import uuid as _uuid
|
|
from datetime import datetime, timezone
|
|
|
|
import httpx
|
|
from fastapi import APIRouter
|
|
from pydantic import BaseModel
|
|
|
|
from compliance.services.dsi_document_checker import (
|
|
check_document_completeness, classify_document_type,
|
|
)
|
|
from compliance.services.smtp_sender import send_email
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router for the agent endpoints; every route declared on it is served under
# the "/compliance/agent" prefix and tagged "agent".
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
|
|
|
|
# Base URL of the consent-tester service this module posts to (the
# "/dsi-discovery" and "/scan" endpoints). The CONSENT_TESTER_URL environment
# variable overrides the built-in default so other deployments do not need a
# code change; an unset or empty variable falls back to the default. A trailing
# slash is stripped because callers append "/<endpoint>" themselves.
_DEFAULT_CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"
CONSENT_TESTER_URL = (
    os.environ.get("CONSENT_TESTER_URL") or _DEFAULT_CONSENT_TESTER_URL
).rstrip("/")
|
|
|
|
|
|
class DocCheckEntry(BaseModel):
    # One document the caller wants checked: an explicit URL plus the
    # checklist type that should be applied to it.

    # Checklist type: dse, agb, impressum, cookie, widerruf, other
    doc_type: str
    # Display name; used in progress messages and as the result label.
    label: str
    # Address of the page that is loaded and checked.
    url: str
|
|
|
|
|
|
class DocCheckRequest(BaseModel):
    # Request body of POST /doc-check.

    # Documents to check, processed in the given order.
    entries: list[DocCheckEntry]
    # Address the HTML report is mailed to once all documents are checked.
    recipient: str = "dsb@breakpilot.local"
    # When true, a cookie-banner scan is additionally run on the URL of the
    # first entry.
    check_cookie_banner: bool = False
|
|
|
|
|
|
class CheckItem(BaseModel):
    # A single checklist item as reported by the document checker.

    # Identifier of the check; also the key used to look up LLM overturns.
    id: str
    # Human-readable name of the check.
    label: str
    # Whether the check is satisfied. Set to True when the LLM verification
    # overturns a regex failure.
    passed: bool
    # Severity as delivered by the checker (passed through unchanged).
    severity: str
    # Text that satisfied the check; starts with "[LLM] " when the pass came
    # from the LLM verification instead of the regex.
    matched_text: str = ""
    # Checklist level; level-2 items are the ones counted when the correctness
    # percentage is recomputed after LLM overturns.
    level: int = 1
    # Value of the checker's "parent" field (presumably the id of the parent
    # check — confirm against the checker).
    parent: str | None = None
    # Skipped items are excluded from LLM verification and from the
    # correctness recomputation.
    skipped: bool = False
    # Hint text handed to the LLM verification; failed items without a hint
    # are not re-checked.
    hint: str = ""
|
|
|
|
|
|
class DocCheckResult(BaseModel):
    # Outcome of checking one document, or one auto-detected sub-section of a
    # document, against a checklist.

    # Label of the entry, or "<parent label> > <heading>" for a sub-section.
    label: str
    # URL the text was loaded from.
    url: str
    # Checklist type that was applied.
    doc_type: str
    # Number of words in the checked text.
    word_count: int = 0
    # Completeness score in percent, as reported by the checker.
    completeness_pct: int = 0
    # Correctness score in percent; recomputed locally when the LLM
    # verification ran.
    correctness_pct: int = 0
    # Individual checklist items.
    checks: list[CheckItem] = []
    # Number of findings that are not score summaries.
    findings_count: int = 0
    # Non-empty when the document could not be loaded or checked.
    error: str = ""
|
|
|
|
|
|
class DocCheckResponse(BaseModel):
    # Final payload of a finished document check job.

    # One result per checked document / sub-section.
    results: list[DocCheckResult]
    # JSON returned by the cookie-banner scan, or None when the scan was not
    # requested or did not succeed.
    cookie_banner_result: dict | None = None
    # Number of entries in ``results``.
    total_documents: int
    # Sum of ``findings_count`` over all results.
    total_findings: int
    # Status reported by the mail sender for the report email.
    email_status: str = ""
    # UTC timestamp (ISO 8601) of when the job finished.
    checked_at: str
|
|
|
|
|
|
# In-memory job store for async processing
|
|
_doc_check_jobs: dict[str, dict] = {}
|
|
|
|
|
|
class DocCheckStartResponse(BaseModel):
    # Answer of POST /doc-check: the handle the client polls with.

    # Identifier of the newly created job.
    check_id: str
    # Job state at creation time.
    status: str = "running"
|
|
|
|
|
|
class DocCheckStatusResponse(BaseModel):
    # Answer of GET /doc-check/{check_id} for a known job.

    # Identifier of the job being reported on.
    check_id: str
    # Job state; the background task sets "running", "completed" or "failed".
    status: str
    # Latest human-readable progress message.
    progress: str = ""
    # Full result once the job has completed, otherwise None.
    result: DocCheckResponse | None = None
    # Error text when the job failed.
    error: str = ""
|
|
|
|
|
|
# Strong references to in-flight background tasks. The event loop only keeps
# weak references to tasks, so a task whose handle is dropped may be garbage
# collected before it finishes; each task removes itself again when done.
_background_tasks: set[asyncio.Task] = set()


@router.post("/doc-check")
async def start_doc_check(req: DocCheckRequest):
    """Start async multi-URL document check."""
    # Registers a job in the in-memory store, schedules the actual work as a
    # background task and returns the job id right away; the client polls
    # GET /doc-check/{check_id} for progress and the final result.
    check_id = str(_uuid.uuid4())[:8]
    # Eight hex characters can collide; never overwrite an existing job.
    while check_id in _doc_check_jobs:
        check_id = str(_uuid.uuid4())[:8]

    _doc_check_jobs[check_id] = {
        "status": "running",
        "progress": "Pruefung gestartet...",
        "result": None,
        "error": "",
    }

    task = asyncio.create_task(_run_doc_check(check_id, req))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return DocCheckStartResponse(check_id=check_id, status="running")
|
|
|
|
|
|
@router.get("/doc-check/{check_id}")
async def get_doc_check_status(check_id: str):
    """Poll document check status."""
    # Known job: report its current state. Unknown id: answer with a plain
    # dict whose status is "not_found" (no error status code is raised).
    entry = _doc_check_jobs.get(check_id)
    if entry:
        return DocCheckStatusResponse(
            check_id=check_id,
            status=entry["status"],
            progress=entry.get("progress", ""),
            result=entry.get("result"),
            error=entry.get("error", ""),
        )
    return {"check_id": check_id, "status": "not_found"}
|
|
|
|
|
|
async def _run_doc_check(check_id: str, req: DocCheckRequest):
    """Background task: check every requested document and publish the outcome.

    Progress messages, the final ``DocCheckResponse`` and any error text are
    written into the job's entry in ``_doc_check_jobs``.

    Args:
        check_id: Key of the job entry in ``_doc_check_jobs``.
        req: The original request (entries, report recipient, banner flag).

    Returns:
        None. The job entry ends up with status "completed" (result set) or
        "failed" (error set).
    """
    job = _doc_check_jobs[check_id]
    try:
        results: list[DocCheckResult] = []
        total_findings = 0
        entry_count = len(req.entries)

        for position, entry in enumerate(req.entries, start=1):
            job["progress"] = f"Dokument {position}/{entry_count}: {entry.label}..."
            doc_results = await _check_single_document(entry)
            results.extend(doc_results)
            total_findings += sum(r.findings_count for r in doc_results)

        # Optional: Cookie banner check on first URL
        cookie_result = None
        if req.check_cookie_banner and req.entries:
            job["progress"] = "Cookie-Banner wird geprueft..."
            cookie_result = await _check_cookie_banner(req.entries[0].url)

        # Build and mail the report. This step is best-effort: a rendering or
        # delivery problem must not throw away the check results computed
        # above, it is only reflected in ``email_status``.
        job["progress"] = "Report wird erstellt..."
        email_status = "failed"
        try:
            summary = _build_report(results, cookie_result)
            # send_email is a synchronous call; run it in a worker thread so
            # the event loop keeps serving other requests while it sends.
            email_result = await asyncio.to_thread(
                send_email,
                recipient=req.recipient,
                subject=f"[DOKUMENTEN-PRUEFUNG] {len(results)} Dokumente geprueft",
                body_html=summary,
            )
            email_status = email_result.get("status", "failed")
        except Exception:
            logger.exception("Doc check %s: report/email delivery failed", check_id)

        response = DocCheckResponse(
            results=results,
            cookie_banner_result=cookie_result,
            total_documents=len(results),
            total_findings=total_findings,
            email_status=email_status,
            checked_at=datetime.now(timezone.utc).isoformat(),
        )

        job["status"] = "completed"
        job["result"] = response
        job["progress"] = "Fertig"

    except Exception as e:
        # Log with traceback; some exceptions carry an empty message, so fall
        # back to the exception type to never report a blank error.
        logger.exception("Doc check %s failed: %s", check_id, e)
        job["status"] = "failed"
        job["error"] = (str(e) or type(e).__name__)[:500]
|
|
|
|
|
|
async def _check_single_document(entry: DocCheckEntry) -> list[DocCheckResult]:
    """Load one URL via the consent-tester service and run its checklists.

    The page text is requested from the service's ``/dsi-discovery`` endpoint.
    The full text is checked against ``entry.doc_type``; in addition every
    auto-detected sub-section (e.g. a Cookies section on a privacy page) with
    at least 100 words is checked against its own type.

    Args:
        entry: The document to check.

    Returns:
        The main result followed by one result per qualifying sub-section.
        On any failure (non-200 answer, no usable text, exception) a single
        result with ``error`` set is returned instead; this function does not
        raise.
    """

    def _failure(message: str) -> list[DocCheckResult]:
        # One placeholder result that carries only the error text.
        return [DocCheckResult(
            label=entry.label, url=entry.url, doc_type=entry.doc_type,
            error=message,
        )]

    try:
        async with httpx.AsyncClient(timeout=90.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": entry.url, "max_documents": 1},
            )

        if resp.status_code != 200:
            return _failure(f"Seite nicht erreichbar (HTTP {resp.status_code})")

        docs = resp.json().get("documents", [])

        doc_text = ""
        word_count = 0
        if docs:
            first_doc = docs[0]
            # Prefer the full text; fall back to the preview when it is empty.
            doc_text = first_doc.get("full_text", "") or first_doc.get("text_preview", "")
            word_count = first_doc.get("word_count", 0)

        if not doc_text or len(doc_text) < 50:
            return _failure("Kein Text extrahierbar")

        # Split text into sections and check each
        sections = _split_into_sections(doc_text, entry.label, entry.url)

        # Main document check (full text against primary type)
        main_result = await _run_checklist(
            doc_text, entry.doc_type, entry.label, entry.url, word_count,
        )

        # Control Library deep check — DISABLED until doc-check-specific
        # Master Controls with binary pass/fail criteria are available.
        # See: zeroclaw/INSTRUCTION-master-controls-for-doc-check.md
        # Code: compliance/services/rag_document_checker.py (ready to re-enable)

        all_results: list[DocCheckResult] = [main_result]

        # Sub-section checks (auto-detected from headings)
        for section in sections:
            if section["word_count"] < 100:
                continue
            sub_result = await _run_checklist(
                section["text"], section["doc_type"],
                section["title"], entry.url,
                section["word_count"],
            )
            all_results.append(sub_result)

        return all_results

    except Exception as e:
        # Some exceptions (HTTP timeouts in particular) can have an empty
        # message; an empty ``error`` would make the failed document look like
        # a clean one, so fall back to the exception type name.
        reason = str(e) or type(e).__name__
        logger.warning("Doc check failed for %s: %s", entry.url, reason)
        return _failure(reason[:200])
|
|
|
|
|
|
async def _run_checklist(text: str, doc_type: str, label: str, url: str, word_count: int = 0) -> DocCheckResult:
    """Run the type-specific checklist on ``text`` and LLM-verify regex failures.

    Findings whose ``code`` contains "SCORE" carry the individual check items
    and the completeness/correctness percentages; every other finding only
    counts towards ``findings_count``. Failed, non-skipped checks that have a
    hint are handed to the LLM verifier; overturned checks are marked as
    passed and the correctness percentage is recomputed from the level-2
    checks. Any error during LLM verification is logged and otherwise ignored.

    Args:
        text: Document or section text to check.
        doc_type: Checklist type to apply.
        label: Display label of the document/section.
        url: Source URL of the text.
        word_count: Known word count; when 0 the words of ``text`` are counted.

    Returns:
        The assembled ``DocCheckResult``.
    """
    findings = check_document_completeness(text, doc_type, label, url)

    items: list[CheckItem] = []
    completeness_pct = 0
    correctness_pct = 0
    for finding in findings:
        if "SCORE" not in finding.get("code", ""):
            continue
        items.extend(
            CheckItem(
                id=raw["id"],
                label=raw["label"],
                passed=raw["passed"],
                severity=raw["severity"],
                matched_text=raw.get("matched_text", ""),
                level=raw.get("level", 1),
                parent=raw.get("parent"),
                skipped=raw.get("skipped", False),
                hint=raw.get("hint", ""),
            )
            for raw in finding.get("all_checks", [])
        )
        completeness_pct = finding.get("completeness_pct", 0)
        correctness_pct = finding.get("correctness_pct", 0)

    # LLM verification: re-check regex FAILs to eliminate false positives
    llm_candidates = [
        item for item in items
        if not (item.passed or item.skipped) and item.hint
    ]
    if llm_candidates:
        try:
            # Imported lazily so a missing/broken verifier only disables this
            # step instead of breaking the whole module.
            from compliance.services.doc_checks.llm_verify import verify_failed_checks

            payload = [
                {"id": item.id, "label": item.label, "hint": item.hint}
                for item in llm_candidates
            ]
            overturns = await verify_failed_checks(text, payload, label)

            for item in items:
                if item.id in overturns and overturns[item.id]["overturned"]:
                    item.passed = True
                    item.matched_text = f"[LLM] {overturns[item.id]['evidence']}"
                    logger.info("LLM overturned: %s in %s", item.label, label)

            # Recompute correctness after overturns
            active_l2 = [item for item in items if item.level == 2 and not item.skipped]
            if active_l2:
                passed_l2 = sum(1 for item in active_l2 if item.passed)
                correctness_pct = round(passed_l2 / len(active_l2) * 100)
        except Exception as exc:
            logger.warning("LLM verification skipped: %s", exc)

    plain_findings = [f for f in findings if "SCORE" not in f.get("code", "")]
    return DocCheckResult(
        label=label,
        url=url,
        doc_type=doc_type,
        word_count=word_count or len(text.split()),
        completeness_pct=completeness_pct,
        correctness_pct=correctness_pct,
        checks=items,
        findings_count=len(plain_findings),
    )
|
|
|
|
|
|
# Section heading patterns → document type mapping.
#
# Each entry is (regex, doc_type). The regex is searched in the lower-cased,
# stripped heading and the first matching entry wins, so ORDER MATTERS.
#
# ONLY sections that are genuinely separate document types with their own
# checklists belong here. Everything else (Betroffenenrechte, Dienste von
# Drittanbietern, ...) is part of the parent DSI and inherits its checks.
SECTION_TYPE_MAP = [
    (r"^cookie", "cookie"),
    (r"widerrufsrecht|widerrufsbelehrung", "widerruf"),
    (r"^impressum$", "impressum"),
    (r"^(?:agb|allgemeine geschäftsbedingungen|nutzungsbedingungen)$", "agb"),
    # DSFA MUST be checked BEFORE social_media (both can contain "Social Media")
    (r"datenschutzfolge|dsfa|risikoanalyse", "dsfa"),
    # Standalone heading "Social Media" = DSE
    (r"^social\s*media$", "social_media"),
    (r"datenschutzerkl(?:ae|ä)rung.*social|datenschutz\s+f(?:ue|ü)r\s+social", "social_media"),
]
|
|
|
|
|
|
def _split_into_sections(text: str, parent_label: str, url: str) -> list[dict]:
    """Split document text at heading lines into typed sub-sections.

    A line counts as a heading when it is 6-79 characters long, starts with
    an upper-case character, does not end with "." or "," and is either
    longer than 15 characters or preceded by a blank line. Headings listed in
    ``SKIP_HEADINGS`` do not start a new section; they stay part of the
    running text.

    Only sections whose heading ``_classify_section`` maps to a document type
    and whose body has at least 100 words are returned. If the same document
    type occurs more than once, the later text is appended to the first
    section of that type (which keeps its original title).

    Args:
        text: Full document text, one line per ``\\n``.
        parent_label: Label of the parent document; prefixes section titles.
        url: Source URL. Currently unused; kept for interface compatibility.

    Returns:
        A list of dicts with the keys ``title``, ``text``, ``doc_type`` and
        ``word_count``, in order of first appearance.
    """
    sections: list[dict] = []
    seen_types: dict[str, int] = {}  # doc_type -> index in sections

    def _save_section(heading: str, text_lines: list[str]) -> None:
        # Store (or merge) the body collected under ``heading``.
        sec_text = "\n".join(text_lines)
        words = len(sec_text.split())
        if words < 100:
            return
        sec_type = _classify_section(heading)
        if not sec_type:
            return
        # Merge duplicate doc_types (e.g. two "Social Media" headings)
        if sec_type in seen_types:
            merged = sections[seen_types[sec_type]]
            merged["text"] += "\n\n" + sec_text
            merged["word_count"] = len(merged["text"].split())
        else:
            seen_types[sec_type] = len(sections)
            sections.append({
                "title": f"{parent_label} > {heading}",
                "text": sec_text,
                "doc_type": sec_type,
                "word_count": words,
            })

    current_heading = ""
    current_text: list[str] = []
    prev_blank = False

    for line in text.split("\n"):
        stripped = line.strip()
        # NOTE(review): the lower bound means lines of 5 characters or fewer
        # (e.g. a bare "AGB") are never treated as headings.
        is_heading = (
            5 < len(stripped) < 80
            and not stripped.endswith(".")
            and not stripped.endswith(",")
            and stripped[0].isupper()
            # Require preceding blank line OR line > 15 chars to avoid
            # table column headers ("Funktion", "Speicherdauer") being
            # treated as section headings
            and (prev_blank or len(stripped) > 15)
        )

        if is_heading and stripped.lower() not in SKIP_HEADINGS:
            # A new section starts: flush the one collected so far. Text that
            # precedes the first heading belongs to no section and is dropped.
            if current_heading:
                _save_section(current_heading, current_text)
            current_heading = stripped
            current_text = []
        else:
            current_text.append(line)

        prev_blank = not stripped

    # Last section
    if current_heading:
        _save_section(current_heading, current_text)

    return sections
|
|
|
|
|
|
# Headings to skip — sub-sections of other documents, not standalone.
# Entries are compared against the lower-cased, stripped heading, so they must
# be written in lower case here.
SKIP_HEADINGS = {
    # Internal concept, no legal checklist
    "nutzungskonzept social media",
    # Sub-section of DSFA (spelled with umlauts and with ae/ue transliteration)
    "risikoabwägung und datenschutzfolgenabschätzung",
    "risikoabwaegung und datenschutzfolgenabschaetzung",
}
|
|
|
|
# Track already-seen section types to avoid duplicate sub-documents
|
|
# (e.g. two "Social Media" headings on the same page)
|
|
_DEDUP_TYPES = {"social_media", "cookie", "dsfa", "widerruf", "impressum"}
|
|
|
|
|
|
def _classify_section(heading: str) -> str | None:
    """Map a section heading to a document type.

    The heading is lower-cased and stripped, then matched against the
    patterns of ``SECTION_TYPE_MAP`` in order.

    Args:
        heading: Heading text as found in the document.

    Returns:
        The doc_type of the first matching pattern, or ``None`` when the
        heading is listed in ``SKIP_HEADINGS`` or matches no pattern.
    """
    import re as _re

    normalized = heading.lower().strip()

    # Skip known sub-sections
    if normalized in SKIP_HEADINGS:
        return None

    return next(
        (
            doc_type
            for pattern, doc_type in SECTION_TYPE_MAP
            if _re.search(pattern, normalized)
        ),
        None,
    )
|
|
|
|
|
|
async def _check_cookie_banner(url: str) -> dict | None:
    """Run the consent-tester's cookie-banner scan for ``url``.

    Args:
        url: Page to scan.

    Returns:
        The decoded JSON body of the ``/scan`` answer when it has status 200;
        ``None`` for any other status or when the request fails (the failure
        is logged as a warning, never raised).
    """
    scan_request = {"url": url, "timeout_per_phase": 8}
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(f"{CONSENT_TESTER_URL}/scan", json=scan_request)
        return resp.json() if resp.status_code == 200 else None
    except Exception as exc:
        logger.warning("Cookie banner check failed: %s", exc)
        return None
|
|
|
|
|
|
def _build_report(results: list[DocCheckResult], cookie_result: dict | None) -> str:
    """Render the HTML report for the given results.

    Thin wrapper around ``build_html_report`` from the sibling module
    ``agent_doc_check_report``; the import happens at call time.

    Args:
        results: Per-document / per-section check results.
        cookie_result: Cookie-banner scan result, or ``None``.

    Returns:
        Whatever ``build_html_report`` returns (used as the email's HTML body).
    """
    from .agent_doc_check_report import build_html_report

    html = build_html_report(results, cookie_result)
    return html
|