#!/usr/bin/env python3 """ P39 — Template-Audit: prueft alle Legal-Templates aus der DB gegen unsere eigenen Pflichtangaben-Checks (doc_checks/*). Verwendet check_document_completeness — die gleiche Funktion die auch externe Sites pruefen wuerde. Reports als Markdown. Run inside the bp-compliance-backend container: docker exec bp-compliance-backend python /app/scripts/audit_template_completeness.py """ from __future__ import annotations import json import os import re import sys from collections import defaultdict from datetime import datetime, timezone from typing import Iterable import psycopg2 from psycopg2.extras import RealDictCursor # Add compliance package to path if running outside container sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from compliance.services.doc_checks.runner import check_document_completeness # noqa: E402 # template_type (DB) -> doc_type (checker) — only those for which we # have a checklist. Others fall back to LLM-only and skip. TEMPLATE_TO_DOCTYPE = { "privacy_policy": "dse", "data_protection_policy": "dse", "applicant_dsi": "dse", "employee_dsi": "dse", "social_media_dsi": "dse", "video_conference_dsi": "dse", "informationspflichten": "dse", "cookie_policy": "cookie", "agb": "agb", "widerruf": "widerruf", "dpa": "avv", "dsfa": "dsfa", "tom_documentation": "tom_annex", "loeschkonzept": "loeschkonzept", } # Demo replacements for common placeholders so the template has plausible # concrete values instead of generic {{X}} markers (which would all fail # regex-based mandatory-field checks). 
DEMO_PLACEHOLDERS: dict[str, str] = {
    # Keys are looked up lower-cased with "-" normalised to "_"
    # (see render_placeholders).
    "company_name": "Demo GmbH",
    "company_legal_name": "Demo GmbH",
    "company_address": "Musterstraße 1, 12345 Berlin",
    "company_city": "Berlin",
    "company_postal": "12345",
    "company_country": "Deutschland",
    "company_email": "datenschutz@demo.de",
    "company_phone": "+49 30 12345678",
    "dpo_name": "Max Mustermann",
    "dpo_email": "dsb@demo.de",
    "dpo_phone": "+49 30 87654321",
    "managing_director": "Erika Mustermann",
    "register_court": "Amtsgericht Berlin",
    "register_number": "HRB 123456",
    "vat_id": "DE123456789",
    "supervisory_authority": "Berliner Beauftragte für Datenschutz",
    "supervisory_address": "Friedrichstr. 219, 10969 Berlin",
    "retention_period": "10 Jahre nach Vertragsende",
    "third_country": "USA",
    "transfer_mechanism": "EU-Standardvertragsklauseln",
    "date": "2026-05-20",
    "version": "1.0",
}

# Matches {{anything}} including dots and brackets used in conditional blocks.
_PLACEHOLDER_RE = re.compile(r"\{\{\s*([^{}]+?)\s*\}\}")

# Block markers that are dropped while their enclosed content is kept.
# Applied one after another, in this order.
_BLOCK_MARKER_RES = (
    re.compile(r"\{\{#IF[^}]*\}\}"),
    re.compile(r"\{\{/IF\}\}"),
    re.compile(r"\{\{#UNLESS[^}]*\}\}"),
    re.compile(r"\{\{/UNLESS\}\}"),
    re.compile(r"\{\{else\}\}"),
)

# Prefix main() uses in "skipped_reason" when auditing a template raised.
_ERROR_REASON_PREFIX = "error: "


def render_placeholders(content: str) -> str:
    """Replace ``{{key}}`` placeholders with demo values.

    Keys are matched case-insensitively and with hyphens treated as
    underscores. A placeholder without a demo value is rendered as ``[key]``
    so the surrounding sentence stays readable and the gap remains visible.

    Args:
        content: Template text, ideally already passed through
            strip_handlebars_blocks.

    Returns:
        The text with every ``{{...}}`` occurrence substituted.
    """

    def repl(m: re.Match) -> str:
        key = m.group(1).strip().lower()
        # Hyphens / underscores normalised
        key_norm = key.replace("-", "_")
        # Unknown keys leave a hint for context but don't break sentences.
        return DEMO_PLACEHOLDERS.get(key_norm, f"[{key}]")

    return _PLACEHOLDER_RE.sub(repl, content)


