feat(audit): Text-Paste-Mode pro Row — Crawler optional umgehen
CI / detect-changes (push) Successful in 12s
CI / branch-name (push) Has been skipped
CI / nodejs-build (push) Successful in 3m27s
CI / iace-gt-coverage (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 17s
CI / loc-budget (push) Failing after 20s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go (push) Has been skipped
CI / test-python-backend (push) Successful in 47s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped

Hintergrund: VW liefert ueber URL-Crawler nur 6 Vendors statt der 100+
die in der echten Cookie-Tabelle stehen. Wenn der User die Tabelle aber
direkt von der Site kopieren kann (was bei den meisten OEM-Sites moeglich
ist), umgehen wir den Crawler komplett und parsen den Text deterministisch.

Backend:
* doc_type_classifier.py — 7 Pattern-Gruppen (§5 TMG, Art.13 DSGVO,
  AGB-Klauseln, Widerrufs-Frist, Cookie-Tabellen-Header, etc). Wenn der
  User Text ins falsche Doc-Type-Feld kopiert (Impressum->DSE),
  detect_mismatch liefert detected + action ('reclassify' bei sehr hoher
  Konfidenz, 'warn' bei medium).
* cookies_table_parser.py — Tab/Pipe/Komma/Semicolon-Separator-Auto-
  Detection, Spalten-Mapping per Header-Keyword. Aggregiert Cookie-
  Eintraege zu Vendor-Records (mit _guess_vendor-Fallback). Voll
  deterministisch, kein LLM.
* doc_input_warnings.py — Mail-Block ueber dem Audit, der Mismatches +
  Auto-Reclassifies dem User transparent macht.
* Pipeline: text gewinnt ueber url (war schon im Schema vermerkt), neue
  Felder declared_doc_type / input_source / reclassify_hint in doc_entries.
  Pasted-Tabellen-Vendors haben Vorrang vor Library-Fallback + LLM-Cascade
  (sind 100% genau).

Frontend (DocCheckTab):
* Pro Row Mode-Toggle 'URL' / 'Text einfuegen' (lila wenn aktiv).
* Textarea (h-32, monospace) im text-mode mit kontext-spezifischem
  Placeholder (Cookie-Hinweis ggue. anderen Doc-Types) und Live-
  Zeichen-/Wort-Counter.
* Submit-Button accepted entries mit URL ODER text.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-21 18:58:32 +02:00
parent 7335f64f4f
commit e411c4f0d3
5 changed files with 670 additions and 49 deletions
@@ -323,12 +323,25 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
url_text_cache: dict[str, str] = {}
n_docs = max(1, len(req.documents))
# User-pasted-Tabellen-Vendors (kein LLM noetig) — werden weiter
# unten in cmp_vendors gemerged.
pasted_table_vendors: list[dict] = []
for i, doc in enumerate(req.documents):
pct = int(1 + (i / n_docs) * 29)
_update(check_id, f"Texte laden {i+1}/{n_docs}: {doc.doc_type}...", pct)
text = doc.text
text = (doc.text or "").strip()
input_source = "url"
cmp_payloads: list[dict] = []
if not text and doc.url:
if text:
input_source = "text"
if doc.url:
input_source = "text+url" # User hat beide gefuellt
logger.info(
"doc_type=%s: User hat URL UND Text geliefert — "
"Text gewinnt, URL wird als Quellen-Referenz behalten",
doc.doc_type,
)
elif doc.url:
url_key = doc.url.strip().rstrip("/").lower()
if url_key in url_text_cache:
text = url_text_cache[url_key]
@@ -336,16 +349,62 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
text, cmp_payloads = await _fetch_text(doc.url, doc_type=doc.doc_type)
if text:
url_text_cache[url_key] = text
# Auto-Reclassify-Check: wenn der user Text in das falsche
# Doc-Type-Feld kopiert hat (z.B. Impressum-Text in DSE),
# erkennen und ggf. umtaggen.
actual_doc_type = doc.doc_type
reclassify_hint: dict | None = None
if input_source.startswith("text") and len(text) >= 500:
try:
from compliance.services.doc_type_classifier import (
detect_mismatch,
)
reclassify_hint = detect_mismatch(doc.doc_type, text)
if reclassify_hint and reclassify_hint["action"] == "reclassify":
actual_doc_type = reclassify_hint["detected"]
logger.info(
"doc_type AUTO-RECLASSIFY: deklariert=%s "
"erkannt=%s (score %d vs %d) — uebernehme erkannten Typ",
doc.doc_type, actual_doc_type,
reclassify_hint["detected_score"],
reclassify_hint["declared_score"],
)
except Exception as e:
logger.warning("doc_type_classifier failed: %s", e)
# Cookie-Tabelle: wenn User Tabelle reinkopiert hat, deterministisch
# parsen (kein LLM noetig) und Vendors gleich ableiten.
if input_source.startswith("text") and actual_doc_type == "cookie":
try:
from compliance.services.cookies_table_parser import (
parse_cookie_table,
)
tab_vendors = parse_cookie_table(text)
if tab_vendors:
pasted_table_vendors.extend(tab_vendors)
logger.info(
"Cookie-Tabelle erkannt im pasted Text — "
"%d Vendors / %d Cookies deterministisch geparst",
len(tab_vendors),
sum(len(v.get("cookies", [])) for v in tab_vendors),
)
except Exception as e:
logger.warning("cookies_table_parser failed: %s", e)
if text:
doc_texts[doc.doc_type] = text
doc_texts[actual_doc_type] = text
doc_entries.append({
"doc_type": doc.doc_type,
"url": doc.url,
"text": text,
"word_count": len(text.split()) if text else 0,
"auto_discovered": False,
"doc_type": actual_doc_type,
"declared_doc_type": doc.doc_type,
"url": doc.url,
"text": text,
"word_count": len(text.split()) if text else 0,
"auto_discovered": False,
"discovery_attempted": False,
"cmp_payloads": cmp_payloads,
"cmp_payloads": cmp_payloads,
"input_source": input_source,
"reclassify_hint": reclassify_hint,
})
# Step 1a-bis: AUTO-DISCOVERY. For each canonical doc_type the user
@@ -767,6 +826,25 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
logger.info("P57: added %d new vendors from Phase G (total: %d)",
added, len(cmp_vendors))
# User-pasted Cookie-Tabelle (deterministisch, kein LLM):
# die hat IMMER Vorrang weil 100% genau.
if pasted_table_vendors:
existing = {(v.get("name") or "").strip().lower()
for v in cmp_vendors}
added_p = 0
for v in pasted_table_vendors:
nm = (v.get("name") or "").strip()
if not nm or nm.lower() in existing:
continue
cmp_vendors.append(v)
existing.add(nm.lower())
added_p += 1
if added_p:
logger.info(
"Pasted-Tabellen-Merge: +%d Vendors (total: %d)",
added_p, len(cmp_vendors),
)
# Cookie-Library-Fallback (P52 Lite): wenn weiterhin wenige
# Vendors aber viele after_accept-Cookies, aus Library auflösen.
# VW-Lehre: 6 LLM-Grob-Vendors reichen NICHT — die Library
@@ -1250,6 +1328,19 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
except Exception as e:
logger.warning("P82 GF-1-pager skipped: %s", e)
# Doc-Input-Warnings — wenn User Text ins falsche Feld gepastet hat
input_warn_html = ""
try:
from compliance.services.doc_input_warnings import (
collect_warnings, build_warnings_block_html,
)
warns = collect_warnings(doc_entries)
if warns:
input_warn_html = build_warnings_block_html(warns)
logger.info("doc-input-warnings: %d Mismatches gefunden", len(warns))
except Exception as e:
logger.warning("doc-input-warnings skipped: %s", e)
# P86: Branchen-Benchmark (nur wenn scan_context.industry gesetzt)
bench_html = ""
try:
@@ -1293,7 +1384,7 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
logger.warning("P84 diff-mode skipped: %s", e)
full_html = (
gf_one_pager_html + bench_html + diff_html
gf_one_pager_html + input_warn_html + bench_html + diff_html
+ critical_html + scope_disclaimer_html + exec_summary_html
+ cookie_arch_html + summary_html + scanned_html + profile_html
+ scorecard_html + redundancy_html
@@ -0,0 +1,217 @@
"""
Parst Cookie-Tabellen die der User direkt ins Frontend kopiert.
Typische Quellen:
* Browser-Copy aus VW/BMW/Mercedes Cookie-Richtlinie (Tab-getrennt)
* Excel-Export aus Borlabs / OneTrust / Cookiebot Admin (CSV / Pipe)
* Markdown-Tabelle aus interner Doku
Erkennt 4 Spalten-Layouts (heuristisch):
1. [Name, Kategorie, Beschreibung, Speicherdauer, Provider]
2. [Name, Provider, Zweck, Speicherdauer]
3. [Name, Beschreibung, Speicherdauer]
4. nur [Name, Speicherdauer]
Output: gleiche Vendor-Record-Struktur wie vendor_extractor / LLM —
damit der Rest der Pipeline (VVT-Tabelle, Library-Mismatch-Check) ohne
Aenderung weiterlaeuft.
"""
from __future__ import annotations
import logging
import re
logger = logging.getLogger(__name__)
_CATEGORY_LABELS = (
"notwendig", "essential", "funktional", "tracking", "marketing",
"statistik", "analyse", "analytics", "performance", "werbung",
"advertising", "targeting", "preferences", "social_media",
"strictly necessary", "personalisierung",
)
def _looks_like_separator(line: str) -> str | None:
"""Detect the column-separator of a tabular line."""
if "\t" in line and line.count("\t") >= 2:
return "\t"
if " | " in line and line.count(" | ") >= 2:
return " | "
if ";" in line and line.count(";") >= 2 and "," not in line[:20]:
return ";"
if "," in line and line.count(",") >= 3:
return ","
return None
def _normalize_category(s: str) -> str:
sl = s.lower().strip()
for cat in _CATEGORY_LABELS:
if cat in sl:
if cat in ("notwendig", "essential", "strictly necessary"):
return "essential"
if cat in ("tracking", "marketing", "werbung",
"advertising", "targeting"):
return "marketing"
if cat in ("statistik", "analyse", "analytics", "performance"):
return "statistics"
if cat == "funktional":
return "functional"
if cat == "social_media":
return "social_media"
return sl[:30]
def _parse_persistence(s: str) -> str:
"""Extracts 'Speicherdauer' notation."""
m = re.search(
r"(\d+\s*(sekunde|minute|stunde|tag|woche|monat|jahr|day|month|year)[^\s,;|]{0,5})",
s, re.I,
)
if m:
return m.group(1).strip()[:80]
if re.search(r"\bsession\b", s, re.I):
return "Session"
if re.search(r"permanent", s, re.I):
return "Permanent"
return ""
def parse_cookie_table(text: str) -> list[dict]:
"""Returns vendor-records aus einer copy-pasted Cookie-Tabelle.
Bei nicht-tabellarischem Text: return [].
"""
if not text or len(text) < 100:
return []
lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
if not lines:
return []
# Sample 30 lines to detect separator
sample = lines[:60]
sep_counts: dict[str, int] = {}
for ln in sample:
sep = _looks_like_separator(ln)
if sep:
sep_counts[sep] = sep_counts.get(sep, 0) + 1
if not sep_counts or max(sep_counts.values()) < 3:
return []
sep = max(sep_counts, key=sep_counts.get)
logger.info("cookies_table_parser: detected separator '%s' (%d hits)",
sep, sep_counts[sep])
# Parse rows
rows: list[list[str]] = []
for ln in lines:
if sep in ln:
parts = [p.strip().strip('"') for p in ln.split(sep)]
if len(parts) >= 2 and parts[0]:
rows.append(parts)
if len(rows) < 3:
return []
# Detect column layout from header (first row) or by content
header_row = [c.lower() for c in rows[0]]
has_header = any(h in " ".join(header_row) for h in
("cookie", "name", "anbieter", "provider", "zweck",
"kategorie", "speicherdauer", "dauer"))
data_rows = rows[1:] if has_header else rows
# Map columns by header keyword or by position
col_idx = {"name": 0, "provider": -1, "category": -1,
"purpose": -1, "persistence": -1}
if has_header:
for i, h in enumerate(header_row):
if "name" in h or "cookie" in h:
col_idx["name"] = i
elif "anbieter" in h or "provider" in h or "domain" in h:
col_idx["provider"] = i
elif "kategorie" in h or "type" in h or "art" in h:
col_idx["category"] = i
elif "zweck" in h or "purpose" in h or "beschreib" in h:
col_idx["purpose"] = i
elif "speicher" in h or "dauer" in h or "lebens" in h or "expir" in h:
col_idx["persistence"] = i
# Aggregate by vendor (or by name if no vendor column)
by_vendor: dict[str, dict] = {}
for r in data_rows:
if len(r) < 2:
continue
name = r[col_idx["name"]] if col_idx["name"] < len(r) else r[0]
name = (name or "").strip()
if not name or len(name) > 120 or len(name) < 2:
continue
provider = ""
if col_idx["provider"] >= 0 and col_idx["provider"] < len(r):
provider = r[col_idx["provider"]].strip()
if not provider:
# Heuristik: wenn Spalte 'Anbieter' fehlt, raten aus Cookie-Name
provider = _guess_vendor(name)
if not provider:
provider = "Unbekannter Anbieter"
category = ""
purpose = ""
persistence = ""
if col_idx["category"] >= 0 and col_idx["category"] < len(r):
category = _normalize_category(r[col_idx["category"]])
if col_idx["purpose"] >= 0 and col_idx["purpose"] < len(r):
purpose = r[col_idx["purpose"]][:500]
if col_idx["persistence"] >= 0 and col_idx["persistence"] < len(r):
persistence = _parse_persistence(r[col_idx["persistence"]])
if not category:
# Inferieren aus purpose-Text
category = _normalize_category(purpose)
entry = by_vendor.setdefault(provider, {
"name": provider, "country": "",
"purpose": purpose[:300] if purpose else "",
"category": category,
"opt_out_url": "", "privacy_policy_url": "",
"persistence": persistence,
"cookies": [],
"source": "table_paste",
})
entry["cookies"].append({
"name": name, "purpose": purpose[:200],
"expiry": persistence, "is_third_party": True,
})
out = list(by_vendor.values())
logger.info("cookies_table_parser: %d vendors / %d cookies parsed",
len(out), sum(len(v["cookies"]) for v in out))
return out
_VENDOR_GUESS = (
("_ga", "Google"), ("_gid", "Google"), ("_gcl_", "Google"),
("ANID", "Google"), ("AID", "Google"), ("FPGCLDC", "Google"),
("IDE", "Google DoubleClick"), ("DSID", "Google"),
("_fbp", "Meta / Facebook"), ("fr", "Meta / Facebook"),
("_pin_unauth", "Pinterest"), ("_uetsid", "Microsoft Bing"),
("_uetvid", "Microsoft Bing"), ("MUID", "Microsoft"),
("tt_", "TikTok"), ("li_at", "LinkedIn"),
("OptanonConsent", "OneTrust"), ("cookieconsent", "Borlabs / Cookie-CMP"),
("eta_", "etracker"), ("matomo", "Matomo"),
("_hjid", "Hotjar"), ("_hj", "Hotjar"),
("__cf", "Cloudflare"), ("datadome", "DataDome"),
("incap_", "Imperva Incapsula"),
("ajs_", "Segment"), ("amp_", "Amplitude"),
("sat_track", "Adobe Experience Cloud"),
("AMCV_", "Adobe Experience Cloud"),
("s_cc", "Adobe Analytics"), ("s_sq", "Adobe Analytics"),
)
def _guess_vendor(cookie_name: str) -> str:
nl = cookie_name.lower()
for prefix, vendor in _VENDOR_GUESS:
if nl.startswith(prefix.lower()) or prefix.lower() in nl:
return vendor
return ""
@@ -0,0 +1,99 @@
"""
Rendert die Doc-Type-Mismatch-Hinweise als Mail-Block.
Wenn der User Text in das falsche Feld kopiert (z.B. Impressum-Text
ins DSE-Feld), zeigt der Block:
- was er deklariert hat
- was der Classifier erkannt hat
- Empfehlung (re-paste oder als unbekannt einreichen)
"""
from __future__ import annotations
import logging
from typing import Iterable
logger = logging.getLogger(__name__)
_DOC_LABELS = {
"dse": "Datenschutzerklaerung",
"cookie": "Cookie-Richtlinie",
"impressum": "Impressum",
"agb": "AGB",
"widerruf": "Widerrufsbelehrung",
"nutzungsbedingungen": "Nutzungsbedingungen",
"social_media": "Social Media DSE",
"dsfa": "DSFA",
"dsa": "DSA-Pflichtangaben",
"legal_notice": "Rechtliche Hinweise",
"lizenzhinweise": "Lizenzhinweise",
}
def _label(dt: str) -> str:
return _DOC_LABELS.get(dt, dt)
def collect_warnings(doc_entries: Iterable[dict]) -> list[dict]:
"""Returns list of {declared, detected, action, scores} fuer alle
doc_entries mit einem reclassify_hint."""
out: list[dict] = []
for e in (doc_entries or []):
hint = e.get("reclassify_hint")
if not hint:
continue
out.append({
"input_source": e.get("input_source"),
"declared": hint.get("declared"),
"detected": hint.get("detected"),
"action": hint.get("action"),
"declared_score": hint.get("declared_score", 0),
"detected_score": hint.get("detected_score", 0),
"all_scores": hint.get("all_scores") or {},
"word_count": e.get("word_count", 0),
})
return out
def build_warnings_block_html(warnings: list[dict]) -> str:
if not warnings:
return ""
items: list[str] = []
for w in warnings:
action = w.get("action")
if action == "reclassify":
color = "#0e7490"
badge = "AUTO-RECLASSIFIZIERT"
body = (
f'Sie haben den Text als <strong>{_label(w["declared"])}</strong> '
f'eingereicht, das System hat ihn aber automatisch als '
f'<strong>{_label(w["detected"])}</strong> erkannt und entsprechend '
f'gepruft (Konfidenz-Score: {w["detected_score"]} vs '
f'{w["declared_score"]} für die deklarierte Kategorie).'
)
else:
color = "#d97706"
badge = "MOEGLICHER MISMATCH"
body = (
f'Sie haben den Text als <strong>{_label(w["declared"])}</strong> '
f'eingereicht. Der Inhalt enthaelt aber Patterns die eher zu '
f'<strong>{_label(w["detected"])}</strong> passen '
f'({w["detected_score"]} vs {w["declared_score"]}). '
'Bitte pruefen Sie ob Sie den richtigen Doc-Typ ausgewaehlt haben.'
)
items.append(
f'<li style="margin-bottom:8px;font-size:11px;line-height:1.5">'
f'<strong style="color:{color}">[{badge}]</strong> {body}'
f'</li>'
)
return (
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
'max-width:760px;margin:0 auto 12px;padding:10px 14px;'
'background:#ecfeff;border:1px solid #67e8f9;border-radius:6px">'
'<div style="font-size:11px;color:#0e7490;text-transform:uppercase;'
'letter-spacing:1.2px;margin-bottom:4px;font-weight:600">'
'Hinweise zum eingefügten Text</div>'
'<ul style="margin:4px 0 0 18px;padding:0">'
+ "".join(items) +
'</ul></div>'
)
@@ -0,0 +1,162 @@
"""
Erkennt den wahrscheinlichen Doc-Type eines eingefuegten Textes.
Wird genutzt wenn der User Text direkt ins Frontend kopiert. Wenn der
erkannte Typ vom user-deklarierten Typ abweicht, gibt das System einen
Hinweis (oder reklassifiziert automatisch wenn Confidence hoch genug).
Heuristik basiert auf Pflichtangaben-Patterns:
* Impressum: §5 TMG-Bestandteile (Anschrift + Telefon + Email + UStID)
* DSE: Art. 13 DSGVO-Bestandteile (Verantwortlicher + Zweck + Rechtsgrund)
* AGB: Vertragsschluss + Lieferung + Zahlung + Gerichtsstand
* Widerruf: 14-Tage-Frist + Widerrufsformular + Wertersatz
* Cookie-Richtlinie: Cookie-Tabelle / Speicherdauer / Drittanbieter
* Nutzungsbedingungen: Lizenz + Verbot der Vervielfaeltigung + Account
"""
from __future__ import annotations
import logging
import re
logger = logging.getLogger(__name__)
_PATTERNS: dict[str, list[tuple[re.Pattern, int]]] = {
"impressum": [
(re.compile(r"§\s*5\s+TMG", re.I), 4),
(re.compile(r"angaben\s+gem(ä|ae)ß", re.I), 3),
(re.compile(r"\bUSt[\-\s]?ID[\-\s]?Nr\b", re.I), 4),
(re.compile(r"vertretungsberechtigt(e|er)", re.I), 3),
(re.compile(r"registergericht", re.I), 3),
(re.compile(r"handelsregister(nummer)?", re.I), 3),
(re.compile(r"\bHRB\s+\d+", re.I), 3),
(re.compile(r"verantwortlich\s+f(ü|ue)r\s+den\s+inhalt", re.I), 3),
(re.compile(r"\bRStV\b|Rundfunkstaatsvertrag", re.I), 3),
(re.compile(r"streitschlichtung", re.I), 2),
(re.compile(r"OS[\-\s]?plattform", re.I), 2),
],
"dse": [
(re.compile(r"art(ikel)?\.?\s*13\s+DSGVO", re.I), 5),
(re.compile(r"art(ikel)?\.?\s*15\s+DSGVO", re.I), 4),
(re.compile(r"rechtsgrundlage", re.I), 3),
(re.compile(r"datenschutzbeauftragt", re.I), 4),
(re.compile(r"berechtigtes\s+interesse", re.I), 3),
(re.compile(r"betroffenenrechte", re.I), 3),
(re.compile(r"aufsichtsbeh(ö|oe)rde", re.I), 3),
(re.compile(r"speicherdauer|aufbewahrungsfrist", re.I), 2),
(re.compile(r"datenkategorie", re.I), 2),
(re.compile(r"verantwortliche(r|n)\s+im\s+sinne", re.I), 4),
],
"agb": [
(re.compile(r"allgemeine\s+gesch(ä|ae)ftsbedingungen", re.I), 5),
(re.compile(r"\bAGB\b", re.I), 3),
(re.compile(r"vertragsschluss|vertragsabschluss", re.I), 3),
(re.compile(r"liefer(bedingungen|zeit|kosten)", re.I), 2),
(re.compile(r"gew(ä|ae)hrleistung", re.I), 2),
(re.compile(r"haftungsausschluss", re.I), 2),
(re.compile(r"gerichtsstand", re.I), 3),
(re.compile(r"anwendbares\s+recht", re.I), 2),
(re.compile(r"salvatorische\s+klausel", re.I), 2),
],
"widerruf": [
(re.compile(r"widerrufsbelehrung", re.I), 5),
(re.compile(r"14\s+tage", re.I), 3),
(re.compile(r"widerrufsrecht", re.I), 4),
(re.compile(r"widerrufsformular", re.I), 3),
(re.compile(r"wertersatz", re.I), 3),
(re.compile(r"r(ü|ue)cksende(kosten|gebuehr)", re.I), 3),
(re.compile(r"muster[\-\s]?widerrufsformular", re.I), 4),
],
"cookie": [
(re.compile(r"cookie[\-\s]?richtlinie", re.I), 4),
(re.compile(r"cookie[\-\s]?policy", re.I), 4),
(re.compile(r"tracking[\-\s]?cookies?", re.I), 3),
(re.compile(r"funktionale\s+cookies?", re.I), 3),
(re.compile(r"marketing[\-\s]?cookies?", re.I), 3),
(re.compile(r"speicherdauer\s*\d+\s*(tag|monat|jahr)", re.I), 3),
(re.compile(r"drittanbieter[\-\s]?cookies?", re.I), 3),
(re.compile(r"\b(IDE|_ga|_gid|_fbp|_gcl_au|OptanonConsent)\b"), 3),
(re.compile(r"opt[\-\s]?out", re.I), 2),
],
"nutzungsbedingungen": [
(re.compile(r"nutzungsbedingungen", re.I), 5),
(re.compile(r"terms\s+of\s+(use|service)", re.I), 4),
(re.compile(r"benutzerkonto|nutzerkonto", re.I), 3),
(re.compile(r"untersagt|unzul(ä|ae)ssig.{0,30}nutzung", re.I), 2),
(re.compile(r"sperrung\s+des\s+kontos", re.I), 2),
],
"social_media": [
(re.compile(r"social[\-\s]?media[\-\s]?(plug[\-\s]?ins?|kanale|kanaele|pr(ä|ae)senz)", re.I), 4),
(re.compile(r"gemeinsam\s+verantwortlich.{0,100}(facebook|meta|instagram)", re.I), 4),
(re.compile(r"fanpage|fan[\-\s]?page", re.I), 3),
(re.compile(r"like[\-\s]?button|share[\-\s]?button", re.I), 2),
],
}
def classify(text: str, top_n: int = 3) -> list[tuple[str, int]]:
"""Returns list of (doc_type, score) sorted by score desc.
Score >= 6 = high confidence, 3-5 = medium, < 3 = low.
"""
if not text or len(text) < 200:
return []
scores: dict[str, int] = {}
for dt, pats in _PATTERNS.items():
s = 0
for pat, weight in pats:
if pat.search(text):
s += weight
if s > 0:
scores[dt] = s
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return ranked[:top_n]
def best_match(text: str) -> tuple[str, int] | None:
"""Returns (doc_type, score) of best match or None."""
ranked = classify(text, top_n=1)
return ranked[0] if ranked else None
def detect_mismatch(
declared_doc_type: str,
text: str,
min_confidence: int = 6,
) -> dict | None:
"""If the text scores higher for a different doc_type than declared,
return a hint dict {detected, declared, scores, action}.
action='reclassify' if confidence is very high (>= min_confidence * 1.5)
action='warn' if medium (>= min_confidence)
action=None / no return otherwise.
"""
ranked = classify(text, top_n=3)
if not ranked:
return None
detected, detected_score = ranked[0]
declared_canon = (declared_doc_type or "").lower().strip()
# Aliase normalisieren
alias = {"datenschutz": "dse", "privacy": "dse",
"terms": "nutzungsbedingungen",
"terms_of_use": "nutzungsbedingungen"}
declared_canon = alias.get(declared_canon, declared_canon)
if detected == declared_canon:
return None
if detected_score < min_confidence:
return None
declared_score = next((s for dt, s in ranked if dt == declared_canon), 0)
# Nur wenn detected DEUTLICH besser ist (Faktor >= 2 oder declared = 0)
if declared_score and detected_score < declared_score * 2:
return None
action = "reclassify" if detected_score >= min_confidence * 1.5 else "warn"
return {
"detected": detected,
"declared": declared_doc_type,
"detected_score": detected_score,
"declared_score": declared_score,
"action": action,
"all_scores": dict(ranked),
}