Files
breakpilot-compliance/backend-compliance/compliance/services/doc_checks/runner.py
T
Benjamin Admin 0d37822b7c
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / detect-changes (push) Successful in 10s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 16s
CI / loc-budget (push) Failing after 16s
CI / go-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 37s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
fix(impressum): P9 — 7 False-Positive-Fixes in Pflichtangaben-Checks
#1 Name des Anbieters: \b Word-Boundary verhindert "ag" in "samstag",
   plus "aktiengesellschaft" als Volltreffer.
#2 Vertretungsberechtigte: Klammer-Liste-Pattern erkennt jetzt BMW-
   Format "Vorstand (Milan Nedeljkovic, Jochen Goller, ...)" plus
   "Vorsitzender des Aufsichtsrats: Name".
#3 V.i.S.d.P.: war schon INFO, OK.
#4 OS-Plattform/VSBG: bei no_direct_sales=True (OEM-Pattern) jetzt als
   "Nicht anwendbar" skipped statt 0/1 fail. Profile fliesst neu durch
   check_document_completeness -> runner.
#5 Zustaendige Kammer: IHK + Handwerkskammer + Tieraerztekammer in
   Pattern aufgenommen + severity LOW -> INFO (konditional).
#6 Stammkapital: war schon INFO, OK.
#7 Link-Disclaimer: neue Check-Eigenschaft "invert"=True. Anti-Pattern
   ist passed wenn NICHT gefunden, fail wenn gefunden. Vorher feuerte
   das Finding immer, jetzt nur wenn ein illegaler Disclaimer im Text
   ist.

Plus: L2-INFO-Checks (z.B. profession_chamber) zaehlen nicht mehr in
correctness-pct und erzeugen keine DSI-DETAIL-Findings. Konsistent
mit P8-Modell: INFO = "selbst pruefen", nicht "fail".

Verifiziert mit BMW-Impressum-Text — alle 7 Faelle korrekt klassifiziert:
  name=passed, representative_person=passed, profession_chamber=INFO,
  illegal_disclaimer=passed (kein Disclaimer im Text),
  dispute_resolution=skipped (no_direct_sales),
  editorial_visdp=INFO, share_capital=INFO.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 00:52:03 +02:00