def strip_handlebars_blocks(content: str) -> str:
    """Drop ``{{#IF X}}``/``{{/IF}}``-style markers but keep the inner content.

    The audit only cares whether mandatory text appears anywhere, not which
    branch is active, so both branches of a conditional stay in the output.

    Args:
        content: Raw template text.

    Returns:
        The text without IF / UNLESS / else markers.
    """
    for marker_re in _BLOCK_MARKER_RES:
        content = marker_re.sub("", content)
    return content


def fetch_templates(conn) -> list[dict]:
    """Load all published legal templates, ordered by type and language.

    Args:
        conn: An open psycopg2 connection.

    Returns:
        One dict-like row per template with the keys ``id``,
        ``document_type``, ``language``, ``title`` and ``content``.
    """
    # The context manager closes the cursor even if the query raises.
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(
            """
            SELECT id, document_type, language, title, content
            FROM compliance.compliance_legal_templates
            WHERE status = 'published'
            ORDER BY document_type, language
            """
        )
        return list(cur.fetchall())


def _skipped_result(tpl: dict, reason: str) -> dict:
    """Build the result record for a template that produced no audit score."""
    return {
        "template_id": tpl["id"],
        "template_type": tpl["document_type"],
        "language": tpl["language"],
        "title": tpl["title"],
        "doc_type": None,
        "skipped_reason": reason,
        "l1_total": 0,
        "l1_passed": 0,
        "l1_missing": [],
    }


def audit_template(tpl: dict) -> dict:
    """Audit a single template — returns dict with findings + summary.

    Args:
        tpl: A template row as returned by fetch_templates.

    Returns:
        A result dict. Templates without a checklist mapping get
        ``doc_type=None`` and ``skipped_reason="no_checklist_mapping"``;
        audited templates carry the level-1 totals, the missing checks and
        the word count of the rendered text.
    """
    doc_type = TEMPLATE_TO_DOCTYPE.get(tpl["document_type"])
    if not doc_type:
        return _skipped_result(tpl, "no_checklist_mapping")

    raw = tpl["content"] or ""
    rendered = render_placeholders(strip_handlebars_blocks(raw))

    findings = check_document_completeness(
        text=rendered,
        doc_type=doc_type,
        doc_title=tpl["title"] or tpl["document_type"],
        doc_url=f"template://{tpl['id']}",
    )

    # findings is a list of dicts; the first finding usually has 'all_checks'.
    # Take the first non-empty one; tolerate a checker that returns None.
    all_checks: list[dict] = next(
        (f["all_checks"] for f in (findings or []) if f.get("all_checks")),
        [],
    )

    l1_checks = [c for c in all_checks if c.get("level", 1) == 1]
    l1_missing = [
        c for c in l1_checks if not c.get("passed") and not c.get("skipped")
    ]
    l1_passed = sum(
        1 for c in l1_checks if c.get("passed") and not c.get("skipped")
    )

    return {
        "template_id": tpl["id"],
        "template_type": tpl["document_type"],
        "language": tpl["language"],
        "title": tpl["title"],
        "doc_type": doc_type,
        "l1_total": len(l1_checks),
        "l1_passed": l1_passed,
        "l1_missing": [
            {
                "id": c.get("id"),
                "label": c.get("label"),
                # `or ""` also covers a hint that is present but None.
                "hint": (c.get("hint") or "")[:200],
            }
            for c in l1_missing
        ],
        "word_count": len(rendered.split()),
    }


def _is_audit_error(result: dict) -> bool:
    """Return True when the result records a crashed audit, not a missing mapping."""
    reason = result.get("skipped_reason") or ""
    return str(reason).startswith(_ERROR_REASON_PREFIX)


def _check_label(check: dict) -> str:
    """Return a printable name for a check: its label, else its id, else '?'."""
    return str(check.get("label") or check.get("id") or "?")


