Files
breakpilot-compliance/backend-compliance/compliance/services/doc_checks/runner.py
T
Benjamin Admin 686834cea0
Build + Deploy / build-ai-sdk (push) Failing after 36s
Build + Deploy / build-developer-portal (push) Successful in 8s
Build + Deploy / build-tts (push) Successful in 7s
Build + Deploy / build-document-crawler (push) Successful in 7s
Build + Deploy / build-admin-compliance (push) Successful in 8s
Build + Deploy / build-backend-compliance (push) Successful in 8s
CI / nodejs-build (push) Successful in 3m14s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Failing after 46s
CI / test-python-backend (push) Successful in 43s
CI / test-python-document-crawler (push) Successful in 29s
CI / test-python-dsms-gateway (push) Successful in 30s
CI / validate-canonical-controls (push) Successful in 16s
Build + Deploy / build-dsms-gateway (push) Successful in 8s
Build + Deploy / build-dsms-node (push) Successful in 8s
CI / branch-name (push) Has been skipped
Build + Deploy / trigger-orca (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 17s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
feat: 4 remaining tasks — EU institutions, banner integration, JS-sites, Caritas fixes
1. EU Institution Checks (Verordnung 2018/1725):
   - New doc_type "eu_institution" with 9 L1 + 15 L2 checks
   - Both German + English patterns (EU institutions are multilingual)
   - Auto-detection via "2018/1725", "EDSB", "EDPS" keywords
   - Correct article references (Art. 15 instead of 13, Art. 5 instead of 6)

2. Banner Check Integration:
   - banner_runner.py maps scan results to 36 L1/L2 structured checks
   - BannerCheckTab shows hierarchical ChecklistView with hints
   - 3-phase summary (cookies/scripts before/after consent)
   - /scan endpoint now includes structured_checks in response

3. JS-heavy Website Fixes (dm, Zalando, HWK):
   - dsi_helpers.py: goto_resilient (networkidle→domcontentloaded fallback)
   - try_dismiss_consent_banner before text extraction
   - PDF redirect detection (dm.de redirects to GCS PDF)

4. Caritas False Positive Fixes:
   - Phone regex allows parentheses: +49 (0)761 → now matches
   - "Recht auf Widerspruch" (3 words) + §23 KDG → matches Art. 21
   - Church authorities: "Katholisches Datenschutzzentrum" recognized

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-08 01:10:10 +02:00

240 lines
9.3 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Document check runner — two-pass L1/L2 logic.
Pass 1: Run all L1 checks ("Is it mentioned?")
Pass 2: Run L2 checks only where their L1 parent passed ("Is it correct?")
"""
import logging
import re
from .dse_checks import ART13_CHECKLIST
from .widerruf_checks import WIDERRUF_CHECKLIST
from .agb_checks import AGB_CHECKLIST
from .impressum_checks import IMPRESSUM_CHECKLIST
from .cookie_checks import COOKIE_CHECKLIST
from .social_media_checks import JOINT_CONTROLLER_CHECKLIST
from .dsfa_checks import DSFA_CHECKLIST
from .eu_institution_checks import EU_INSTITUTION_CHECKLIST
logger = logging.getLogger(__name__)

# doc_type string -> (checklist, legal-basis label used in finding texts).
# Several doc_type spellings share one checklist, so the table is written as
# alias groups and expanded into a flat dict; group order keeps the key order.
_CHECKLIST_MAP = {
    alias: (checklist, label)
    for aliases, checklist, label in (
        (("dse", "datenschutz", "privacy"), ART13_CHECKLIST, "Art. 13 DSGVO"),
        (("widerruf", "withdrawal", "cancellation"), WIDERRUF_CHECKLIST, "§355 BGB"),
        (("agb", "terms", "nutzungsbedingungen"), AGB_CHECKLIST, "§305ff BGB"),
        (("impressum", "imprint"), IMPRESSUM_CHECKLIST, "§5 TMG / §18 MStV"),
        (("cookie",), COOKIE_CHECKLIST, "§25 TDDDG"),
        (("social_media", "joint_controller"), JOINT_CONTROLLER_CHECKLIST, "Art. 26 DSGVO"),
        (("dsfa",), DSFA_CHECKLIST, "Art. 35 DSGVO"),
        (("eu_institution",), EU_INSTITUTION_CHECKLIST, "VO (EU) 2018/1725"),
    )
    for alias in aliases
}
def _match_patterns(patterns: list[str], text_lower: str):
"""Try each regex pattern against text, return first Match or None."""
for p in patterns:
m = re.search(p, text_lower)
if m:
return m
return None
def _extract_context(text_lower: str, match) -> str:
"""Extract ~30 chars around a match for evidence display."""
if not match:
return ""
start = max(0, match.start() - 30)
end = min(len(text_lower), match.end() + 30)
return text_lower[start:end].strip()
def check_document_completeness(
    text: str,
    doc_type: str,
    doc_title: str,
    doc_url: str,
) -> list[dict]:
    """Check a legal document against its type-specific requirements.

    Two-pass approach:
      L1 — Is the mandatory field mentioned at all?
      L2 — Is it correct/complete? (only checked if the L1 parent passed)

    Args:
        text: Extracted document text. ``None``, an empty string, or fewer
            than 50 characters is reported as an empty document.
        doc_type: Key into ``_CHECKLIST_MAP``. Unknown types fall back to the
            Art. 13 DSGVO checklist.
        doc_title: Title echoed into every finding and its message text.
        doc_url: URL echoed into every finding.

    Returns:
        A list of finding dicts:

        * Empty/too-short text: a single ``DSI-EMPTY-<TYPE>`` finding (HIGH).
        * ``doc_type == "dse"`` with fewer than 200 words: a single
          ``DSI-SCORE-<TYPE>`` finding (LOW) with an empty ``all_checks``.
        * Otherwise: a ``DSI-SCORE-<TYPE>`` summary at index 0 carrying
          ``all_checks``, ``completeness_pct`` (share of L1 checks passed)
          and ``correctness_pct`` (share of executed L2 checks passed; 0 when
          none ran), followed by one ``DSI-MISSING-*`` finding per failed L1
          check and one ``DSI-DETAIL-*`` finding per failed L2 check.
    """
    # Attribution fields repeated on every finding.
    doc_meta = {"doc_title": doc_title, "doc_url": doc_url, "doc_type": doc_type}
    type_tag = doc_type.upper()

    # Guard before any string method is called, so that ``None`` is reported
    # as an empty document instead of raising AttributeError.
    if not text or len(text) < 50:
        return [{
            "code": f"DSI-EMPTY-{type_tag}",
            "severity": "HIGH",
            "text": f"Dokument '{doc_title}' ist leer oder zu kurz fuer eine Pruefung.",
            **doc_meta,
        }]

    word_count = len(text.split())
    if word_count < 200 and doc_type == "dse":
        return [{
            "code": f"DSI-SCORE-{type_tag}",
            "severity": "LOW",
            "text": (
                f"'{doc_title}': Kurzhinweis ({word_count} Woerter) — zu kurz fuer "
                f"eine vollstaendige Art. 13 DSGVO Pruefung. Kein eigenstaendiges DSI-Dokument."
            ),
            **doc_meta,
            "all_checks": [],
        }]

    # Strip soft hyphens (U+00AD and the "&shy;" entity) that CMS tools insert
    # for word-breaking — they break regex matches on compound words like
    # "Datenübertragbarkeit".
    text_lower = text.replace("\xad", "").replace("&shy;", "").lower()

    checklist, label = _CHECKLIST_MAP.get(doc_type, (ART13_CHECKLIST, "Art. 13 DSGVO"))
    # Checks without an explicit "level" count as level 1.
    l1_checks = [c for c in checklist if c.get("level", 1) == 1]
    l2_checks = [c for c in checklist if c.get("level", 1) == 2]

    findings: list[dict] = []
    all_checks: list[dict] = []

    # ── Pass 1: L1 checks ────────────────────────────────────────────
    passed_l1_ids: set[str] = set()
    l1_present = 0
    for check in l1_checks:
        match = _match_patterns(check["patterns"], text_lower)
        passed = match is not None
        check_severity = check.get("severity", "MEDIUM")
        if passed:
            passed_l1_ids.add(check["id"])
            l1_present += 1
        else:
            findings.append({
                "code": f"DSI-MISSING-{check['id'].upper()}",
                "severity": check_severity,
                "text": (
                    f"'{doc_title}': Pflichtangabe '{check['label']}' nicht gefunden. "
                    f"Erforderlich nach {label}."
                ),
                **doc_meta,
                "check_id": check["id"],
            })
        all_checks.append({
            "id": check["id"], "label": check["label"],
            "passed": passed, "severity": check_severity,
            "matched_text": _extract_context(text_lower, match),
            "level": 1, "parent": None, "skipped": False,
            "hint": check.get("hint", ""),
        })

    # ── Pass 2: L2 checks (only if parent L1 passed) ─────────────────
    l2_total = 0
    l2_passed = 0
    for check in l2_checks:
        parent = check.get("parent")
        # An L2 check whose parent failed (or that names no parent) is
        # recorded as skipped and produces no finding.
        skipped = parent not in passed_l1_ids
        passed = False
        matched_text = ""
        check_severity = check.get("severity", "MEDIUM")
        if not skipped:
            l2_total += 1
            match = _match_patterns(check["patterns"], text_lower)
            passed = match is not None
            if passed:
                l2_passed += 1
                matched_text = _extract_context(text_lower, match)
            else:
                findings.append({
                    "code": f"DSI-DETAIL-{check['id'].upper()}",
                    "severity": check_severity,
                    "text": (
                        f"'{doc_title}': Detailpruefung '{check['label']}' "
                        f"nicht bestanden. Empfohlen nach {label}."
                    ),
                    **doc_meta,
                    "check_id": check["id"],
                })
        all_checks.append({
            "id": check["id"], "label": check["label"],
            "passed": passed, "severity": check_severity,
            "matched_text": matched_text,
            "level": 2, "parent": parent, "skipped": skipped,
            "hint": check.get("hint", ""),
        })

    # ── Summary ───────────────────────────────────────────────────────
    l1_total = len(l1_checks)
    completeness_pct = round(l1_present / l1_total * 100) if l1_total else 0
    correctness_pct = round(l2_passed / l2_total * 100) if l2_total else 0

    # Overall severity is driven by L1 completeness only.
    if completeness_pct == 100:
        severity = "OK"
    elif completeness_pct >= 80:
        severity = "LOW"
    elif completeness_pct >= 50:
        severity = "MEDIUM"
    else:
        severity = "HIGH"

    summary_text = (
        f"'{doc_title}': {l1_present}/{l1_total} Pflichtangaben vorhanden "
        f"({completeness_pct}%)."
    )
    if completeness_pct < 100:
        summary_text += f" Fehlend: {l1_total - l1_present} Angaben nach {label}."
    if l2_total > 0:
        summary_text += (
            f" Detailpruefung: {l2_passed}/{l2_total} bestanden "
            f"({correctness_pct}%)."
        )

    # The summary always leads the list; individual findings follow.
    findings.insert(0, {
        "code": f"DSI-SCORE-{type_tag}",
        "severity": severity,
        "text": summary_text,
        **doc_meta,
        "all_checks": all_checks,
        "completeness_pct": completeness_pct,
        "correctness_pct": correctness_pct,
    })
    return findings
def classify_document_type(title: str, url: str) -> str:
    """Map a document's title/URL to a legal document type by keyword.

    Title and URL are joined and lowercased, then tested for substring
    keywords in a fixed precedence order; the first rule that hits wins.
    Returns one of "dsfa", "social_media", "eu_institution", "dse",
    "widerruf", "agb", "cookie", "impressum", or "other" when nothing matches.
    """
    haystack = f"{title} {url}".lower()

    def mentions(*keywords: str) -> bool:
        return any(keyword in haystack for keyword in keywords)

    if mentions("datenschutzfolge", "dsfa", "risikoanalyse für nutzung"):
        return "dsfa"

    # A social-media keyword alone is not enough: the page must also look
    # like a privacy notice, otherwise the generic rules below decide.
    is_social = mentions("social media", "facebook", "instagram", "linkedin", "fanpage")
    if is_social and mentions("datenschutzerkl", "datenschutz für", "datenschutzinformation"):
        return "social_media"

    # Ordered from most to least specific. The EU institution rule sits
    # BEFORE generic privacy because 2018/1725 is more specific.
    # NOTE(review): "regulation (eu)" / "verordnung (eu)" would also match
    # citations of other EU regulations (e.g. 2016/679) — confirm intended.
    ordered_rules = (
        ("eu_institution", ("2018/1725", "2018 1725", "regulation (eu)",
                            "verordnung (eu)", "edsb", "edps",
                            "european data protection supervisor")),
        ("dse", ("datenschutz", "privacy", "dsgvo", "data protection", "données")),
        ("widerruf", ("widerruf", "withdrawal", "rétractation", "desistimiento")),
        ("agb", ("agb", "allgemeine geschäftsbedingungen", "terms",
                 "nutzungsbedingungen", "conditions")),
        ("cookie", ("cookie", "slapuk", "evästeet", "kakor")),
        ("impressum", ("impressum", "imprint", "legal notice", "mentions légales")),
    )
    for doc_type, keywords in ordered_rules:
        if mentions(*keywords):
            return doc_type
    return "other"