304 lines
13 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Document check runner — two-pass L1/L2 logic.
Pass 1: Run all L1 checks ("Is it mentioned?")
Pass 2: Run L2 checks only where their L1 parent passed ("Is it correct?")
"""
from __future__ import annotations
import logging
import re
from .dse_checks import ART13_CHECKLIST
from .widerruf_checks import WIDERRUF_CHECKLIST
from .agb_checks import AGB_CHECKLIST
from .impressum_checks import IMPRESSUM_CHECKLIST
from .cookie_checks import COOKIE_CHECKLIST
from .social_media_checks import JOINT_CONTROLLER_CHECKLIST
from .dsfa_checks import DSFA_CHECKLIST
from .eu_institution_checks import EU_INSTITUTION_CHECKLIST
from .avv_checks import AVV_CHECKLIST
from .scc_checks import SCC_CHECKLIST
from .tom_annex_checks import TOM_ANNEX_CHECKLIST
from .sub_processor_checks import SUB_PROCESSOR_LIST_CHECKLIST
from .loeschkonzept_checks import LOESCHKONZEPT_CHECKLIST
logger = logging.getLogger(__name__)

# Lookup table: doc_type string -> (checklist, label).
# Built from alias groups; every alias in a group resolves to the same
# checklist and legal-basis label. Group order and alias order determine
# the key order of the resulting dict.
_CHECKLIST_MAP = {
    alias: (checklist, label)
    for aliases, checklist, label in (
        (("dse", "datenschutz", "privacy"),
         ART13_CHECKLIST, "Art. 13 DSGVO"),
        (("widerruf", "withdrawal", "cancellation"),
         WIDERRUF_CHECKLIST, "§355 BGB"),
        (("agb", "terms", "nutzungsbedingungen"),
         AGB_CHECKLIST, "§305ff BGB"),
        (("impressum", "imprint"),
         IMPRESSUM_CHECKLIST, "§5 TMG / §18 MStV"),
        (("cookie",),
         COOKIE_CHECKLIST, "§25 TDDDG"),
        (("social_media", "joint_controller"),
         JOINT_CONTROLLER_CHECKLIST, "Art. 26 DSGVO"),
        (("dsfa",),
         DSFA_CHECKLIST, "Art. 35 DSGVO"),
        (("eu_institution",),
         EU_INSTITUTION_CHECKLIST, "VO (EU) 2018/1725"),
        (("avv", "auftragsverarbeitung", "dpa"),
         AVV_CHECKLIST, "Art. 28 DSGVO"),
        (("scc", "standardvertragsklauseln"),
         SCC_CHECKLIST, "EU SCC 2021"),
        (("tom_annex", "tom_anlage", "tom"),
         TOM_ANNEX_CHECKLIST, "Art. 32 DSGVO"),
        (("sub_processor_list", "sub_processor", "unterauftragnehmer"),
         SUB_PROCESSOR_LIST_CHECKLIST, "Art. 28(3)(d) DSGVO"),
        (("loeschkonzept", "loeschung", "loeschfristen", "deletion_concept"),
         LOESCHKONZEPT_CHECKLIST, "Art. 5(1)(e) DSGVO / DIN 66398"),
    )
    for alias in aliases
}


def _match_patterns(patterns: list[str], text_lower: str):
    """Return the Match of the first pattern that hits, or None.

    Patterns are tried in list order with ``re.search``; evaluation is
    lazy, so patterns after the first hit are never searched.
    """
    hits = (re.search(pattern, text_lower) for pattern in patterns)
    return next((hit for hit in hits if hit), None)


def _extract_context(text_lower: str, match) -> str:
    """Return the matched span plus up to 30 characters on either side.

    Used as evidence text for a check. A falsy ``match`` yields an empty
    string; the returned snippet is stripped of surrounding whitespace.
    """
    if not match:
        return ""
    padding = 30
    left = max(match.start() - padding, 0)
    # An end index beyond the string length is clamped by slicing.
    right = match.end() + padding
    return text_lower[left:right].strip()


def check_document_completeness(
text: str,
doc_type: str,
doc_title: str,
doc_url: str,
business_profile: dict | None = None,
) -> list[dict]:
"""Check a legal document against its type-specific requirements.
Two-pass approach:
L1 — Is the mandatory field mentioned at all?
L2 — Is it correct/complete? (only checked if L1 parent passed)
business_profile (optional) wird genutzt um Checks die fuer das
spezifische Unternehmen nicht anwendbar sind als 'skipped' zu
markieren (z.B. OS-Plattform/VSBG bei no_direct_sales=True).
Returns a list of findings (summary + missing items).
"""
findings = []
no_direct_sales = bool((business_profile or {}).get("no_direct_sales"))
# P9: Welche Check-IDs sind bei OEM-Konfigurator-Pattern obsolet.
skip_check_ids: set[str] = set()
if no_direct_sales:
skip_check_ids.update([
"dispute_resolution", # OS-Plattform / VSBG nur B2C-Direkthaendler
])
# Strip soft hyphens (­ / \xad) that CMS tools insert for word-breaking
# — they break regex matches on compound words like "Datenübertragbarkeit"
text_clean = text.replace("\xad", "").replace("&shy;", "")
text_lower = text_clean.lower()
if not text or len(text) < 50:
findings.append({
"code": f"DSI-EMPTY-{doc_type.upper()}",
"severity": "HIGH",
"text": f"Dokument '{doc_title}' ist leer oder zu kurz fuer eine Pruefung.",
"doc_title": doc_title, "doc_url": doc_url, "doc_type": doc_type,
})
return findings
word_count = len(text.split())
if word_count < 200 and doc_type == "dse":
findings.append({
"code": f"DSI-SCORE-{doc_type.upper()}",
"severity": "LOW",
"text": (
f"'{doc_title}': Kurzhinweis ({word_count} Woerter) — zu kurz fuer "
f"eine vollstaendige Art. 13 DSGVO Pruefung. Kein eigenstaendiges DSI-Dokument."
),
"doc_title": doc_title, "doc_url": doc_url, "doc_type": doc_type,
"all_checks": [],
})
return findings
entry = _CHECKLIST_MAP.get(doc_type, (ART13_CHECKLIST, "Art. 13 DSGVO"))
checklist, label = entry
l1_checks = [c for c in checklist if c.get("level", 1) == 1]
l2_checks = [c for c in checklist if c.get("level", 1) == 2]
# ── Pass 1: L1 checks ────────────────────────────────────────────
passed_l1_ids: set[str] = set()
all_checks: list[dict] = []
l1_present = 0
l1_scoreable = 0 # Exclude INFO checks from score
for check in l1_checks:
is_info = check.get("severity") == "INFO"
# P9: Profil-basiertes Skip (OEM-Pattern -> OS-Plattform raus)
if check["id"] in skip_check_ids:
all_checks.append({
"id": check["id"], "label": check["label"],
"passed": False, "severity": "INFO",
"matched_text": "", "level": 1, "parent": None,
"skipped": True,
"hint": "Nicht anwendbar: Unternehmen betreibt keinen "
"Direkt-Vertrieb an Verbraucher (OEM-Konfigurator-Pattern).",
})
continue
match = _match_patterns(check["patterns"], text_lower)
# P9: "invert"=True bedeutet Anti-Pattern (z.B. illegaler Link-
# Disclaimer): passed wenn NICHT gefunden, fail wenn gefunden.
if check.get("invert"):
passed = match is None
match = None if passed else match
else:
passed = match is not None
if passed:
passed_l1_ids.add(check["id"])
if not is_info:
l1_present += 1
if not is_info:
l1_scoreable += 1
if not passed and not is_info:
findings.append({
"code": f"DSI-MISSING-{check['id'].upper()}",
"severity": check.get("severity", "MEDIUM"),
"text": (
f"'{doc_title}': Pflichtangabe '{check['label']}' nicht gefunden. "
f"Erforderlich nach {label}."
),
"doc_title": doc_title, "doc_url": doc_url,
"doc_type": doc_type, "check_id": check["id"],
})
all_checks.append({
"id": check["id"], "label": check["label"],
"passed": passed, "severity": check.get("severity", "MEDIUM"),
"matched_text": _extract_context(text_lower, match),
"level": 1, "parent": None, "skipped": False,
"hint": check.get("hint", ""),
})
# ── Pass 2: L2 checks (only if parent L1 passed) ─────────────────
l2_total = 0
l2_passed = 0
for check in l2_checks:
parent = check.get("parent")
is_info = check.get("severity") == "INFO"
skipped = parent not in passed_l1_ids
passed = False
matched_text = ""
if not skipped:
match = _match_patterns(check["patterns"], text_lower)
passed = match is not None
# P9: INFO-L2-Checks (konditional, z.B. Kammer) zaehlen NICHT
# in correctness-pct und erscheinen nicht als Fail-Finding.
if is_info:
if passed:
matched_text = _extract_context(text_lower, match)
# weder l2_total++ noch findings.append: kein Fail-Eintrag
else:
l2_total += 1
if passed and not is_info:
l2_passed += 1
matched_text = _extract_context(text_lower, match)
elif not passed and not is_info:
findings.append({
"code": f"DSI-DETAIL-{check['id'].upper()}",
"severity": check.get("severity", "MEDIUM"),
"text": (
f"'{doc_title}': Detailpruefung '{check['label']}' "
f"nicht bestanden. Empfohlen nach {label}."
),
"doc_title": doc_title, "doc_url": doc_url,
"doc_type": doc_type, "check_id": check["id"],
})
all_checks.append({
"id": check["id"], "label": check["label"],
"passed": passed, "severity": check.get("severity", "MEDIUM"),
"matched_text": matched_text,
"level": 2, "parent": parent, "skipped": skipped,
"hint": check.get("hint", ""),
})
# ── Summary ───────────────────────────────────────────────────────
l1_total = l1_scoreable # Exclude INFO checks from percentage
completeness_pct = round(l1_present / l1_total * 100) if l1_total else 0
correctness_pct = round(l2_passed / l2_total * 100) if l2_total else 0
severity = (
"OK" if completeness_pct == 100
else "LOW" if completeness_pct >= 80
else "MEDIUM" if completeness_pct >= 50
else "HIGH"
)
summary_text = (
f"'{doc_title}': {l1_present}/{l1_total} Pflichtangaben vorhanden "
f"({completeness_pct}%)."
)
if completeness_pct < 100:
summary_text += f" Fehlend: {l1_total - l1_present} Angaben nach {label}."
if l2_total > 0:
summary_text += (
f" Detailpruefung: {l2_passed}/{l2_total} bestanden "
f"({correctness_pct}%)."
)
findings.insert(0, {
"code": f"DSI-SCORE-{doc_type.upper()}",
"severity": severity,
"text": summary_text,
"doc_title": doc_title, "doc_url": doc_url, "doc_type": doc_type,
"all_checks": all_checks,
"completeness_pct": completeness_pct,
"correctness_pct": correctness_pct,
})
return findings
def classify_document_type(title: str, url: str) -> str:
    """Map a document's title/URL to a legal document type string.

    Title and URL are joined and lowercased, then tested for keyword
    substrings. Rules are evaluated in a fixed order and the first hit
    wins; ``"other"`` is returned when nothing matches.
    """
    haystack = f"{title} {url}".lower()

    def mentions(*keywords: str) -> bool:
        return any(keyword in haystack for keyword in keywords)

    if mentions("datenschutzfolge", "dsfa", "risikoanalyse für nutzung"):
        return "dsfa"

    # Social media only counts when the text is also about privacy;
    # otherwise evaluation continues with the remaining rules.
    if mentions("social media", "facebook", "instagram", "linkedin", "fanpage") and mentions(
        "datenschutzerkl", "datenschutz für", "datenschutzinformation"
    ):
        return "social_media"

    # Order matters: the EU institution rule sits before the generic
    # privacy rule because 2018/1725 is the more specific match.
    ordered_rules = (
        ("eu_institution", ("2018/1725", "2018 1725", "regulation (eu)",
                            "verordnung (eu)", "edsb", "edps",
                            "european data protection supervisor")),
        ("dse", ("datenschutz", "privacy", "dsgvo", "data protection", "données")),
        ("widerruf", ("widerruf", "withdrawal", "rétractation", "desistimiento")),
        ("agb", ("agb", "allgemeine geschäftsbedingungen", "terms",
                 "nutzungsbedingungen", "conditions")),
        ("cookie", ("cookie", "slapuk", "evästeet", "kakor")),
        ("impressum", ("impressum", "imprint", "legal notice", "mentions légales")),
    )
    for doc_type, keywords in ordered_rules:
        if mentions(*keywords):
            return doc_type
    return "other"