Files
breakpilot-compliance/backend-compliance/compliance/services/doc_checks/runner.py
T
Benjamin Admin 3efc491ec5
Build + Deploy / build-tts (push) Successful in 1m38s
Build + Deploy / build-document-crawler (push) Successful in 41s
Build + Deploy / build-dsms-gateway (push) Successful in 26s
Build + Deploy / build-dsms-node (push) Successful in 12s
Build + Deploy / build-admin-compliance (push) Successful in 2m22s
Build + Deploy / build-backend-compliance (push) Successful in 3m21s
Build + Deploy / build-ai-sdk (push) Successful in 53s
Build + Deploy / build-developer-portal (push) Successful in 1m16s
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 20s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / branch-name (push) Has been skipped
CI / nodejs-build (push) Successful in 3m18s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Failing after 59s
CI / test-python-backend (push) Successful in 47s
CI / test-python-document-crawler (push) Successful in 32s
CI / test-python-dsms-gateway (push) Successful in 27s
CI / validate-canonical-controls (push) Successful in 16s
Build + Deploy / trigger-orca (push) Successful in 3m23s
fix: 5 false positives from etogruppe.com ground truth
1. Soft hyphens (­/\xad) stripped before regex matching —
   fixes "Daten­übertrag­barkeit" not matching
2. Art. 15/17/20: allow adjectives between "Recht auf" and keyword
   ("Recht auf unentgeltliche Auskunft" now matches)
3. DSB contact: regex spans up to 300 chars across newlines
   (DSB section with company address between heading and email)
4. Löschkonzept: added "Fortfall", "Entfall", "Beendigung" as
   deletion trigger words alongside "Ablauf"/"Wegfall"

Reduces etogruppe FPs from 5 to ~1.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-07 23:51:04 +02:00