def _md_cell(text: str) -> str:
    """Make text safe inside a Markdown table cell (no raw pipes or newlines)."""
    return text.replace("|", "\\|").replace("\n", " ")


def render_markdown_report(results: Iterable[dict]) -> str:
    """Render the audit results as a German-language Markdown report.

    Args:
        results: Result dicts as produced by audit_template (or the error
            records main() creates). Any iterable is accepted; it is
            materialised once.

    Returns:
        The report text. It contains a summary table per template type,
        details for templates with missing mandatory fields, a section for
        templates whose audit raised an error (only if there are any) and a
        section for templates without a checklist mapping (only if there
        are any).
    """
    results = list(results)
    audited = [r for r in results if r.get("doc_type")]
    not_audited = [r for r in results if not r.get("doc_type")]
    # A crashed audit is not the same as "no mapping"; report it separately.
    errored = [r for r in not_audited if _is_audit_error(r)]
    skipped = [r for r in not_audited if not _is_audit_error(r)]

    by_type = defaultdict(list)
    for r in audited:
        by_type[r["template_type"]].append(r)

    lines = [
        "# Template-Audit (P39)",
        "",
        f"**Datum:** {datetime.now(timezone.utc).isoformat()}",
        "**Methode:** check_document_completeness gegen jede Vorlage",
        "",
        f"- Templates gesamt: {len(results)}",
        f"- Auditierbar (mit Checklist-Mapping): {len(audited)}",
        f"- Uebersprungen (kein doc_type-Mapping): {len(skipped)}",
    ]
    if errored:
        lines.append(f"- Fehler beim Audit: {len(errored)}")
    lines.append("")

    # Summary table by template_type
    lines.append("## Zusammenfassung pro Template-Typ")
    lines.append("")
    lines.append("| Template-Type | Sprache | L1-Score | Fehlende Pflichtangaben |")
    lines.append("|---|---|---|---|")
    for tpl_type in sorted(by_type):
        for r in by_type[tpl_type]:
            ratio = f"{r['l1_passed']}/{r['l1_total']}" if r["l1_total"] else "—"
            missing = r["l1_missing"]
            missing_count = len(missing)
            verb = "fehlt" if missing_count == 1 else "fehlen"
            summary = f"{missing_count} {verb}"
            if missing:
                names = ", ".join(_md_cell(_check_label(c)) for c in missing)
                summary += f": {names}"
            lines.append(
                f"| `{tpl_type}` | {r['language']} | {ratio} | {summary} |"
            )
    lines.append("")

    # Per-template details — only those with failures
    failed = [r for r in audited if r["l1_missing"]]
    lines.append(f"## Details: {len(failed)} Templates mit fehlenden Pflichtangaben")
    lines.append("")
    for r in failed:
        lines.append(f"### {r['template_type']} ({r['language']}) — {r['title']}")
        lines.append("")
        lines.append(f"- Doc-Type: `{r['doc_type']}`")
        lines.append(f"- Wortzahl: {r.get('word_count', 0)}")
        lines.append(f"- L1-Score: {r['l1_passed']}/{r['l1_total']}")
        lines.append(f"- Fehlend ({len(r['l1_missing'])}):")
        for c in r["l1_missing"]:
            # Two / four spaces so the bullets nest under "Fehlend".
            lines.append(f"  - **{_check_label(c)}** (`{c.get('id')}`)")
            if c.get("hint"):
                lines.append(f"    - Hinweis: {c['hint']}")
        lines.append("")

    # Templates whose audit raised an exception
    if errored:
        lines.append("## Templates mit Audit-Fehlern")
        lines.append("")
        lines.append("Bei diesen Templates ist die Pruefung selbst fehlgeschlagen — "
                     "sie wurden nicht bewertet.")
        lines.append("")
        for r in sorted(errored, key=lambda x: x["template_type"]):
            lines.append(
                f"- `{r['template_type']}` ({r['language']}): {r['title']} — "
                f"{r['skipped_reason']}"
            )
        lines.append("")

    # Templates without checklist
    if skipped:
        lines.append("## Templates ohne automatische Pflichtangaben-Pruefung")
        lines.append("")
        lines.append("Diese Templates haben keinen Doc-Check-Mapping — sie werden "
                     "nicht automatisch geprueft. Bei Bedarf manuell oder via LLM "
                     "zu pruefen.")
        lines.append("")
        for r in sorted(skipped, key=lambda x: x["template_type"]):
            lines.append(f"- `{r['template_type']}` ({r['language']}): {r['title']}")
        lines.append("")

    return "\n".join(lines)


