57c0f940a2
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / nodejs-build (push) Successful in 2m19s
CI / test-go (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 16s
CI / loc-budget (push) Failing after 15s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 37s
P56 Anti-Auditing-Detection als constructive Compliance-Finding (Audit-API-
Empfehlung statt Anklage, weil Mercedes berechtigt Bots blockiert)
P57 Phase G vendor_details Union mit cmp_vendors -> 42 Anbieter sichtbar
P58 Anti-Audit-Detection robuster (Script-Domain-Check + Settings-spezifisch)
P59 Cookie-Behavior-Validator (4 Layer, 3-Tier-Severity: MEDIUM=Kategorie-
Mismatch / HIGH=Zweck-Mismatch / CRITICAL=beide=Vorsatz-Indiz)
+ Open Cookie Database (CC0) als Library-Seed (2264 Cookies)
P59b Cookie-Behavior in Banner-Check verdrahtet + Mail-Block (BUGFIX:
SessionLocal selbst oeffnen, db war im Background-Task nicht im Scope)
Mail-Polish nach Mercedes-Review:
P63 Banner-Footer-Links auch im wb7-link/role=link erkennen (Shadow-DOM-
Walker label-based statt nur <a href>)
P64 Re-Access-Severity: MEDIUM statt HIGH, wenn Footer "Einstellungen" oder
Mercedes-typisch existiert; OEM-Footer-Detection (wb7-footer)
P65 Text-Truncation: Word-Boundary statt Zeichen-Cut (kein "einfa"-Bruch
mehr in Sofortmassnahmen)
P66 GF-Aktionen: Service-Zweck vs Cookie-Zweck explizit erklaert
(haeufige Verwechslung Marketing/GF: "Akamai-Beschreibung" != Cookie-
Zweck pro DSK-OH 2024)
P67 Stirring-Finding mit "Verlust-Framing"-Erklaerung + Alt-vs-Neutral-
Beispiel, statt nur EDPB-Fachbegriff
Compliance-Advisor FAQ (admin agent-core/soul):
+ CNIL/EDPB Top-Bussgelder (Google 100M, Meta 60M, Amazon 35M)
+ Deutsche Praezedenz (LG Muenchen Google Fonts, EuGH Planet49, BGH I ZR 7/16)
+ 4 Risiko-Pfade (Bussgeld/Abmahnung/Sammelklage/NOYB) + Berechnungs-Methodik
Document-Generator Templates: AGB-DE (142), Impressum (140), Widerrufs-
formular-Anlage (143), DSR-Process-Dedup (139), Cookie-Library (144).
Architektur: doc_action_mappings.py + banner_dom_walkers.py +
cookie_behavior_validator.py + vendor_detail_extractor.py rausgezogen,
um die 500-LOC-Caps in agent_doc_check_report.py und
banner_text_checker.py einzuhalten.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
291 lines
11 KiB
Python
291 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
P39 — Template-Audit: prueft alle Legal-Templates aus der DB gegen
unsere eigenen Pflichtangaben-Checks (doc_checks/*).

Verwendet check_document_completeness — die gleiche Funktion, die auch
externe Sites pruefen wuerde. Reports als Markdown.

Run inside the bp-compliance-backend container:
    docker exec bp-compliance-backend python /app/scripts/audit_template_completeness.py
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime, timezone
|
|
from typing import Iterable
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
|
|
# Add compliance package to path if running outside container
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from compliance.services.doc_checks.runner import check_document_completeness # noqa: E402
|
|
|
|
# Maps a template's document_type (DB column) to the doc_type key used by
# the checker. Only types for which a checklist exists are listed; all
# other types fall back to LLM-only and are skipped by this audit.
_DSE_TEMPLATE_TYPES = (
    "privacy_policy",
    "data_protection_policy",
    "applicant_dsi",
    "employee_dsi",
    "social_media_dsi",
    "video_conference_dsi",
    "informationspflichten",
)

TEMPLATE_TO_DOCTYPE = {
    # Every privacy-notice flavour is audited with the same "dse" checklist.
    **dict.fromkeys(_DSE_TEMPLATE_TYPES, "dse"),
    "cookie_policy": "cookie",
    "agb": "agb",
    "widerruf": "widerruf",
    "dpa": "avv",
    "dsfa": "dsfa",
    "tom_documentation": "tom_annex",
    "loeschkonzept": "loeschkonzept",
}
|
|
|
|
# Plausible demo values for the most common template placeholders. Filling
# them in gives the regex-based mandatory-field checks concrete text to
# match; with bare {{X}} markers left in place those checks would all fail.
DEMO_PLACEHOLDERS: dict[str, str] = dict(
    # Company master data
    company_name="Demo GmbH",
    company_legal_name="Demo GmbH",
    company_address="Musterstraße 1, 12345 Berlin",
    company_city="Berlin",
    company_postal="12345",
    company_country="Deutschland",
    company_email="datenschutz@demo.de",
    company_phone="+49 30 12345678",
    # Data protection officer
    dpo_name="Max Mustermann",
    dpo_email="dsb@demo.de",
    dpo_phone="+49 30 87654321",
    # Imprint / commercial register details
    managing_director="Erika Mustermann",
    register_court="Amtsgericht Berlin",
    register_number="HRB 123456",
    vat_id="DE123456789",
    # Supervisory authority
    supervisory_authority="Berliner Beauftragte für Datenschutz",
    supervisory_address="Friedrichstr. 219, 10969 Berlin",
    # Retention and third-country transfer
    retention_period="10 Jahre nach Vertragsende",
    third_country="USA",
    transfer_mechanism="EU-Standardvertragsklauseln",
    # Document metadata
    date="2026-05-20",
    version="1.0",
)
|
|
|
|
|
|
def render_placeholders(content: str) -> str:
    """Substitute ``{{key}}`` placeholders in *content* with demo values.

    The key is trimmed and lower-cased, and hyphens are mapped to
    underscores before it is looked up in DEMO_PLACEHOLDERS. A placeholder
    without a demo value is not removed: it becomes ``[key]`` (trimmed,
    lower-cased key in square brackets) so the surrounding sentence keeps
    a readable hint.
    """
    # Anything between double braces that itself contains no brace,
    # surrounding whitespace excluded from the captured key.
    placeholder = re.compile(r"\{\{\s*([^{}]+?)\s*\}\}")

    def _substitute(match: re.Match) -> str:
        name = match.group(1).strip().lower()
        try:
            # Hyphenated and underscored spellings share one demo entry.
            return DEMO_PLACEHOLDERS[name.replace("-", "_")]
        except KeyError:
            return f"[{name}]"

    return placeholder.sub(_substitute, content)
|
|
|
|
|
|
def strip_handlebars_blocks(content: str) -> str:
    """Remove conditional block markers while keeping the enclosed text.

    Deletes ``{{#IF ...}}``, ``{{/IF}}``, ``{{#UNLESS ...}}``,
    ``{{/UNLESS}}`` and ``{{else}}`` markers (matched case-sensitively).
    The text of every branch stays in place, because the audit only asks
    whether mandatory text appears anywhere, not which branch is active.
    """
    marker_patterns = (
        r"\{\{#IF[^}]*\}\}",
        r"\{\{/IF\}\}",
        r"\{\{#UNLESS[^}]*\}\}",
        r"\{\{/UNLESS\}\}",
        r"\{\{else\}\}",
    )
    stripped = content
    # Applied one after another, in this order, on the running result.
    for pattern in marker_patterns:
        stripped = re.sub(pattern, "", stripped)
    return stripped
|
|
|
|
|
|
def fetch_templates(conn) -> list[dict]:
    """Load all published legal templates from the database.

    Args:
        conn: An open psycopg2 connection.

    Returns:
        One dict-like row per template with the keys ``id``,
        ``document_type``, ``language``, ``title`` and ``content``,
        ordered by document type and language.
    """
    query = """
        SELECT id, document_type, language, title, content
        FROM compliance.compliance_legal_templates
        WHERE status = 'published'
        ORDER BY document_type, language
    """
    # Using the cursor as a context manager closes it even if the query
    # raises; previously the cursor was never closed.
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(query)
        return list(cur.fetchall())
|
|
|
|
|
|
def audit_template(tpl: dict) -> dict:
    """Audit a single template against its mandatory-information checklist.

    Args:
        tpl: Template row with the keys ``id``, ``document_type``,
            ``language``, ``title`` and ``content``.

    Returns:
        A summary dict with ``template_id``, ``template_type``,
        ``language``, ``title``, ``doc_type`` and the level-1 figures
        ``l1_total``, ``l1_passed`` and ``l1_missing``. When the template
        type has no entry in TEMPLATE_TO_DOCTYPE, ``doc_type`` is ``None``,
        ``skipped_reason`` is set and the counters are zero. Otherwise the
        dict also carries ``word_count`` of the rendered text.

        ``l1_total`` counts every level-1 check, including skipped ones;
        ``l1_passed`` and ``l1_missing`` both leave skipped checks out.
    """
    doc_type = TEMPLATE_TO_DOCTYPE.get(tpl["document_type"])
    if not doc_type:
        return {
            "template_id": tpl["id"],
            "template_type": tpl["document_type"],
            "language": tpl["language"],
            "title": tpl["title"],
            "doc_type": None,
            "skipped_reason": "no_checklist_mapping",
            "l1_total": 0, "l1_passed": 0, "l1_missing": [],
        }

    # Drop the conditional markers first, then fill in the placeholders.
    raw = tpl["content"] or ""
    rendered = render_placeholders(strip_handlebars_blocks(raw))

    findings = check_document_completeness(
        text=rendered,
        doc_type=doc_type,
        doc_title=tpl["title"] or tpl["document_type"],
        doc_url=f"template://{tpl['id']}",
    )

    # findings is a list of dicts; take the first non-empty 'all_checks'.
    all_checks: list[dict] = next(
        (f["all_checks"] for f in findings if f.get("all_checks")),
        [],
    )

    # Checks without an explicit level are treated as level 1.
    l1_checks = [c for c in all_checks if c.get("level", 1) == 1]
    l1_counted = [c for c in l1_checks if not c.get("skipped")]
    l1_missing = [c for c in l1_counted if not c.get("passed")]

    return {
        "template_id": tpl["id"],
        "template_type": tpl["document_type"],
        "language": tpl["language"],
        "title": tpl["title"],
        "doc_type": doc_type,
        "l1_total": len(l1_checks),
        "l1_passed": len(l1_counted) - len(l1_missing),
        "l1_missing": [
            {
                "id": c.get("id"),
                "label": c.get("label"),
                # `or ""` also covers a check whose hint is present but
                # None, which would otherwise fail on the slice.
                "hint": (c.get("hint") or "")[:200],
            }
            for c in l1_missing
        ],
        "word_count": len(rendered.split()),
    }
|
|
|
|
|
|
def _md_cell(value: object) -> str:
    """Return *value* as text that is safe inside a Markdown table cell."""
    text = "" if value is None else str(value)
    # A raw pipe would start a new column, a line break would end the row.
    return text.replace("\r", " ").replace("\n", " ").replace("|", "\\|")


def render_markdown_report(results: Iterable[dict]) -> str:
    """Render the audit results as a Markdown report.

    Args:
        results: Result dicts with ``template_type``, ``language``,
            ``title``, ``doc_type``, ``l1_total``, ``l1_passed`` and
            ``l1_missing``; entries listed in the details section also
            need ``word_count``. Entries with a falsy ``doc_type`` are
            listed as not automatically checked.

    Returns:
        The report text: header with counts, a summary table per template
        type, a details section for templates with missing mandatory
        fields, and (if any) the list of templates without a checklist.
    """
    results = list(results)
    audited = [r for r in results if r.get("doc_type")]
    skipped = [r for r in results if not r.get("doc_type")]
    by_type = defaultdict(list)
    for r in audited:
        by_type[r["template_type"]].append(r)

    lines = [
        "# Template-Audit (P39)",
        "",
        f"**Datum:** {datetime.now(timezone.utc).isoformat()}",
        "**Methode:** check_document_completeness gegen jede Vorlage",
        "",
        f"- Templates gesamt: {len(results)}",
        f"- Auditierbar (mit Checklist-Mapping): {len(audited)}",
        f"- Uebersprungen (kein doc_type-Mapping): {len(skipped)}",
        "",
    ]

    # Summary table by template_type
    lines.append("## Zusammenfassung pro Template-Typ")
    lines.append("")
    lines.append("| Template-Type | Sprache | L1-Score | Fehlende Pflichtangaben |")
    lines.append("|---|---|---|---|")
    for tpl_type in sorted(by_type):
        for r in by_type[tpl_type]:
            ratio = f"{r['l1_passed']}/{r['l1_total']}" if r["l1_total"] else "—"
            missing_count = len(r["l1_missing"])
            # German agreement: "1 fehlt", otherwise "N fehlen".
            verb = "fehlt" if missing_count == 1 else "fehlen"
            # Fall back to the check id when a check carries no label.
            names = ", ".join(
                _md_cell(c.get("label") or c.get("id") or "?")
                for c in r["l1_missing"]
            )
            detail = f": {names}" if r["l1_missing"] else ""
            lines.append(
                f"| `{tpl_type}` | {r['language']} | {ratio} | "
                f"{missing_count} {verb}{detail} |"
            )
    lines.append("")

    # Per-template details — only those with failures
    failed = [r for r in audited if r["l1_missing"]]
    lines.append(f"## Details: {len(failed)} Templates mit fehlenden Pflichtangaben")
    lines.append("")
    for r in failed:
        lines.append(f"### {r['template_type']} ({r['language']}) — {r['title']}")
        lines.append("")
        lines.append(f"- Doc-Type: `{r['doc_type']}`")
        lines.append(f"- Wortzahl: {r['word_count']}")
        lines.append(f"- L1-Score: {r['l1_passed']}/{r['l1_total']}")
        lines.append(f"- Fehlend ({len(r['l1_missing'])}):")
        for c in r["l1_missing"]:
            label = c.get("label") or c.get("id") or "?"
            # Two / four spaces of indent nest these items under "Fehlend".
            lines.append(f"  - **{label}** (`{c.get('id')}`)")
            if c.get("hint"):
                lines.append(f"    - Hinweis: {c['hint']}")
        lines.append("")

    # Templates without checklist
    if skipped:
        lines.append("## Templates ohne automatische Pflichtangaben-Pruefung")
        lines.append("")
        lines.append("Diese Templates haben keinen Doc-Check-Mapping — sie werden "
                     "nicht automatisch gepruft. Bei Bedarf manuell oder via LLM "
                     "zu pruefen.")
        lines.append("")
        for r in sorted(skipped, key=lambda x: x["template_type"]):
            lines.append(f"- `{r['template_type']}` ({r['language']}): {r['title']}")
        lines.append("")

    return "\n".join(lines)
|
|
|
|
|
|
def main() -> int:
    """Audit all published templates and write the reports.

    Reads the connection string from ``DATABASE_URL`` (or
    ``COMPLIANCE_DATABASE_URL``), audits every template, writes the
    Markdown report to ``AUDIT_OUTPUT`` (default
    ``/tmp/template_audit_report.md``) plus a raw JSON dump next to it,
    and prints a short summary to stdout.

    Returns:
        Process exit code: 1 when no database URL is configured, or when
        ``--strict`` is on the command line and at least one auditable
        template misses mandatory fields; otherwise 0.
    """
    dsn = os.environ.get("DATABASE_URL") or os.environ.get("COMPLIANCE_DATABASE_URL")
    if not dsn:
        print("ERROR: DATABASE_URL not set", file=sys.stderr)
        return 1

    conn = psycopg2.connect(dsn)
    try:
        templates = fetch_templates(conn)
    finally:
        # The connection is only needed for this one query; release it
        # before the (potentially slow) audit instead of leaking it.
        conn.close()
    print(f"Auditing {len(templates)} templates...", file=sys.stderr)

    results = []
    for tpl in templates:
        try:
            results.append(audit_template(tpl))
        except Exception as e:
            # Best effort: one broken template must not abort the whole
            # run, so it is recorded as a non-audited entry instead.
            print(f" ! {tpl['document_type']}/{tpl['language']}: {e}", file=sys.stderr)
            results.append({
                "template_id": tpl["id"],
                "template_type": tpl["document_type"],
                "language": tpl["language"],
                "title": tpl["title"],
                "doc_type": None,
                "skipped_reason": f"error: {e}",
                "l1_total": 0, "l1_passed": 0, "l1_missing": [],
            })

    report_md = render_markdown_report(results)
    out_path = os.environ.get(
        "AUDIT_OUTPUT",
        "/tmp/template_audit_report.md",
    )
    # The report contains non-ASCII characters, so do not rely on the
    # locale's default encoding.
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(report_md)

    # Also dump raw JSON for further analysis. Swap only the file
    # extension: a plain str.replace(".md", ".json") leaves paths without
    # ".md" unchanged (the JSON would then overwrite the report) and also
    # rewrites ".md" inside directory names.
    root, ext = os.path.splitext(out_path)
    json_path = f"{out_path}.json" if ext.lower() == ".json" else f"{root}.json"
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)
    print(f"Report: {out_path}", file=sys.stderr)
    print(f"Raw JSON: {json_path}", file=sys.stderr)

    # Short summary to stdout
    audited = [r for r in results if r.get("doc_type")]
    failed = [r for r in audited if r["l1_missing"]]
    print("\n== Audit Summary ==")
    print(f"Total templates: {len(results)}")
    print(f"Auditable: {len(audited)}")
    print(f"With failures: {len(failed)}")
    print(f"Skipped (no mapping): {len(results) - len(audited)}")

    # P42: CI mode — exit non-zero when any auditable template fails L1
    if "--strict" in sys.argv and failed:
        print(f"\nFAIL: {len(failed)} template(s) missing mandatory fields:",
              file=sys.stderr)
        for r in failed:
            missing = ", ".join(c["label"] for c in r["l1_missing"])
            print(f" - {r['template_type']} [{r['language']}]: {missing}",
                  file=sys.stderr)
        return 1
    return 0
|
|
|
|
|
|
# Script entry point: hand main()'s return value to the shell as exit code.
if __name__ == "__main__":
    raise SystemExit(main())
|