bd65b6f318
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Failing after 59s
CI / detect-changes (push) Successful in 10s
CI / branch-name (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 15s
CI / loc-budget (push) Failing after 19s
CI / iace-gt-coverage (push) Successful in 27s
CI / test-python-backend (push) Successful in 42s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
P54 — consent_diff_for_user.py: USP-Feature fuer wiederkehrende Besucher. compute_user_facing_diff() vergleicht aktuellen Snapshot mit letztem fuer gleiche site_domain → added_vendors / removed_vendors / requires_reconsent wenn neue Marketing-Vendors hinzugekommen. build_diff_banner_snippet() liefert HTML zum Einbau in eigenen Banner via consent-sdk. P68 — reverse_audit.py: Self-Audit unserer Template-Bibliothek. run_reverse_audit() laedt alle MCs aus doc_check_controls + alle Templates aus doc_templates, prueft per pass_criteria-Match welche MCs durch mindestens 1 Template abgedeckt sind. Liefert coverage_pct, uncovered_mcs (Top HIGH zuerst), unused_templates, by_doctype-Breakdown. P69 — data/ecall_regulation.json: eCall-VO (EU) 2015/758 als 7 Chunks fuer RAG-Ingest (Art. 3/6/7 + compliance_implications fuer Automotive-OEMs). Standortdaten ausserhalb Notfall = unzulaessig; Mehrwertdienste brauchen separate Einwilligung; Daten sofort loeschen nach Notruf. P6+P53+P55 — industry_library.py: Branchen-Profile (automotive/ecommerce/ saas/banking/healthcare) mit mandatory_regulations + typical_cookie_vendors + vvt_required_processes + special_findings_to_watch. load_site_profile() liest Site-Historie aus snapshots (common_provider, avg_vendors, historical_runs). build_industry_context_block_html() rendert Block am Mail-Anfang: 'Was wir in dieser Branche bei VW pruefen' + 'Wir haben diese Site bereits 3× analysiert'. P31 — llm_cascade.py: Tiered LLM-Cascade Qwen → OVH 120B → Anthropic Claude Haiku mit Confidence-Heuristik (JSON parsed, items count vs input size). Valkey-Cache (redis://) mit 7-Tage-TTL plus In-Process- Fallback. Wenn Tier-1 unter Confidence-Threshold → Tier-2, dann Tier-3. Reduziert Lauf-Zeit drastisch bei Re-Runs. P80 v2 — check_replay.py: replay nutzt jetzt audit_quality_checks mit den Snapshot-Daten. Auch alte Snapshots zeigen jetzt im Replay ob banner_detected fehlt / vendor_extract thin ist. 
Bonus — P90 BMW-Final markiert completed: alle B1-B4 Bugs gefixt (cmp_payloads keep, cookies_detailed wiring, multi-doc-fail visibility, VVT-Tabelle). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
174 lines
6.2 KiB
Python
174 lines
6.2 KiB
Python
"""
|
|
P68 — Reverse-Audit: eigene Templates gegen alle MCs pruefen.
|
|
|
|
Statt 'gegeben einen Kunden-Text → welche MCs fail' machen wir den
|
|
umgekehrten Test: 'gegeben unseren BreakPilot-Standard-Template-Pool
|
|
(95 Templates) → welche MCs werden NICHT abgedeckt? Wo sind Luecken?'
|
|
|
|
Liefert einen Coverage-Report:
|
|
- Total MCs in DB: ~1800
|
|
- MCs abgedeckt durch min. 1 unserer Templates: X
|
|
- MCs ohne Coverage: Y (Liste)
|
|
- Templates ohne MC-Wirkung: Z (Liste)
|
|
|
|
Zweck: Audit unserer eigenen Code-Base. Wenn ein Customer einen Lauf
|
|
macht und dabei 50 Findings produziert werden, sollten 90%+ davon durch unsere
|
|
Template-Bibliothek korrigierbar sein. Wenn nicht → Templates fehlen.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
|
|
from sqlalchemy import text as sa_text
|
|
from sqlalchemy.orm import Session
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _load_template_rows(db: Session) -> list:
    """Fetch ``(id, doc_type, title, body)`` rows for the template pool.

    Tries ``compliance.doc_templates`` (active rows only) first and falls back
    to ``compliance.legal_templates``. Each attempt runs inside a SAVEPOINT:
    on PostgreSQL a failed statement aborts the enclosing transaction, so
    without the savepoint the fallback query (and every later statement on the
    caller's session) would fail as well.

    Returns an empty list and logs a warning when neither query succeeds.
    """
    queries = (
        """
        SELECT id::text, doc_type, title, body
        FROM compliance.doc_templates
        WHERE active = TRUE
        """,
        """
        SELECT id::text, doc_type, name AS title, content AS body
        FROM compliance.legal_templates
        """,
    )
    last_error = None
    for query in queries:
        try:
            with db.begin_nested():
                return db.execute(sa_text(query)).fetchall()
        except Exception as e:  # best effort: table/columns may not exist
            last_error = e
    logger.warning("template table not found: %s", last_error)
    return []


def run_reverse_audit(db: Session) -> dict:
    """Check which controls (MCs) are covered by our own template pool.

    Loads all rows from ``compliance.doc_check_controls`` and all templates,
    groups the templates by ``doc_type`` (missing doc_type -> ``"other"``) and
    tests every MC against the templates of its doc_type.

    Matching heuristic: the MC's criteria come from ``pass_criteria`` or, if
    that yields nothing, from keywords of its title. A template matches when
    at least ``max(1, len(criteria) // 2)`` criteria occur as case-insensitive
    substrings in the first 50000 characters of its body.

    Args:
        db: SQLAlchemy session used for the read-only queries.

    Returns:
        dict with the keys
        ``total_mcs``, ``total_templates``, ``covered_mcs``,
        ``uncovered_mcs`` (count), ``coverage_pct`` (one decimal),
        ``unused_templates`` (sorted ids of templates matching no MC),
        ``top_uncovered_high`` (first 30 uncovered entries whose severity is
        ``"HIGH"``) and ``by_doctype`` (per-doc_type totals).
    """
    # 1) Load all MCs from doc_check_controls.
    mc_rows = db.execute(sa_text(
        """
        SELECT id::text, control_id, doc_type, title, check_question,
               pass_criteria, severity
        FROM compliance.doc_check_controls
        ORDER BY doc_type, severity DESC
        """
    )).fetchall()

    # 2) Load templates (doc_templates, falling back to legal_templates).
    tpl_rows = _load_template_rows(db)

    # 3) Group templates by doc_type. Bodies are truncated and lower-cased
    #    once here instead of once per MC x template comparison.
    templates_by_doctype: dict[str, list[dict]] = {}
    for tid, dt, title, body in tpl_rows:
        templates_by_doctype.setdefault(dt or "other", []).append({
            "id": tid,
            "title": title,
            "body": (body or "")[:50000].lower(),
        })

    # 4) Single pass over all MCs: decide coverage and record EVERY template
    #    that matches, so a template is only reported as unused when it
    #    matches no MC at all (not merely because another template of the
    #    same doc_type happened to match first).
    covered_mc_ids: set[str] = set()
    used_template_ids: set[str] = set()
    uncovered: list[dict] = []
    for mc_id, ctrl_id, dt, title, _question, pc, sev in mc_rows:
        tpls = templates_by_doctype.get(dt or "other") or []
        if not tpls:
            uncovered.append({
                "mc_id": ctrl_id, "doc_type": dt, "title": title,
                "severity": sev, "reason": "no_template_for_doctype",
            })
            continue

        # Without explicit patterns, fall back to keywords from the title.
        criteria = _extract_patterns_from_pc(pc) or _title_keywords(title or "")
        needles = [p.lower() for p in criteria if p]
        threshold = max(1, len(criteria) // 2)

        matching_ids = [
            tpl["id"]
            for tpl in tpls
            if sum(1 for needle in needles if needle in tpl["body"]) >= threshold
        ]
        if matching_ids:
            covered_mc_ids.add(mc_id)
            used_template_ids.update(matching_ids)
        else:
            uncovered.append({
                "mc_id": ctrl_id, "doc_type": dt, "title": title,
                "severity": sev, "reason": "no_template_match",
                "criteria_sample": criteria[:5],
            })

    # 5) Templates that matched no MC.
    all_template_ids = {
        tpl["id"] for tpls in templates_by_doctype.values() for tpl in tpls
    }
    unused_templates = all_template_ids - used_template_ids

    return {
        "total_mcs": len(mc_rows),
        "total_templates": len(all_template_ids),
        "covered_mcs": len(covered_mc_ids),
        "uncovered_mcs": len(uncovered),
        "coverage_pct": round(len(covered_mc_ids) / max(1, len(mc_rows)) * 100, 1),
        "unused_templates": sorted(unused_templates),
        "top_uncovered_high": [u for u in uncovered if u.get("severity") == "HIGH"][:30],
        "by_doctype": _summarize_by_doctype(mc_rows, covered_mc_ids),
    }
|
|
|
|
|
|
def _extract_patterns_from_pc(pc) -> list[str]:
|
|
"""pc ist jsonb mit z.B. {required_phrases: [...]}, {keywords: [...]}"""
|
|
if not pc:
|
|
return []
|
|
if isinstance(pc, str):
|
|
try:
|
|
import json as _j
|
|
pc = _j.loads(pc)
|
|
except Exception:
|
|
return [pc[:50]]
|
|
if isinstance(pc, dict):
|
|
out: list[str] = []
|
|
for k in ("required_phrases", "keywords", "must_contain",
|
|
"patterns", "phrases"):
|
|
v = pc.get(k)
|
|
if isinstance(v, list):
|
|
out.extend([str(x)[:80] for x in v if x])
|
|
return out
|
|
if isinstance(pc, list):
|
|
return [str(x)[:80] for x in pc if x]
|
|
return []
|
|
|
|
|
|
def _title_keywords(title: str) -> list[str]:
|
|
"""Fallback wenn pass_criteria leer: extrahiere Substantive aus Title."""
|
|
if not title:
|
|
return []
|
|
# primitive: alle Worte > 4 Buchstaben
|
|
return [w for w in re.findall(r"\b\w{5,}\b", title)][:5]
|
|
|
|
|
|
def _summarize_by_doctype(mc_rows, covered_mc_ids: set[str]) -> dict:
|
|
out: dict[str, dict] = {}
|
|
for mc_id, ctrl_id, dt, title, q, pc, sev in mc_rows:
|
|
dt = dt or "other"
|
|
d = out.setdefault(dt, {"total": 0, "covered": 0})
|
|
d["total"] += 1
|
|
if mc_id in covered_mc_ids:
|
|
d["covered"] += 1
|
|
for dt, d in out.items():
|
|
d["pct"] = round(d["covered"] / max(1, d["total"]) * 100, 1)
|
|
return out
|