233 lines
8.9 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Document check runner — two-pass L1/L2 logic.
Pass 1: Run all L1 checks ("Is it mentioned?")
Pass 2: Run L2 checks only where their L1 parent passed ("Is it correct?")
"""
import logging
import re
from .dse_checks import ART13_CHECKLIST
from .widerruf_checks import WIDERRUF_CHECKLIST
from .agb_checks import AGB_CHECKLIST
from .impressum_checks import IMPRESSUM_CHECKLIST
from .cookie_checks import COOKIE_CHECKLIST
from .social_media_checks import JOINT_CONTROLLER_CHECKLIST
from .dsfa_checks import DSFA_CHECKLIST
logger = logging.getLogger(__name__)
# Map doc_type strings to (checklist, label).
# Built from alias groups so each checklist/label pair is declared once.
_CHECKLIST_MAP = {
    alias: (checklist, label)
    for aliases, checklist, label in (
        (("dse", "datenschutz", "privacy"), ART13_CHECKLIST, "Art. 13 DSGVO"),
        (("widerruf", "withdrawal", "cancellation"), WIDERRUF_CHECKLIST, "§355 BGB"),
        (("agb", "terms", "nutzungsbedingungen"), AGB_CHECKLIST, "§305ff BGB"),
        (("impressum", "imprint"), IMPRESSUM_CHECKLIST, "§5 TMG / §18 MStV"),
        (("cookie",), COOKIE_CHECKLIST, "§25 TDDDG"),
        (("social_media", "joint_controller"), JOINT_CONTROLLER_CHECKLIST, "Art. 26 DSGVO"),
        (("dsfa",), DSFA_CHECKLIST, "Art. 35 DSGVO"),
    )
    for alias in aliases
}
def _match_patterns(patterns: list[str], text_lower: str):
"""Try each regex pattern against text, return first Match or None."""
for p in patterns:
m = re.search(p, text_lower)
if m:
return m
return None
def _extract_context(text_lower: str, match) -> str:
"""Extract ~30 chars around a match for evidence display."""
if not match:
return ""
start = max(0, match.start() - 30)
end = min(len(text_lower), match.end() + 30)
return text_lower[start:end].strip()
def check_document_completeness(
    text: str,
    doc_type: str,
    doc_title: str,
    doc_url: str,
) -> list[dict]:
    """Check a legal document against its type-specific requirements.

    Two-pass approach:
    L1 — Is the mandatory field mentioned at all?
    L2 — Is it correct/complete? (only checked if L1 parent passed)

    Args:
        text: Raw document text; None or near-empty input yields a single
            DSI-EMPTY finding.
        doc_type: Document-type key (see _CHECKLIST_MAP); unknown types fall
            back to the Art. 13 checklist.
        doc_title: Document title, interpolated into finding texts.
        doc_url: Document URL, attached to every finding.

    Returns:
        A list of finding dicts — a DSI-SCORE summary first, followed by one
        finding per missing L1 item / failed L2 detail check.
    """
    findings = []
    # BUGFIX: guard BEFORE any string processing — previously text=None
    # crashed on .replace() before this emptiness check was reached.
    if not text or len(text) < 50:
        findings.append({
            "code": f"DSI-EMPTY-{doc_type.upper()}",
            "severity": "HIGH",
            "text": f"Dokument '{doc_title}' ist leer oder zu kurz fuer eine Pruefung.",
            "doc_title": doc_title, "doc_url": doc_url, "doc_type": doc_type,
        })
        return findings
    # Strip soft hyphens (­ / \xad) that CMS tools insert for word-breaking
    # — they break regex matches on compound words like "Datenübertragbarkeit"
    text_clean = text.replace("\xad", "").replace("&shy;", "")
    text_lower = text_clean.lower()
    word_count = len(text.split())
    # A very short privacy notice is not a standalone DSE document; report
    # once as LOW and skip the full checklist (all_checks stays empty).
    if word_count < 200 and doc_type == "dse":
        findings.append({
            "code": f"DSI-SCORE-{doc_type.upper()}",
            "severity": "LOW",
            "text": (
                f"'{doc_title}': Kurzhinweis ({word_count} Woerter) — zu kurz fuer "
                f"eine vollstaendige Art. 13 DSGVO Pruefung. Kein eigenstaendiges DSI-Dokument."
            ),
            "doc_title": doc_title, "doc_url": doc_url, "doc_type": doc_type,
            "all_checks": [],
        })
        return findings
    # Unknown doc types fall back to the Art. 13 checklist.
    checklist, label = _CHECKLIST_MAP.get(doc_type, (ART13_CHECKLIST, "Art. 13 DSGVO"))
    l1_checks = [c for c in checklist if c.get("level", 1) == 1]
    l2_checks = [c for c in checklist if c.get("level", 1) == 2]
    # ── Pass 1: L1 checks ("Is it mentioned?") ───────────────────────
    passed_l1_ids: set[str] = set()
    all_checks: list[dict] = []
    l1_present = 0
    for check in l1_checks:
        match = _match_patterns(check["patterns"], text_lower)
        passed = match is not None
        if passed:
            passed_l1_ids.add(check["id"])
            l1_present += 1
        else:
            findings.append({
                "code": f"DSI-MISSING-{check['id'].upper()}",
                "severity": check.get("severity", "MEDIUM"),
                "text": (
                    f"'{doc_title}': Pflichtangabe '{check['label']}' nicht gefunden. "
                    f"Erforderlich nach {label}."
                ),
                "doc_title": doc_title, "doc_url": doc_url,
                "doc_type": doc_type, "check_id": check["id"],
            })
        all_checks.append({
            "id": check["id"], "label": check["label"],
            "passed": passed, "severity": check.get("severity", "MEDIUM"),
            "matched_text": _extract_context(text_lower, match),
            "level": 1, "parent": None, "skipped": False,
            "hint": check.get("hint", ""),
        })
    # ── Pass 2: L2 checks (only if parent L1 passed) ─────────────────
    l2_total = 0
    l2_passed = 0
    for check in l2_checks:
        parent = check.get("parent")
        # No point running a detail check on a section that is absent.
        skipped = parent not in passed_l1_ids
        passed = False
        matched_text = ""
        if not skipped:
            l2_total += 1
            match = _match_patterns(check["patterns"], text_lower)
            passed = match is not None
            if passed:
                l2_passed += 1
                matched_text = _extract_context(text_lower, match)
            else:
                findings.append({
                    "code": f"DSI-DETAIL-{check['id'].upper()}",
                    "severity": check.get("severity", "MEDIUM"),
                    "text": (
                        f"'{doc_title}': Detailpruefung '{check['label']}' "
                        f"nicht bestanden. Empfohlen nach {label}."
                    ),
                    "doc_title": doc_title, "doc_url": doc_url,
                    "doc_type": doc_type, "check_id": check["id"],
                })
        all_checks.append({
            "id": check["id"], "label": check["label"],
            "passed": passed, "severity": check.get("severity", "MEDIUM"),
            "matched_text": matched_text,
            "level": 2, "parent": parent, "skipped": skipped,
            "hint": check.get("hint", ""),
        })
    # ── Summary (inserted at the front of the findings list) ──────────
    l1_total = len(l1_checks)
    completeness_pct = round(l1_present / l1_total * 100) if l1_total else 0
    correctness_pct = round(l2_passed / l2_total * 100) if l2_total else 0
    # Overall severity tracks L1 completeness only; L2 details are advisory.
    severity = (
        "OK" if completeness_pct == 100
        else "LOW" if completeness_pct >= 80
        else "MEDIUM" if completeness_pct >= 50
        else "HIGH"
    )
    summary_text = (
        f"'{doc_title}': {l1_present}/{l1_total} Pflichtangaben vorhanden "
        f"({completeness_pct}%)."
    )
    if completeness_pct < 100:
        summary_text += f" Fehlend: {l1_total - l1_present} Angaben nach {label}."
    if l2_total > 0:
        summary_text += (
            f" Detailpruefung: {l2_passed}/{l2_total} bestanden "
            f"({correctness_pct}%)."
        )
    findings.insert(0, {
        "code": f"DSI-SCORE-{doc_type.upper()}",
        "severity": severity,
        "text": summary_text,
        "doc_title": doc_title, "doc_url": doc_url, "doc_type": doc_type,
        "all_checks": all_checks,
        "completeness_pct": completeness_pct,
        "correctness_pct": correctness_pct,
    })
    return findings
def classify_document_type(title: str, url: str) -> str:
    """Classify a document by its title/URL into a legal document type.

    Keyword rules are evaluated most-specific first (DSFA, social media)
    so generic terms like "datenschutz" don't shadow them.
    """
    haystack = f"{title} {url}".lower()

    def has_any(*keywords: str) -> bool:
        # Substring containment against the combined title+URL.
        return any(kw in haystack for kw in keywords)

    if has_any("datenschutzfolge", "dsfa", "risikoanalyse für nutzung"):
        return "dsfa"
    if has_any("social media", "facebook", "instagram", "linkedin", "fanpage"):
        if has_any("datenschutzerkl", "datenschutz für", "datenschutzinformation"):
            return "social_media"
    if has_any("datenschutz", "privacy", "dsgvo", "data protection", "données"):
        return "dse"
    if has_any("widerruf", "withdrawal", "rétractation", "desistimiento"):
        return "widerruf"
    if has_any("agb", "allgemeine geschäftsbedingungen", "terms",
               "nutzungsbedingungen", "conditions"):
        return "agb"
    if has_any("cookie", "slapuk", "evästeet", "kakor"):
        return "cookie"
    if has_any("impressum", "imprint", "legal notice", "mentions légales"):
        return "impressum"
    return "other"