def _json_sibling_path(out_path: str) -> str:
    """Return the path for the raw JSON dump that accompanies *out_path*.

    The extension is swapped for ``.json``. If the report path itself already
    ends in ``.json``, ``.json`` is appended instead, so the dump can never
    overwrite the Markdown report.
    """
    root, ext = os.path.splitext(out_path)
    if ext.lower() == ".json":
        return out_path + ".json"
    return root + ".json"


def main() -> int:
    """Audit all published templates and write the Markdown + JSON reports.

    Environment:
        DATABASE_URL / COMPLIANCE_DATABASE_URL: PostgreSQL DSN (required).
        AUDIT_OUTPUT: Markdown report path
            (default ``/tmp/template_audit_report.md``).

    Command line:
        ``--strict``: CI mode — return non-zero when any audited template
        misses a level-1 check or when auditing a template raised an error.

    Returns:
        Process exit code: 0 on success, 1 on missing DSN or strict failure.
    """
    dsn = os.environ.get("DATABASE_URL") or os.environ.get("COMPLIANCE_DATABASE_URL")
    if not dsn:
        print("ERROR: DATABASE_URL not set", file=sys.stderr)
        return 1

    conn = psycopg2.connect(dsn)
    try:
        templates = fetch_templates(conn)
    finally:
        # Nothing after the fetch needs the database.
        conn.close()
    print(f"Auditing {len(templates)} templates...", file=sys.stderr)

    results = []
    for tpl in templates:
        try:
            results.append(audit_template(tpl))
        except Exception as e:
            # Best effort: one broken template must not abort the whole run.
            print(f" ! {tpl['document_type']}/{tpl['language']}: {e}", file=sys.stderr)
            results.append(_skipped_result(tpl, f"{_ERROR_REASON_PREFIX}{e}"))

    report_md = render_markdown_report(results)
    out_path = os.environ.get(
        "AUDIT_OUTPUT",
        "/tmp/template_audit_report.md",
    )
    # Explicit UTF-8: the report contains non-ASCII text and the container
    # locale may not default to a Unicode encoding.
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(report_md)

    # Also dump raw JSON for further analysis
    json_path = _json_sibling_path(out_path)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)

    print(f"Report: {out_path}", file=sys.stderr)
    print(f"Raw JSON: {json_path}", file=sys.stderr)

    # Short summary to stdout
    audited = [r for r in results if r.get("doc_type")]
    failed = [r for r in audited if r["l1_missing"]]
    errored = [r for r in results if not r.get("doc_type") and _is_audit_error(r)]
    skipped_count = len(results) - len(audited) - len(errored)
    print("\n== Audit Summary ==")
    print(f"Total templates: {len(results)}")
    print(f"Auditable: {len(audited)}")
    print(f"With failures: {len(failed)}")
    print(f"Skipped (no mapping): {skipped_count}")
    print(f"Audit errors: {len(errored)}")

    # P42: CI mode — exit non-zero when any auditable template fails L1.
    # A template whose audit crashed was not verified, so it fails too.
    if "--strict" in sys.argv and (failed or errored):
        if failed:
            print(f"\nFAIL: {len(failed)} template(s) missing mandatory fields:",
                  file=sys.stderr)
            for r in failed:
                missing = ", ".join(_check_label(c) for c in r["l1_missing"])
                print(f" - {r['template_type']} [{r['language']}]: {missing}",
                      file=sys.stderr)
        if errored:
            print(f"\nFAIL: {len(errored)} template(s) could not be audited:",
                  file=sys.stderr)
            for r in errored:
                print(f" - {r['template_type']} [{r['language']}]: "
                      f"{r['skipped_reason']}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())