Compare commits

..

4 Commits

Author SHA1 Message Date
Benjamin Admin ce6b4c58e3 feat(agent-ui): add Architektur tab explaining the doc-check pipeline
Mirror the CE module's /sdk/iace/.../architektur tab for /sdk/agent: a
hand-authored schema (data-flow lanes, step-by-step pipeline accordion,
module-engine cards, Pruefer-Matrix) explaining orchestrator phases A-F,
the parallel specialist agents (Impressum/AGB/DSE), the 4-layer DSE engine,
and the verification/decision-method meta-model. Adds a page-level
Check | Architektur tab toggle (the page was flat).

Static content (the Python doc-check has no architecture endpoint, unlike
the Go IACE module); can be data-fed later.

NOTE: not yet lint/type/browser-verified -- the worktree has no node_modules.
Needs a visual check + next lint / tsc in an env with the toolchain.

dev-only, no deploy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 11:25:28 +02:00
Benjamin Admin f6d018234b feat(dse): recover v3 DSE engine from container + wire into live check path
The calibrated DSE engine (4-layer: regex-boost / keyword / BGE-M3 embedding
recall @0.65 / semantic-validator) existed ONLY in the running macmini
container (docker cp'd, never committed) — at risk of loss on any container
rebuild. This recovers it into git and wires it into the live check path.

- Recover dse/{agent,v3_engine,_embedding_recall,_classification_gate,
  regex_boost,mcs,deep_check}.py. DSEAgent (v3, BaseSpecialistAgent) replaces
  the keyword-only stub: delegates MC-loading to the main engine
  (rag_document_checker._load_controls), deterministic cached embedding recall
  (reachability-gated), semantic-validator LLM layer honoring skip_llm,
  third-country -> HIGH on documented transfer.
- Wire "dse" into _agent_outputs._TOPIC_AGENTS -> live check emits a validated
  DSE tab (was snapshot/legacy-only).
- Tests rewritten for v3 (DB/embedding/LLM stubbed offline): regex-boost
  detection, embedding-recall reachability guard, result->Finding conversion,
  third-country HIGH; topic-wiring asserts "dse".
- deep_check.py recovered for preservation (alternate LLM-judge path, unwired).

Runtime data deps for full live behavior (note for prod): doc_check_controls
in DB + /data/mc_classification.db embedding sidecar + embedding-service; all
degrade gracefully (keyword layer carries) if absent.

dev-only, no deploy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 11:15:52 +02:00
Benjamin Admin 32e45f0797 feat(agb): wire validated routed AGB engine into live check path
Consolidate the AGB C-lean engine (71% FP -> ~0, validated vs 7-company
Opus GT) onto the canonical checker library and into the live check path.

- AGBAgent.evaluate now runs routed C-lean: keyword (L1/L2) -> business-
  model gate -> per-item decision_method routing (embedding/reference/llm
  via services/checkers/) -> severity re-tiering (LOW -> recommendation),
  honoring context.skip_llm.
- New agb/_pipeline.py orchestrates the routing; agent.py stays thin.
- Remove the 3 AGB-local checker duplicates (_reference_check,
  _embedding_rescue, _llm_judge); services/checkers/ is now canonical.
- Wire "agb" into _agent_outputs._TOPIC_AGENTS so the live check emits a
  validated AGB tab (was snapshot-only).
- Run topic agents concurrently (asyncio.gather) + emit each tab via SSE
  as it finishes -> progressive results, no wait on the slowest agent.
- Tests: checker units (mocked), routed agent (gate/rescue/re-tier),
  topic wiring; existing AGB tests made offline-safe.

dev-only, no deploy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 10:40:08 +02:00
Benjamin Admin 9d79cf1576 docs+feat(platform): Pruefer-Matrix-Foundation einfrieren (Evidenz, Mapping, Checker-Library, AGB-Kalibrierung)
Know-how-Freeze der Website-Compliance-Runde (DSE/Cookie/Impressum/AGB). docs: platform_evidence_v1 (Evidenz-/Qualitaetsnachweis, echte Zahlen), nutzungsbedingungen_mapping (neues Modul = Mapping, empirisch belegt), platform_checker_matrix (Meta-Modell verification_method x decision_method), verification_method, platform_validation_v1. code: checkers/ (reusable Pruefer-Library base+reference+embedding+llm, im Container validiert), agb/ (decision_method-Routing + Checker-Prototypen, 71% FP -> ~0 validiert). Dev-only, kein Prod-Push; Benchmark-GTs/Korpora im internen Archiv (data-retention).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 09:23:21 +02:00
28 changed files with 2589 additions and 81 deletions
@@ -0,0 +1,302 @@
'use client'
// Erklärendes Architekturschema des Compliance-Check-Tools — Muster aus dem
// CE-Modul (/sdk/iace/.../architektur) übernommen: hand-kurierte Boxen/Pfeile +
// Schritt-Akkordeon. Inhalt spiegelt den Code-Pfad (api/agent_check/_orchestrator
// + services/specialist_agents). Bewusst statisch (der Doc-Check ist Python, hat
// keinen Architektur-Endpoint wie das Go-IACE-Modul) — bei Bedarf später aus einem
// Backend-Handler speisbar.
import { useState, type ReactNode } from 'react'
function Box({ title, sub, accent }: { title: string; sub?: string; accent?: 'purple' | 'amber' | 'green' | 'gray' }) {
const c =
accent === 'purple'
? 'border-purple-300 bg-purple-50/60 dark:border-purple-700 dark:bg-purple-900/20'
: accent === 'amber'
? 'border-amber-300 bg-amber-50/60 dark:border-amber-700 dark:bg-amber-900/20'
: accent === 'green'
? 'border-green-300 bg-green-50/60 dark:border-green-700 dark:bg-green-900/20'
: 'border-gray-200 bg-white dark:border-gray-700 dark:bg-gray-800'
return (
<div className={`rounded-lg border ${c} px-2.5 py-1.5`}>
<div className="text-[11px] font-medium text-gray-800 dark:text-gray-200 leading-tight">{title}</div>
{sub && <div className="text-[10px] text-gray-500 leading-tight mt-0.5">{sub}</div>}
</div>
)
}
function Lane({ label, children }: { label: string; children: ReactNode }) {
return (
<div className="flex-1 min-w-[150px] space-y-2">
<div className="text-[10px] font-semibold uppercase tracking-wide text-gray-400 text-center">{label}</div>
<div className="space-y-1.5">{children}</div>
</div>
)
}
function Arrow() {
return (
<div className="flex items-center justify-center text-gray-300 dark:text-gray-600 shrink-0 px-0.5">
<span className="hidden lg:block text-lg"></span>
<span className="lg:hidden text-sm"></span>
</div>
)
}
type Stage = {
id: string
title: string
summary: string
input: string
logic: string
source: string
example: string
}
// Spiegelt run_compliance_check (Phasen AF) + die Spezialagenten-Schicht.
const STAGES: Stage[] = [
{
id: 'a',
title: 'Phase A — Auflösen & Crawl',
summary: 'URLs + hochgeladene Dokumente einsammeln, fehlende Pflichtseiten automatisch finden.',
input: 'Start-URL, Dokument-Uploads, 8 Wizard-Felder (scan_context)',
logic: 'Discovery (Sitemap/Heuristik) + Fetch je Seite, Text-Extraktion pro Doc-Typ',
source: 'consent-tester /dsi-discovery, Playwright',
example: 'Findet /impressum, /datenschutz, /agb ohne manuelle Eingabe',
},
{
id: 'b',
title: 'Phase B — Profil & Dokument-Checks',
summary: 'Geschäftsprofil erkennen, jedes Dokument gegen seine Controls prüfen.',
input: 'Doc-Texte je Typ + Business-Scope',
logic: 'Regex-Runner + MC-Keyword + BGE-M3-Embedding + LLM-Verify (nur unscharf)',
source: 'doc_check_controls (DB), mc_classification.db (Embeddings)',
example: 'DSE: 267 Text-MCs, Keyword + semantischer Recall',
},
{
id: 'agents',
title: 'Spezialagenten (nebenläufig)',
summary: 'Pro Dokumenttyp ein typisierter Agent → eigener Ergebnis-Tab, gefüllt per SSE.',
input: 'Doc-Text, Scope, scan_context',
logic: 'Impressum + AGB + DSE laufen parallel (asyncio.gather), je ein AgentOutput',
source: 'api/agent_check/_agent_outputs._TOPIC_AGENTS',
example: 'AGB-Tab + DSE-Tab erscheinen, sobald ihr Agent fertig ist',
},
{
id: 'c',
title: 'Phase C — Cookie-Banner',
summary: 'Consent-Banner + gesetzte Cookies vor/nach Einwilligung live prüfen.',
input: 'Live-Seite im Browser',
logic: 'Consent-Tester-Scan: Banner, Vendors, Enforcement, Browser-Matrix',
source: 'consent-tester /scan',
example: 'Cookie vor Einwilligung gesetzt → Verstoß-Kandidat',
},
{
id: 'd',
title: 'Phase D — Vendors & Plausibilität',
summary: 'Dritt-Dienste extrahieren + Findings auf Plausibilität prüfen.',
input: 'Banner-/Seiten-Daten, Findings',
logic: 'Vendor-Extraktion (+OCR-Fallback), Plausibilitäts-Check je FAIL',
source: 'Cookie-/Vendor-Kataloge, LLM-Kaskade',
example: 'Analytics ohne Rechtsgrundlage → bestätigtes Finding',
},
{
id: 'reconcile',
title: 'Cross-Finding-Abgleich',
summary: 'Findings über Dokumente hinweg abgleichen — Doppel & Scheinverstöße auflösen.',
input: 'Alle Modul-Findings',
logic: 'Deckt ein anderes Dokument die Pflicht ab, wird das Cross-Finding unterdrückt',
source: 'cross_doc_reconcile (B-Wirings)',
example: '§36 VSBG im Impressum statt DSE → kein Doppel-Finding',
},
{
id: 'f',
title: 'Phase E/F — Bericht & Snapshot',
summary: 'Ergebnis persistieren, Snapshot für die Historie speichern.',
input: 'Konsolidiertes Ergebnis',
logic: 'Mail-Render + DB-Persist + Snapshot (Tab-Ansicht ohne Re-Crawl)',
source: 'compliance_check_snapshots',
example: 'Historie erneut öffnen, ohne die Seite neu zu crawlen',
},
]
type ModuleEngine = { name: string; mechanism: string }
const MODULES: ModuleEngine[] = [
{ name: 'Impressum', mechanism: 'Scope-Gate + Feld-Matcher (§5 DDG / §18 MStV)' },
{ name: 'AGB', mechanism: 'decision_method-Routing: Keyword → Geschäftsmodell-Gate → Embedding/Reference/LLM' },
{ name: 'DSE', mechanism: '4-Layer: Regex-Boost → Keyword → BGE-M3-Recall (0.65) → Semantic-Validator' },
{ name: 'Cookie-Banner', mechanism: 'Consent-Tester: Banner, Vendors, Enforcement, Browser-Matrix' },
]
type Pruefer = { method: string; mechanism: string; deterministic: string; example: string }
// Meta-Modell: jede Pflicht → ein Prüfertyp (decision_method). Wenige
// wiederverwendbare Prüfer statt Logik pro Control.
const PRUEFER: Pruefer[] = [
{ method: 'REGEX', mechanism: 'Kuratierte Muster / Keyword', deterministic: 'ja', example: 'Pflicht-Stichwort im Text' },
{ method: 'EMBEDDING', mechanism: 'BGE-M3 Kosinus ≥ Schwelle', deterministic: 'ja (feste Funktion)', example: '„Recht auf Berichtigung" ≈ Umschreibung' },
{ method: 'REFERENCE', mechanism: 'Link-/Verweis-Auflösung', deterministic: 'ja', example: 'Verweis auf die Datenschutzerklärung' },
{ method: 'LLM', mechanism: 'Kaskade Qwen→OVH→Claude, nur unscharfe Fälle', deterministic: 'nein (eskaliert)', example: 'Speicherdauer inhaltlich erfüllt?' },
{ method: 'BEHAVIOR', mechanism: 'Playwright: Live-Verhalten', deterministic: 'ja', example: 'Cookies vor Einwilligung gesetzt?' },
{ method: 'SCANNER', mechanism: 'Repo-/Netzwerk-/Prozess-Scan', deterministic: 'ja', example: 'Geplant: technische Nachweise' },
]
function Field({ label, value, mono }: { label: string; value: string; mono?: boolean }) {
return (
<div>
<dt className="text-[10px] uppercase tracking-wide text-gray-400">{label}</dt>
<dd className={`text-gray-600 dark:text-gray-300 ${mono ? 'font-mono text-[11px]' : ''}`}>{value}</dd>
</div>
)
}
function StageRow({ stage, last, open, onToggle }: { stage: Stage; last: boolean; open: boolean; onToggle: () => void }) {
return (
<div>
<button
onClick={onToggle}
className={`w-full text-left rounded-lg border p-3 transition-colors ${
open
? 'border-purple-300 bg-purple-50/60 dark:border-purple-700 dark:bg-purple-900/20'
: 'border-gray-200 dark:border-gray-700 bg-white dark:bg-gray-800 hover:bg-gray-50 dark:hover:bg-gray-700/50'
}`}
>
<div className="flex items-start justify-between gap-3">
<div>
<div className="text-sm font-semibold text-gray-800 dark:text-gray-200">{stage.title}</div>
<div className="text-xs text-gray-500 mt-0.5">{stage.summary}</div>
</div>
<span className="text-gray-400 text-xs shrink-0">{open ? '▲' : '▼'}</span>
</div>
{open && (
<dl className="mt-3 grid grid-cols-1 md:grid-cols-2 gap-x-6 gap-y-2 text-xs">
<Field label="Input" value={stage.input} />
<Field label="Logik" value={stage.logic} />
<Field label="Datenquelle" value={stage.source} mono />
<Field label="Beispiel" value={stage.example} />
</dl>
)}
</button>
{!last && <div className="flex justify-center text-gray-300 dark:text-gray-600 text-xs leading-none py-0.5"></div>}
</div>
)
}
export function ArchitekturView() {
const [open, setOpen] = useState<string | null>('b')
return (
<div className="space-y-8">
<div>
<h2 className="text-xl font-bold text-gray-900 dark:text-gray-100">Architektur &amp; Datenfluss</h2>
<p className="text-sm text-gray-500 dark:text-gray-400 max-w-3xl mt-1">
Nachvollziehbar: <strong>woher jedes Finding stammt</strong> und <strong>wie es geprüft wird</strong>.
Die Engine ist überwiegend <strong>deterministisch</strong> (Regex + Embedding); ein LLM entscheidet nur
die unscharfen Fälle. Ergebnisse erscheinen pro Modul progressiv und werden am Ende per
Cross-Finding-Abgleich bereinigt.
</p>
</div>
<section className="space-y-2">
<h3 className="text-sm font-semibold text-gray-700 dark:text-gray-300">Datenfluss (Überblick)</h3>
<div className="rounded-xl border border-gray-200 dark:border-gray-700 bg-gray-50/50 dark:bg-gray-900/20 p-3 overflow-x-auto">
<div className="flex flex-col lg:flex-row gap-1.5 lg:items-stretch min-w-[280px]">
<Lane label="Eingabe">
<Box title="Website + Dokumente" sub="Impressum · DSE · AGB · Cookies" accent="purple" />
<Box title="Wizard-Kontext" sub="8 Felder: Shop, Drittland, Beruf…" accent="purple" />
</Lane>
<Arrow />
<Lane label="Crawl + Text">
<Box title="Discovery + Fetch" sub="consent-tester, Playwright" />
<Box title="Doc-Text je Typ" />
</Lane>
<Arrow />
<Lane label="Engine (deterministisch)">
<div className="rounded-lg border border-gray-200 dark:border-gray-700 bg-white dark:bg-gray-800 p-1.5 space-y-1">
{STAGES.map((s) => (
<div key={s.id} className="text-[10px] text-gray-600 dark:text-gray-300 leading-tight">
{s.title}
</div>
))}
</div>
</Lane>
<Arrow />
<Lane label="Ausgaben">
<Box title="Findings je Modul-Tab" sub="Impressum/AGB/DSE/Cookie" accent="green" />
<Box title="Severity + Maßnahme" accent="green" />
<Box title="Snapshot + Bericht" sub="ohne Re-Crawl" accent="green" />
</Lane>
</div>
<p className="text-[10px] text-gray-400 mt-2">
Linksrechts reproduzierbar. Embedding ist semantisch UND deterministisch (feste Funktion: gleicher
Text gleicher Vektor). Das LLM läuft nur für unscharfe Fälle und eskaliert mit Selbstkonfidenz.
</p>
</div>
</section>
<section className="space-y-2">
<h3 className="text-sm font-semibold text-gray-700 dark:text-gray-300">Pipeline (Schritt für Schritt)</h3>
<div className="space-y-1">
{STAGES.map((s, i) => (
<StageRow
key={s.id}
stage={s}
last={i === STAGES.length - 1}
open={open === s.id}
onToggle={() => setOpen(open === s.id ? null : s.id)}
/>
))}
</div>
</section>
<section className="space-y-3">
<h3 className="text-sm font-semibold text-gray-700 dark:text-gray-300">Modul-Engines (live)</h3>
<div className="grid grid-cols-1 sm:grid-cols-2 gap-3">
{MODULES.map((m) => (
<div key={m.name} className="rounded-lg border border-gray-200 dark:border-gray-700 bg-white dark:bg-gray-800 p-3">
<div className="flex items-baseline justify-between gap-2">
<span className="text-sm font-medium text-gray-800 dark:text-gray-200">{m.name}</span>
<span className="inline-block rounded px-1.5 py-0.5 text-[10px] font-medium bg-green-100 text-green-700 dark:bg-green-900/40 dark:text-green-300">
live
</span>
</div>
<p className="text-xs text-gray-500 mt-1">{m.mechanism}</p>
</div>
))}
</div>
</section>
<section className="space-y-3">
<h3 className="text-sm font-semibold text-gray-700 dark:text-gray-300">Prüfer-Matrix (Meta-Modell)</h3>
<p className="text-xs text-gray-500 max-w-3xl">
Jede Pflicht wird einem <strong>Prüfertyp</strong> zugeordnet so braucht es nicht pro Control eigene
Logik, sondern wenige wiederverwendbare Prüfer.
</p>
<div className="overflow-x-auto">
<table className="w-full text-xs">
<thead>
<tr className="text-gray-500 border-b border-gray-200 dark:border-gray-700 text-left">
<th className="py-1.5 pr-3">Prüfer</th>
<th className="py-1.5 pr-3">Mechanismus</th>
<th className="py-1.5 pr-3">Deterministisch</th>
<th className="py-1.5">Beispiel</th>
</tr>
</thead>
<tbody>
{PRUEFER.map((p) => (
<tr key={p.method} className="border-b border-gray-100 dark:border-gray-700/50 align-top">
<td className="py-1.5 pr-3">
<code className="text-[11px] bg-gray-100 dark:bg-gray-700 rounded px-1">{p.method}</code>
</td>
<td className="py-1.5 pr-3 text-gray-600 dark:text-gray-300">{p.mechanism}</td>
<td className="py-1.5 pr-3 text-gray-500">{p.deterministic}</td>
<td className="py-1.5 text-gray-500">{p.example}</td>
</tr>
))}
</tbody>
</table>
</div>
</section>
</div>
)
}
+26 -4
View File
@@ -4,10 +4,12 @@ import React, { useState } from 'react'
import { ComplianceCheckTab } from './_components/ComplianceCheckTab'
import { ComplianceFAQ } from './_components/ComplianceFAQ'
import { SnapshotHistoryList } from './_components/SnapshotHistoryList'
import { ArchitekturView } from './_components/ArchitekturView'
export default function AgentPage() {
// Nach einem abgeschlossenen Check die Historie unten neu laden.
const [historyKey, setHistoryKey] = useState(0)
const [tab, setTab] = useState<'check' | 'architektur'>('check')
return (
<div className="space-y-6 max-w-4xl">
@@ -16,11 +18,31 @@ export default function AgentPage() {
<p className="text-gray-500 mt-1">Webseiten + Dokumente auf DSGVO-Konformität prüfen.</p>
</div>
<ComplianceCheckTab onComplete={() => setHistoryKey(k => k + 1)} />
<div className="flex gap-1 border-b border-gray-200 dark:border-gray-700">
{([['check', 'Check'], ['architektur', 'Architektur']] as const).map(([id, label]) => (
<button
key={id}
onClick={() => setTab(id)}
className={`px-3 py-2 text-sm font-medium -mb-px border-b-2 transition-colors ${
tab === id
? 'border-purple-500 text-purple-600 dark:text-purple-400'
: 'border-transparent text-gray-500 hover:text-gray-700 dark:hover:text-gray-300'
}`}
>
{label}
</button>
))}
</div>
<SnapshotHistoryList refreshKey={historyKey} />
<ComplianceFAQ />
{tab === 'check' ? (
<>
<ComplianceCheckTab onComplete={() => setHistoryKey(k => k + 1)} />
<SnapshotHistoryList refreshKey={historyKey} />
<ComplianceFAQ />
</>
) : (
<ArchitekturView />
)}
</div>
)
}
@@ -13,6 +13,7 @@ the map). Once the tabs are the source of truth, B18's v1 path retires.
from __future__ import annotations
import asyncio
import logging
from compliance.services.specialist_agents import REGISTRY, AgentInput
@@ -27,6 +28,8 @@ logger = logging.getLogger(__name__)
# topic key (matches state["doc_texts"]) -> registered agent_id
_TOPIC_AGENTS: dict[str, str] = {
"impressum": "impressum",
"agb": "agb", # v2: AGBAgent mit decision_method-Routing (71% FP -> ~0)
"dse": "dse", # v3: 4-Layer (Regex-Boost/Keyword/BGE-M3-Recall/Semantic)
}
_MIN_TEXT = 100
@@ -112,14 +115,17 @@ async def run_agent_outputs(state: dict) -> None:
)
outputs: dict[str, dict] = state.get("agent_outputs") or {}
for topic, agent_id in _TOPIC_AGENTS.items():
async def _run_one(topic: str, agent_id: str):
"""Einen Topic-Agent laufen lassen + sein Tab-Event sofort emittieren
(Zwischenbefund). Fängt eigene Fehler → ein Agent reißt den Run nicht ab."""
text = (doc_texts.get(topic) or "").strip()
if len(text) < _MIN_TEXT:
continue
return None
agent = REGISTRY.get(agent_id)
if agent is None:
logger.warning("agent_outputs: agent '%s' not registered", agent_id)
continue
return None
try:
out = await agent.evaluate(AgentInput(
doc_type=topic,
@@ -128,15 +134,25 @@ async def run_agent_outputs(state: dict) -> None:
company_name=company_name,
origin_domain=origin_domain,
))
outputs[topic] = out.model_dump(mode="json")
emit(check_id, {"type": "topic", "topic": topic,
"output": outputs[topic]})
dump = out.model_dump(mode="json")
emit(check_id, {"type": "topic", "topic": topic, "output": dump})
logger.info(
"agent_outputs[%s]: %d findings, confidence %.2f",
topic, len(out.findings), out.confidence,
)
return topic, dump
except Exception as e: # noqa: BLE001 — best-effort, never break the run
logger.warning("agent_outputs[%s] failed: %s", topic, e)
return None
# Topic-Agenten laufen NEBENLÄUFIG (ihre Embedding-/LLM-Waits überlappen) und
# füllen ihren Tab via SSE, sobald sie fertig sind — kein Warten aufs Schlusslicht.
results = await asyncio.gather(
*(_run_one(topic, agent_id) for topic, agent_id in _TOPIC_AGENTS.items())
)
for r in results:
if r:
outputs[r[0]] = r[1]
if outputs:
state["agent_outputs"] = outputs
@@ -0,0 +1,82 @@
"""Pruefer-Library — gemeinsames Interface. Siehe docs platform_checker_matrix.md.
Ein Checker prueft EINEN Control gegen EIN Dokument und liefert: vorhanden / fehlt
/ unklar (+ Evidence). Module (DSE/Impressum/AGB/...) liefern nur Control-Metadaten
ueber `ControlSpec` (verification_method + decision_method + checker-spezifische
Config); die Engine routet method-agnostisch zum passenden Checker.
Ziel der Plattform: 14k Controls -> 7 Pruefertypen -> wenige Pruefer. Ein neues
Modul wird damit ein Klassifizierungs-, kein Forschungsproblem.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Optional, Protocol, runtime_checkable
class VerificationMethod:
"""Achse 1 — WELCHER Pruefer-Typ (Kategorie)."""
FIELD = "FIELD"
REFERENCE = "REFERENCE"
BEHAVIOR = "BEHAVIOR"
PRESENTATION = "PRESENTATION"
CONTENT = "CONTENT"
PROCESS = "PROCESS"
TECHNICAL = "TECHNICAL"
CONTRACTUAL = "CONTRACTUAL"
class DecisionMethod:
"""Achse 2 — WIE entschieden wird (konkreter Mechanismus)."""
REGEX = "REGEX"
EMBEDDING = "EMBEDDING"
LLM = "LLM"
LINK_RESOLVER = "LINK_RESOLVER"
PLAYWRIGHT = "PLAYWRIGHT"
AUDIT = "AUDIT"
SCANNER = "SCANNER"
@dataclass
class ControlSpec:
"""Routing-Metadaten + checker-spezifische Config eines Controls. Module fuellen
nur die fuer ihren decision_method relevanten Felder."""
control_id: str
verification_method: str
decision_method: str
label: str = ""
severity: str = "MEDIUM"
patterns: list[str] = field(default_factory=list) # FIELD/REGEX, REFERENCE
paraphrases: list[str] = field(default_factory=list) # CONTENT (EMBEDDING/LLM)
embed_threshold: Optional[float] = None # EMBEDDING (per-Control)
topic_regex: str = "" # LLM: Section-Retrieval
question: str = "" # LLM: Pruef-Frage
extra: dict[str, Any] = field(default_factory=dict)
@dataclass
class DocContext:
"""Das zu pruefende Artefakt. `text` = Volltext; `url`/`rendered` fuer
PRESENTATION/BEHAVIOR (Playwright) — spaeter."""
text: str = ""
url: str = ""
rendered: Any = None
@dataclass
class CheckResult:
present: Optional[bool] # True=erfuellt, False=fehlt, None=unklar (fail-safe)
evidence: str = ""
confidence: float = 0.0
source: str = "" # welcher Pruefer/Tier geantwortet hat
detail: dict[str, Any] = field(default_factory=dict)
@runtime_checkable
class Checker(Protocol):
"""Alle Pruefer haben dieselbe Signatur -> die Engine ist method-agnostisch und
routet nur ueber ctrl.verification_method / ctrl.decision_method."""
verification_method: str
async def check(self, ctrl: ControlSpec, doc: DocContext) -> CheckResult:
...
@@ -0,0 +1,51 @@
"""CONTENT-Pruefer / decision_method=EMBEDDING.
Ist die Pflicht SEMANTISCH im Text vorhanden? Max-Cosinus (Doc-Chunks x Control-
Paraphrasen) >= per-Control-Schwelle. Deterministisch (festes Embedding-Modell)
und gecacht. Rettet Recall-FP (Klausel da, anders formuliert).
Faellt der Embedding-Service aus, liefert der Checker present=None (unklar) — der
Aufrufer behaelt dann das Keyword-Ergebnis (kein Hang, kein Crash).
(Validiert an AGB: 17 Items, per-Item-Schwelle, 0 Fehl-Rescue.)
"""
from __future__ import annotations
import asyncio
import logging
from .base import CheckResult, ControlSpec, DocContext, VerificationMethod
logger = logging.getLogger(__name__)
# Paraphrasen-Vektoren je Control einmal einbetten + cachen.
_PARA_CACHE: dict[str, list] = {}
class EmbeddingChecker:
verification_method = VerificationMethod.CONTENT
async def check(self, ctrl: ControlSpec, doc: DocContext) -> CheckResult:
text = doc.text or ""
paras = ctrl.paraphrases or []
thr = ctrl.embed_threshold if ctrl.embed_threshold is not None else 0.60
if not paras or len(text) < 100:
return CheckResult(present=None, source="embedding")
try:
from compliance.services.mc_embedding_matcher import (
DIM, _chunk_text, _cosine, _embed_texts,
)
if ctrl.control_id not in _PARA_CACHE:
pv = await _embed_texts(paras)
_PARA_CACHE[ctrl.control_id] = [v for v in pv if v and len(v) == DIM]
pvecs = _PARA_CACHE[ctrl.control_id]
chunks = _chunk_text(text)
cvecs = [v for v in await asyncio.wait_for(
_embed_texts(chunks), timeout=90.0) if v and len(v) == DIM]
except (Exception, asyncio.TimeoutError) as e:
logger.info("embedding checker inaktiv %s: %s", ctrl.control_id, str(e)[:80])
return CheckResult(present=None, source="embedding")
if not pvecs or not cvecs:
return CheckResult(present=None, source="embedding")
best = max((_cosine(p, c) for p in pvecs for c in cvecs), default=0.0)
return CheckResult(present=best >= thr, confidence=round(best, 3),
source="embedding")
@@ -0,0 +1,73 @@
"""CONTENT/CONTRACTUAL-Pruefer / decision_method=LLM.
present/absent ueber die LLM-Kaskade (`call_with_cascade`; prod: OVH-120b zuerst).
Retrieval = GANZE Paragraph-Abschnitte zum Topic (nicht Top-k-Chunks — das war in
der AGB-Validierung der Schluessel). KEIN DEFECT — Korrektheits-/Defekt-Pruefung
ist ein separater Modus. present=None bei Fehler (fail-safe: Aufrufer behaelt
Keyword-Ergebnis). (Validiert an AGB delivery/warranty.)
"""
from __future__ import annotations
import json
import logging
import re
from .base import CheckResult, ControlSpec, DocContext, VerificationMethod
logger = logging.getLogger(__name__)
_SECTION = re.compile(r"(?m)(?=^\s*(?:§\s*)?\d+[\.\)]\s)")
_SYS = (
"Du bist deutscher Compliance-Rechtsexperte. Entscheide, ob die genannte "
"Pflicht in den vorgelegten Abschnitten vorhanden ist. NUR die Abschnitte "
'zaehlen. Antworte NUR JSON: {"verdict":"ERFUELLT|FEHLT","zitat":"woertlich '
'oder leer","begruendung":"1 Satz"}.'
)
def _sections(text: str) -> list[str]:
return [s.strip() for s in _SECTION.split(text) if s.strip()]
def _parse(txt: str) -> dict:
out = (txt or "").strip()
if out.startswith("```"):
out = out.split("```", 2)[1]
out = out[4:] if out.startswith("json") else out
a, b = out.find("{"), out.rfind("}")
return json.loads(out[a:b + 1] if 0 <= a < b else out)
class LLMChecker:
verification_method = VerificationMethod.CONTENT
async def check(self, ctrl: ControlSpec, doc: DocContext) -> CheckResult:
text = doc.text or ""
if len(text) < 50:
return CheckResult(present=None, source="llm")
secs = _sections(text)
if ctrl.topic_regex:
rel = [s for s in secs if re.search(ctrl.topic_regex, s, re.I)][:6] or secs[:6]
else:
rel = secs[:6]
question = ctrl.question or f"Ist die Pflicht '{ctrl.label}' im Text vorhanden?"
try:
from compliance.services.llm_cascade import call_with_cascade
r = await call_with_cascade(
_SYS,
json.dumps({"frage": question, "abschnitte": rel}, ensure_ascii=False),
min_confidence=0.6, max_tokens=500,
)
obj = _parse(r.get("text"))
verdict = obj.get("verdict")
zitat = (obj.get("zitat") or "")[:120]
if verdict not in ("ERFUELLT", "FEHLT"):
return CheckResult(present=None, evidence=zitat, source=r.get("source", "?"))
return CheckResult(
present=verdict == "ERFUELLT", evidence=zitat,
confidence=float(r.get("confidence") or 0.0),
source=r.get("source", "llm"),
)
except Exception as e:
logger.info("llm checker fail %s: %s", ctrl.control_id, str(e)[:80])
return CheckResult(present=None, source="error")
@@ -0,0 +1,41 @@
"""REFERENCE-Pruefer (verification_method=REFERENCE, decision_method=LINK_RESOLVER).
Ist ein klarer Verweis auf ein anderes Pflichtdokument vorhanden (+ optional: loest
der Link auf)? Deterministisch. Bsp: 'Details in unserer Datenschutzerklaerung'.
KEIN LLM, kein juristisches Urteil. (Validiert an AGB data_protection: 7/7.)
Die tatsaechliche HTTP-Aufloesung des Links ist ein optionaler Runtime-Schritt
(online), nicht Teil dieser deterministischen Text-Pruefung — die URL wird hier
nur extrahiert und in `detail['link']` zurueckgegeben.
"""
from __future__ import annotations
import re
from .base import CheckResult, ControlSpec, DocContext, VerificationMethod
_URL = re.compile(r"https?://[^\s)\]]+", re.I)
class ReferenceChecker:
verification_method = VerificationMethod.REFERENCE
async def check(self, ctrl: ControlSpec, doc: DocContext) -> CheckResult:
text = doc.text or ""
pats = ctrl.patterns or []
if not pats or not text:
return CheckResult(present=False, source="reference")
for p in pats:
m = re.search(p, text, re.I)
if m:
window = text[max(0, m.start() - 40): m.end() + 200]
url = _URL.search(window) or _URL.search(text)
link = url.group(0) if url else None
return CheckResult(
present=True,
evidence=" ".join(m.group(0).split())[:120],
confidence=1.0,
source="reference",
detail={"link": link},
)
return CheckResult(present=False, source="reference")
@@ -0,0 +1,102 @@
"""AGB-Routing-Pipeline (C-lean): nimmt das Keyword-Ergebnis des ChecklistAgent
und routet keyword-durchgefallene Items per `_routing.decision_method` an die
wiederverwendbare Prüfer-Library (Embedding / Reference / LLM). Davor das
Geschäftsmodell-Gate (Applicability). Das Re-Tiering (LOW → Empfehlung) +
Output-Zusammenbau macht der AGBAgent — hier nur die Routing-Entscheidung.
Validiert (7-Firmen-Opus-GT): 71 % FP → ~0. agent.py bleibt dünn, dies ist der
einzige Ort des C-lean-Flows.
"""
from __future__ import annotations
import logging
from compliance.services.checkers.base import (
ControlSpec,
DecisionMethod,
DocContext,
VerificationMethod,
)
from compliance.services.checkers.embedding_checker import EmbeddingChecker
from compliance.services.checkers.llm_checker import LLMChecker
from compliance.services.checkers.reference_checker import ReferenceChecker
from . import _routing
logger = logging.getLogger(__name__)
# Checker sind zustandslos (schwere Imports erst in .check()) → Modul-Singletons.
_EMB = EmbeddingChecker()
_REF = ReferenceChecker()
_LLM = LLMChecker()
def _spec(item_id: str) -> ControlSpec:
"""ControlSpec für ein Item aus der AGB-Routing-Config bauen."""
dm = _routing.decision_method(item_id)
if dm == _routing.REFERENCE:
return ControlSpec(
control_id=item_id, verification_method=VerificationMethod.REFERENCE,
decision_method=DecisionMethod.LINK_RESOLVER,
patterns=[_routing.REFERENCE_PATTERNS[item_id]],
)
if dm == _routing.LLM:
return ControlSpec(
control_id=item_id, verification_method=VerificationMethod.CONTENT,
decision_method=DecisionMethod.LLM,
paraphrases=_routing.PARAPHRASES.get(item_id, []),
topic_regex=_routing.LLM_TOPIC.get(item_id, ""),
question=_routing.LLM_QUESTION.get(item_id, ""),
)
return ControlSpec(
control_id=item_id, verification_method=VerificationMethod.CONTENT,
decision_method=DecisionMethod.EMBEDDING,
paraphrases=_routing.PARAPHRASES.get(item_id, []),
embed_threshold=_routing.EMBED_THRESHOLDS.get(item_id),
)
async def _resolves(item_id: str, text: str, skip_llm: bool):
"""True = Klausel doch vorhanden (Keyword-Finding auflösen). False/None =
Finding behalten (fail-safe: bei Unsicherheit/Service-Ausfall lieber melden)."""
dm = _routing.decision_method(item_id)
if dm == _routing.MERGED:
return True # in ein anderes Item aufgegangen → kein eigenes Finding
doc = DocContext(text=text)
spec = _spec(item_id)
if dm == _routing.REFERENCE:
return (await _REF.check(spec, doc)).present
if dm == _routing.LLM:
if skip_llm:
return None # interaktiv: kein LLM → Keyword-Ergebnis behalten
return (await _LLM.check(spec, doc)).present
return (await _EMB.check(spec, doc)).present
async def run_routed(base_findings: list, text: str, context: dict | None = None):
"""Routet die keyword-durchgefallenen Items.
Returns (kept, resolved_ids, gated_ids):
kept = Findings, die nach Gate+Rescue bestehen bleiben
resolved_ids = per Embedding/Reference/LLM doch als vorhanden erkannt
gated_ids = per Geschäftsmodell nicht anwendbar (N/A)
"""
context = context or {}
skip_llm = bool(context.get("skip_llm"))
model = _routing.detect_business_model(text)
kept, resolved, gated = [], [], []
for f in base_findings:
item_id = f.field_id
if not _routing.is_applicable(item_id, model):
gated.append(item_id)
continue
try:
present = await _resolves(item_id, text, skip_llm)
except Exception as e: # noqa: BLE001 — best-effort, Finding behalten
logger.info("agb routing %s failed: %s", item_id, str(e)[:80])
present = None
if present is True:
resolved.append(item_id)
else:
kept.append(f)
return kept, resolved, gated
@@ -0,0 +1,144 @@
"""AGB-Routing — das verification_method / decision_method-Meta-Modell, angewandt
auf die AGB_CHECKLIST. Siehe docs-src/development/platform_checker_matrix.md.
Pro Checklisten-Item: WELCHER Pruefer (verification_method) und WIE entschieden
wird (decision_method). Single source of truth; `agb_checks.py` bleibt die reine
Pflichtangaben-Liste, dieses Modul ist der additive Routing-Overlay.
Validiert 2026-06-20/21 gegen 7-Firmen-Opus-GT (71 % FP -> ~0):
- 17 Items EMBEDDING (per-Item-Cosinus-Schwelle; 21 recall-FP gekillt, 0 Fehl-Rescue)
- 2 Items LLM (delivery_timeframe, warranty_period; ganze Paragraph-Abschnitte + starkes Modell, present/absent)
- 1 Item REFERENCE (data_protection; DSE-Verweis + Link, 7/7 deterministisch)
- incorporation_clause MERGED in contract (implizit, kein eigener Pruefer)
"""
from __future__ import annotations
# ── decision_method-Werte ────────────────────────────────────────────────
EMBEDDING = "EMBEDDING"
LLM = "LLM"
REFERENCE = "REFERENCE"
MERGED = "MERGED" # in ein anderes Item aufgegangen -> kein eigener Check
# ── Per-Item Embedding-Rescue-Schwellen ───────────────────────────────────
# An der 7-Firmen-GT kalibriert. BEWUSST per-Item: eine globale Schwelle trennt
# bei juristischer Prosa nicht (PASS/FAIL ueberlappen global, trennen per-Item).
# Vorlaeufig (FAIL n=25 klein) -> vor Prod mit mehr Firmen nachkalibrieren.
EMBED_THRESHOLDS: dict[str, float] = {
"scope": 0.58, "contract": 0.58, "payment": 0.60, "payment_methods": 0.58,
"delivery": 0.57, "warranty": 0.58, "termination": 0.60,
"termination_period": 0.60, "termination_form": 0.60, "consumer_rights": 0.55,
"liability": 0.615, "jurisdiction": 0.585, "dispute_odr_link": 0.67,
"choice_of_law_specific": 0.625, "payment_due_date": 0.705,
"salvatory_clause": 0.565, "amendment_clause": 0.635,
}
# ── decision_method je Item (deckt alle 21 Checklisten-IDs ab) ────────────
DECISION_METHOD: dict[str, str] = {cid: EMBEDDING for cid in EMBED_THRESHOLDS}
DECISION_METHOD.update({
"delivery_timeframe": LLM,
"warranty_period": LLM,
"data_protection": REFERENCE,
"incorporation_clause": MERGED, # -> contract
})
# ── Applicability-Gate (VOR allen Pruefern; Geschaeftsmodell entscheidet) ──
ABO_ONLY = {"termination", "termination_period", "termination_form"} # nur Dauerschuld
B2C_ONLY = {"consumer_rights", "dispute_odr_link"} # nicht reines B2B
# ── Referenz-Paraphrasen (Embedding-Rescue + LLM-Section-Ranking) ──────────
PARAPHRASES: dict[str, list[str]] = {
"scope": ["Diese AGB gelten fuer alle Vertraege zwischen dem Anbieter und dem Kunden.",
"Die Angebote richten sich ausschliesslich an Verbraucher, die privat kaufen.",
"Geltungsbereich: fuer die Geschaeftsbeziehung gelten die nachfolgenden Bedingungen."],
"contract": ["Durch Anklicken des Bestellbuttons gibt der Kunde ein verbindliches Angebot ab.",
"Der Vertrag kommt mit Zugang der Bestellbestaetigung zustande.",
"Mit der Bestellung erkennt der Kunde diese AGB als Vertragsbestandteil an."],
"liability": ["Die Haftung fuer leicht fahrlaessige Pflichtverletzungen ist beschraenkt.",
"Wir haften unbeschraenkt fuer Schaeden aus Verletzung von Leben, Koerper, Gesundheit.",
"Bei Verletzung wesentlicher Vertragspflichten Haftung auf vorhersehbaren Schaden begrenzt."],
"jurisdiction": ["Es gilt das Recht der Bundesrepublik Deutschland unter Ausschluss des UN-Kaufrechts.",
"Gerichtsstand fuer alle Streitigkeiten ist der Sitz des Unternehmens.",
"Auf die Vertraege findet deutsches Recht Anwendung."],
"dispute_odr_link": ["Die EU-Kommission stellt eine Plattform zur Online-Streitbeilegung bereit.",
"Zur aussergerichtlichen Streitbeilegung steht die OS-Plattform zur Verfuegung."],
"choice_of_law_specific": ["Es gilt deutsches Recht unter Ausschluss des UN-Kaufrechts (CISG).",
"Anwendbar ist das Recht der Bundesrepublik Deutschland."],
"payment": ["Die Preise sind Endpreise inklusive Mehrwertsteuer; Versandkosten gesondert ausgewiesen.",
"Zahlungsbedingungen und Preise richten sich nach den Angaben im Bestellprozess."],
"payment_methods": ["Zur Zahlung stehen Vorkasse, Kreditkarte, Lastschrift, Rechnung und PayPal zur Verfuegung.",
"Folgende Zahlungsarten werden akzeptiert: Ueberweisung, SEPA-Lastschrift, Kreditkarte."],
"payment_due_date": ["Der Kaufpreis ist sofort mit Vertragsschluss faellig.",
"Die Zahlung ist bei Bestellung zu leisten.",
"Der Rechnungsbetrag wird mit Versand der Ware faellig.",
"Bei Kauf auf Rechnung ist der Betrag innerhalb von 14 Tagen zu zahlen."],
"delivery": ["Die Lieferung erfolgt an die vom Kunden angegebene Lieferadresse.",
"Wir liefern innerhalb Deutschlands; die Leistung wird nach Vertragsschluss erbracht."],
"delivery_timeframe": ["Die Lieferzeit betraegt in der Regel 3-5 Werktage.",
"Die Ware wird voraussichtlich innerhalb von 2 bis 4 Werktagen geliefert."],
"warranty": ["Es gelten die gesetzlichen Maengelhaftungsrechte (Gewaehrleistung).",
"Bei Maengeln stehen dem Kunden die gesetzlichen Gewaehrleistungsrechte zu.",
"Fuer Sachmaengel haften wir nach den gesetzlichen Bestimmungen."],
"warranty_period": ["Die Gewaehrleistungsfrist betraegt zwei Jahre ab Lieferung.",
"Die Verjaehrungsfrist fuer Maengelansprueche betraegt zwei Jahre."],
"termination": ["Der Vertrag kann von beiden Parteien ordentlich gekuendigt werden.",
"Das Abonnement kann jederzeit zum Ende der Laufzeit gekuendigt werden."],
"termination_period": ["Die Kuendigungsfrist betraegt einen Monat zum Vertragsende.",
"Der Vertrag ist mit einer Frist von vier Wochen kuendbar."],
"termination_form": ["Die Kuendigung bedarf der Textform und kann per E-Mail erfolgen.",
"Eine Kuendigung ist schriftlich oder per E-Mail moeglich."],
"salvatory_clause": ["Sollten einzelne Bestimmungen unwirksam sein, bleibt die Wirksamkeit der uebrigen unberuehrt.",
"Die Unwirksamkeit einzelner Klauseln beruehrt nicht die Gueltigkeit der uebrigen AGB."],
"amendment_clause": ["Wir behalten uns vor, diese AGB mit Wirkung fuer die Zukunft zu aendern.",
"Aenderungen dieser Bedingungen werden dem Kunden rechtzeitig mitgeteilt."],
"consumer_rights": ["Die gesetzlichen Rechte des Verbrauchers bleiben unberuehrt.",
"Zwingende Verbraucherschutzvorschriften bleiben von diesen Bedingungen unberuehrt."],
}
# ── LLM-Items: Paragraph-Abschnitts-Retrieval + Pruef-Frage ───────────────
LLM_TOPIC: dict[str, str] = {
"delivery_timeframe": r"liefer",
"warranty_period": r"gew(?:ä|ae)hrleist|m(?:ä|ae)ngel|sachm|verj(?:ä|ae)hr|haftungsdauer|garantie",
}
LLM_QUESTION: dict[str, str] = {
"delivery_timeframe": ("Wird eine KONKRETE Lieferzeit/Lieferfrist genannt (z.B. '3-5 Werktage', "
"'innerhalb von 2 Werktagen')? Eine nur allgemeine Lieferregelung ODER ein "
"Verweis 'Lieferzeit im Bestellvorgang' ohne konkrete Frist zaehlt NICHT."),
"warranty_period": ("Wird eine KONKRETE Gewaehrleistungs-/Verjaehrungsfrist als ZAHL genannt "
"(z.B. 'zwei Jahre', 'ein Jahr')? Ein blosser Verweis auf 'gesetzliche "
"Verjaehrungsfristen' ohne Zahl zaehlt NICHT."),
}
# ── REFERENCE-Item data_protection ────────────────────────────────────────
REFERENCE_PATTERNS: dict[str, str] = {
"data_protection": r"datenschutz(erkl(?:ä|ae)rung|bestimmung|hinweis)",
}
def detect_business_model(text: str) -> dict[str, bool]:
"""Deterministischer Geschaeftsmodell-Detektor fuer das Applicability-Gate.
Edge-Case: gemischte Modelle (Webshop + Finanzierung/Service) koennen 'abo'
triggern -> dann greift das termination-Gate nicht; bewusst konservativ
(lieber eine Kuendigungs-Pruefung zu viel als eine echte Luecke uebersehen)."""
tl = text.lower()
consumer = ("widerrufsbelehrung" in tl) or ("widerrufsrecht" in tl and "verbraucher" in tl)
b2b = (not consumer) and any(s in tl for s in (
"geschäftskunden", "ausschließlich an unternehmer", "nur an unternehmer",
"lieferbedingungen für geschäftskunden"))
abo = any(s in tl for s in (
"abonnement", "mindestlaufzeit", "vertragslaufzeit", "verlängert sich",
"monatsabo", "jahresabo")) or ("abo" in tl and "kündig" in tl)
return {"b2b": b2b, "abo": abo, "b2c": not b2b}
def is_applicable(item_id: str, model: dict[str, bool]) -> bool:
"""Gate: gilt das Item fuer dieses Geschaeftsmodell? (False -> N/A, nicht pruefen)."""
if item_id in ABO_ONLY and not model.get("abo"):
return False
if item_id in B2C_ONLY and model.get("b2b"):
return False
return True
def decision_method(item_id: str) -> str:
"""decision_method fuer ein Item; Default EMBEDDING (Prosa-Rescue)."""
return DECISION_METHOD.get(item_id, EMBEDDING)
@@ -1,19 +1,60 @@
"""AGBAgent — Allgemeine Geschäftsbedingungen (§§ 305 ff. BGB).
Thin-Subclass von ChecklistAgent über die kuratierte AGB_CHECKLIST (L1
Pflichtangaben + L2 Detailchecks). KEIN Library-Firehose.
ChecklistAgent-Subclass: erst L1/L2-Keyword-Pass, dann **C-lean-Routing** — die
keyword-durchgefallenen Items werden per `decision_method` an die wiederverwendbare
Prüfer-Library geroutet (Embedding / Reference / LLM), davor das Geschäftsmodell-
Gate (Applicability), danach Severity-Re-Tiering (LOW → Empfehlung).
Validiert gegen 7-Firmen-Opus-GT: 71 % FP → ~0. Config in `_routing`, Flow in `_pipeline`.
"""
from __future__ import annotations
from compliance.services.doc_checks.agb_checks import AGB_CHECKLIST
from .._base import AgentInput, AgentOutput, lint_output
from .._checklist_agent import ChecklistAgent
from .._rollup import rollup
from ._pipeline import run_routed
class AGBAgent(ChecklistAgent):
CHECKLIST = AGB_CHECKLIST
agent_id = "agb"
agent_version = "1.0"
agent_version = "2.0" # v2: decision_method-Routing (Embedding/Reference/LLM)
doc_type = "agb"
owned_mc_ids = tuple(c["id"] for c in AGB_CHECKLIST)
async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
# 1) Basis-Keyword-Pass (L1/L2). out.findings = keyword-durchgefallene Items.
out = await super().evaluate(agent_input)
text = (agent_input.text or "").strip()
if len(text) < 100 or not out.findings:
return out # zu kurz / nichts zu routen
# 2) Routing: Gate + Embedding/Reference/LLM-Rescue der Keyword-Misses.
kept, resolved, gated = await run_routed(
out.findings, text, agent_input.context)
resolved_set, gated_set = set(resolved), set(gated)
# 3) Coverage angleichen: rescued → ok, gated → na.
for c in out.mc_coverage:
if c.mc_id in resolved_set:
c.status, c.reason = "ok", "semantisch vorhanden (Routing)"
elif c.mc_id in gated_set:
c.status, c.reason = "na", "für Geschäftsmodell nicht anwendbar"
# 4) Severity-Re-Tiering: HIGH/MEDIUM = Findings, LOW = nur Empfehlung.
out.findings = [f for f in kept if f.severity in ("HIGH", "MEDIUM")]
out.recommendations = rollup(kept)
# 5) Aggregat-Kennzahlen neu (Coverage hat sich verschoben).
cov = out.mc_coverage
out.mc_total = len(cov)
out.mc_ok = sum(1 for c in cov if c.status == "ok")
out.mc_na = sum(1 for c in cov if c.status == "na")
out.mc_high = sum(1 for c in cov if c.status == "high")
out.mc_medium = sum(1 for c in cov if c.status == "medium")
out.mc_low = sum(1 for c in cov if c.status == "low")
out.notes = ((out.notes + " · ") if out.notes else "") + \
f"routed: {len(resolved)} rescued, {len(gated)} n/a"
return lint_output(out)
@@ -0,0 +1,80 @@
"""Applicability-Gate fuer den DSE-Scan.
Schliesst Controls aus dem DSE-FINDINGS-Scan aus, die laut
`compliance.control_classification` NICHT gegen eine DSE laufen
('DSE' nicht in applicable_artifacts) UND sicher klassifiziert sind
(needs_review=false). Diese werden NICHT geloescht, sondern als
*organisatorische Checkliste* zurueckgegeben (Routing zu VVT/TOM/Audit).
Fail-safe: unsichere Klassifikationen (needs_review=true) bleiben im
Findings-Scan. Defensiv: fehlt die Tabelle (z.B. Prod ohne Migration),
liefert das Gate ein leeres Dict -> es wird NICHT gefiltert.
"""
from __future__ import annotations
import logging
import os
from typing import Any
logger = logging.getLogger(__name__)
async def load_dse_gate(db_url: str = "") -> dict[str, dict[str, Any]]:
"""Liefert {control_id: meta} fuer Controls, die aus dem DSE-Findings-Scan
auszuschliessen sind (hochsicher organisatorisch). Leeres Dict = kein Filter.
"""
dsn = (db_url or os.getenv("DATABASE_URL")
or os.getenv("COMPLIANCE_DATABASE_URL") or "")
if not dsn:
return {}
try:
import asyncpg
conn = await asyncpg.connect(dsn)
try:
rows = await conn.fetch(
"""SELECT control_id, obligation_type, check_intent,
applicable_artifacts, reference_allowed
FROM compliance.control_classification
WHERE is_active AND NOT needs_review
AND NOT ('DSE' = ANY(applicable_artifacts))""")
finally:
await conn.close()
except Exception as e: # Tabelle fehlt / DB nicht erreichbar -> kein Filter
logger.info("dse classification gate inaktiv: %s", str(e)[:90])
return {}
return {
r["control_id"]: {
"obligation_type": r["obligation_type"],
"check_intent": r["check_intent"],
"applicable_artifacts": list(r["applicable_artifacts"] or []),
"reference_allowed": r["reference_allowed"],
}
for r in rows if r["control_id"]
}
def apply_gate(
controls: list[dict[str, Any]],
gate: dict[str, dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
"""Teilt geladene Controls in (findings_controls, organizational).
findings_controls: laufen normal durch den DSE-Scan.
organizational: aus dem Scan genommen, als Checkliste ausgegeben
(control_id + title + Klassifikations-Metadaten fuer das Routing).
"""
kept: list[dict[str, Any]] = []
organizational: list[dict[str, Any]] = []
for c in controls:
cid = c.get("control_id")
meta = gate.get(cid) if cid else None
if meta:
organizational.append({
"control_id": cid,
"title": c.get("title"),
**meta,
})
else:
kept.append(c)
return kept, organizational
@@ -0,0 +1,170 @@
"""Deterministische semantische Recall-Schicht für den DSE-Check.
WARUM: Reines Keyword-Matching hat schlechten Recall (eine Pflicht lässt sich
auf viele Arten formulieren). Der frühere Regex-Boost war zu stumpf (über-passt
auf vollständigen Dokumenten). BGE-M3-Embeddings erkennen den SINN — und sind
dabei DETERMINISTISCH: ein Embedding-Modell ist eine feste Funktion, gleicher
Text → gleicher Vektor → gleiches Pass/Fail bei fester Schwelle. Reproduzierbar,
auditierbar, kein Keyword-Katalog, kein generatives LLM zur Checkzeit.
Design:
- Doc wird EINMAL pro Dokument-Hash eingebettet (teuer: ~37s/64k-Doc), die
Per-Control-Scores werden gecacht (/data) → Folge-Checks sind instant.
- Reachability-Guard: ist der Embedding-Service nicht erreichbar, liefert die
Schicht leer zurück (der deterministische Keyword-Layer trägt) — KEIN Hang.
- Schwelle ist die einzige Stellschraube (DSE-Default 0.65, an BMW-GT kalibriert;
braucht Mehr-Firmen-Kalibrierung gegen Overfitting — bewusst konservativ).
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import os
import sqlite3
from typing import Iterable
logger = logging.getLogger(__name__)
# DSE-Schwelle: an BMW-Haiku-GT vermessen (PASS-Median 0.648 / FAIL-Median 0.612).
# 0.65 = präzisionsfreundlich (wenig Über-Pass). Per ENV überschreibbar für
# spätere Mehr-Firmen-Kalibrierung, ohne Code-Änderung.
DSE_EMBED_THRESHOLD = float(os.getenv("DSE_EMBED_THRESHOLD", "0.65"))
_CACHE_PATH = os.getenv("DSE_EMBED_CACHE", "/data/dse_embed_cache.json")
_SIDECAR_DB = os.getenv("MC_CLASS_DB", "/data/mc_classification.db")
def _doc_hash(text: str) -> str:
return hashlib.sha256(text.encode("utf-8", "ignore")).hexdigest()[:20]
def _load_cache() -> dict:
try:
with open(_CACHE_PATH, encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def _save_cache(cache: dict) -> None:
try:
# LRU-Kappung: max 30 Dokumente im Cache (Scores sind klein)
if len(cache) > 30:
for k in list(cache.keys())[:-30]:
cache.pop(k, None)
tmp = _CACHE_PATH + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
json.dump(cache, f)
os.replace(tmp, _CACHE_PATH)
except Exception as e:
logger.warning("dse embed-cache save failed: %s", e)
def _load_control_vecs(cids: Iterable[str]) -> dict[str, list[float]]:
from compliance.services.mc_embedding_matcher import _blob_to_vec
cid_list = [c for c in cids if c]
if not cid_list:
return {}
try:
with sqlite3.connect(_SIDECAR_DB) as c:
ph = ",".join("?" * len(cid_list))
rows = c.execute(
f"SELECT control_id, embedding FROM mc_classification "
f"WHERE control_id IN ({ph}) AND doc_type='dse' "
f"AND check_type='text' AND embedding IS NOT NULL",
cid_list,
).fetchall()
return {cid: _blob_to_vec(b) for cid, b in rows}
except Exception as e:
logger.warning("dse control-vec load failed: %s", e)
return {}
async def _embedding_reachable(timeout: float = 2.0) -> bool:
"""Schneller TCP-Connect zum Embedding-Service. Verhindert, dass ein toter
Service den Check blockiert (macmini-Lehrer-Last hat das Embedding früher
verstopft)."""
url = os.getenv("EMBEDDING_URL", "http://embedding-service:8087")
hostport = url.split("://", 1)[-1].split("/", 1)[0]
host, _, port = hostport.partition(":")
port = int(port or "8087")
try:
fut = asyncio.open_connection(host, port)
reader, writer = await asyncio.wait_for(fut, timeout=timeout)
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
return True
except Exception as e:
logger.warning("dse embedding-service nicht erreichbar (%s) — "
"deterministischer Layer trägt", e)
return False
async def _compute_scores(text: str, all_cids: list[str]) -> dict[str, float]:
"""Bettet das Dokument EINMAL ein und liefert max-Cosinus je Control."""
from compliance.services.mc_embedding_matcher import (
_chunk_text, _cosine, _embed_texts, DIM,
)
mc_vecs = _load_control_vecs(all_cids)
if not mc_vecs:
return {}
chunks = _chunk_text(text)
if not chunks:
return {}
chunk_vecs = await _embed_texts(chunks)
chunk_vecs = [v for v in chunk_vecs if v and len(v) == DIM]
if not chunk_vecs:
return {}
return {
cid: round(float(max((_cosine(mv, cv) for cv in chunk_vecs),
default=0.0)), 4)
for cid, mv in mc_vecs.items()
}
async def embedding_recall(
text: str,
candidate_cids: Iterable[str],
threshold: float | None = None,
embed_timeout: float = 90.0,
) -> set[str]:
"""Returns die candidate control_ids, die semantisch (>= Schwelle) im Doc
vorkommen. Deterministisch + gecacht. Leeres Set, wenn Service down/Fehler.
candidate_cids: die im Keyword-Layer DURCHGEFALLENEN Controls (Recall-Rescue).
"""
cands = [c for c in candidate_cids if c]
if not text or len(text) < 100 or not cands:
return set()
thr = DSE_EMBED_THRESHOLD if threshold is None else threshold
h = _doc_hash(text)
cache = _load_cache()
scores = cache.get(h)
if scores is None:
if not await _embedding_reachable():
return set()
try:
scores = await asyncio.wait_for(
_compute_scores(text, cands), timeout=embed_timeout)
except (Exception, asyncio.TimeoutError) as e:
logger.warning("dse embedding_recall skipped: %s", e)
return set()
if not scores:
return set()
cache[h] = scores
_save_cache(cache)
logger.info("dse embedding_recall: doc %s eingebettet (%d Scores)",
h, len(scores))
else:
logger.info("dse embedding_recall: Cache-Treffer doc %s", h)
cand_set = set(cands)
return {cid for cid, s in scores.items()
if cid in cand_set and s >= thr}
@@ -1,29 +1,286 @@
"""DSEAgent — Datenschutzerklärung / Datenschutzinformation (Art. 13/14 DSGVO).
"""DSE-Agent v3 — Datenschutzerklärung / Datenschutzinformation (Art. 13/14
DSGVO), baut auf doc_check_controls (267 text-MCs aus DB).
Thin-Subclass von ChecklistAgent über die kuratierte ART13_CHECKLIST (KEIN
90k-Library-Firehose). Einzige Spezialität: Drittland wird bei dokumentiertem
Drittlandtransfer (Scan-Kontext) zu HIGH angehoben.
Volle Parität zu impressum/ + cookie_policy/ (User-Vorgabe 2026-06-17):
Layer 0 — Regex-Boost (kuratierte Art-13-Patterns aus mcs.py / ART13_CHECKLIST)
Layer 1 — Keyword-Match aus pass_criteria der DSE-DB-MCs (deterministisch)
Layer 2 — BGE-M3 Embedding-Match
Layer 3 — Semantic-Validator (LLM) für offene HIGH/MEDIUM-Fails + Auto-Learning
Die kuratierten Patterns gehen NICHT verloren — sie boosten (Layer 0) die DB-
Controls (z.B. präzises "keine Drittlandübermittlung" → drittland-MC PASS, kein
False-Positive). DSE-Spezialität bleibt: Drittland → HIGH bei dokumentiertem
Transfer (scan_context).
Output-Layer (Linter / Rollup / Methodik-UI) bleibt 1:1.
"""
from __future__ import annotations
from compliance.services.doc_checks.dse_checks import ART13_CHECKLIST
import logging
from datetime import datetime, timezone
from .._base import AgentInput
from .._checklist_agent import ChecklistAgent
from .._base import (
AgentInput,
AgentOutput,
BaseSpecialistAgent,
EscalationLog,
EvidenceSource,
Finding,
McCoverage,
Severity,
SourceType,
lint_output,
)
from .._pattern_library import record as record_pattern
from .._rollup import rollup
from .._semantic_validator import build_rename_action, validate_present
from .mcs import MC_IDS, MCS
from .regex_boost import BOOST_KEYWORDS
from .v3_engine import run_v3_pipeline
logger = logging.getLogger(__name__)
class DSEAgent(ChecklistAgent):
CHECKLIST = ART13_CHECKLIST
_SEV_TO_ENUM = {
"CRITICAL": Severity.HIGH,
"HIGH": Severity.HIGH,
"MEDIUM": Severity.MEDIUM,
"LOW": Severity.LOW,
"INFO": Severity.INFO,
}
# Drittland-Vokabeln für die scan_context-Heraufstufung (Art. 13(1)(f)).
_THIRD_COUNTRY_KW = tuple(set(
BOOST_KEYWORDS.get("third_country", ())
+ BOOST_KEYWORDS.get("third_country_mechanism", ())
))
def _build_measure(label: str, norm: str) -> str:
"""Maßnahme (Imperativ) statt Pruef-Frage als action."""
base = (label or "").strip().rstrip(".")
if not base:
return ("Datenschutz-Pflichtangabe ergänzen und gegen Art. 13/14 "
"DSGVO prüfen.")
msg = f"Pflichtangabe ergänzen: {base}."
if norm:
msg += f" Rechtsgrundlage: {norm}."
return msg
def _is_third_country_topic(result: dict) -> bool:
"""Ist dieses DB-MC thematisch ein Drittland-Control?"""
parts: list[str] = [str(result.get("label") or "").lower()]
for c in (result.get("_pass_criteria") or []):
if c:
parts.append(str(c).lower())
blob = " ".join(parts)
hits = sum(1 for kw in _THIRD_COUNTRY_KW if kw in blob)
return hits >= 1
class DSEAgent(BaseSpecialistAgent):
agent_id = "dse"
agent_version = "1.0"
agent_version = "3.0"
doc_type = "dse"
owned_mc_ids = tuple(c["id"] for c in ART13_CHECKLIST)
owned_mc_ids = MC_IDS
def _severity_override(self, c: dict, agent_input: AgentInput):
def _third_country_transfer(self, agent_input: AgentInput) -> bool:
sc = (agent_input.context or {}).get("scan_context") or {}
tc = str(sc.get("third_country_transfer", "")).lower() in (
return str(sc.get("third_country_transfer", "")).lower() in (
"yes", "true", "1", "ja")
if tc and c["id"] in ("third_country", "third_country_mechanism"):
return "HIGH"
return None
async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
start = datetime.now(timezone.utc)
text = (agent_input.text or "").strip()
scope = set(agent_input.business_scope or [])
coverage: list[McCoverage] = []
findings: list[Finding] = []
esc_logs: list[EscalationLog] = []
notes_parts: list[str] = []
if len(text) < 100:
for mc in MCS:
coverage.append(McCoverage(
mc_id=mc.mc_id, status="skipped",
label=mc.label, reason="text too short",
))
return self._finalize(
start, findings, esc_logs, coverage,
confidence=0.0,
notes="DSE-Text zu kurz oder leer.",
)
tc_transfer = self._third_country_transfer(agent_input)
# Embedding-Recall (Layer 2) läuft IMMER — deterministisch, gecacht
# (pro Doc-Hash → Folge-Views instant) und Reachability-gegated
# (kein Hang, wenn der Service fehlt). Ersetzt den über-passenden Boost.
results, telemetry = await run_v3_pipeline(text, scope)
notes_parts.append(
f"v3-pipeline: {telemetry.get('total_mcs', 0)} DB-MCs · "
f"{telemetry.get('layer_1_pass', 0)} Keyword-Treffer · "
f"{telemetry.get('embedding_passes', 0)} semantisch (Embedding)"
)
if telemetry.get("sector_dropped") or telemetry.get("offtopic_dropped"):
notes_parts.append(
f"Scope-Filter: {telemetry.get('sector_dropped', 0)} "
f"Branchen-MCs, {telemetry.get('offtopic_dropped', 0)} "
"themenfremde MCs entfernt"
)
seen: set[str] = set()
for r in results:
mc_id = r.get("control_id") or ""
if not mc_id or mc_id in seen:
continue
seen.add(mc_id)
passed = bool(r.get("passed"))
sev = _SEV_TO_ENUM.get(
(r.get("severity") or "MEDIUM").upper(), Severity.MEDIUM,
)
# DSE-Spezialität: Drittland → HIGH bei dokumentiertem Transfer.
sev_reason = "db_mc_failed"
if tc_transfer and _is_third_country_topic(r):
sev = Severity.HIGH
sev_reason = "db_mc_failed_third_country_transfer"
coverage.append(McCoverage(
mc_id=mc_id,
status="ok" if passed else sev.value.lower(),
reason=str(r.get("matched_text") or r.get("hint") or "")[:120],
))
if passed:
continue
label = r.get("label") or r.get("hint") or ""
norm_str = str(r.get("regulation") or "").strip()
if r.get("article"):
norm_str = (norm_str + f" Art. {r.get('article')}").strip()
if not norm_str:
norm_str = "DSGVO Art. 13/14"
findings.append(Finding(
check_id=f"DSE-DBMC-{mc_id}",
agent=self.agent_id,
agent_version=self.agent_version,
field_id=mc_id,
severity=sev,
severity_reason=sev_reason,
title=str(label)[:200] or f"DB-MC {mc_id} nicht erfüllt",
norm=norm_str,
evidence="",
action=_build_measure(str(label), norm_str)[:400],
confidence=0.9,
sources=[EvidenceSource(
source_type=SourceType.MC,
source_id=mc_id,
detail=str(r.get("source") or "keyword_match")[:120],
confidence=0.9,
)],
))
# Boost-Coverage: meine Pattern-Treffer (regex-boost field_ids).
boost_ids = set(telemetry.get("layer_0_field_ids") or [])
for mc in MCS:
coverage.append(McCoverage(
mc_id=mc.mc_id,
status="ok" if mc.field_id in boost_ids else "na",
label=mc.label,
reason=("regex-boost hit"
if mc.field_id in boost_ids
else "kein Pattern-Treffer (kein Veto)"),
))
if not (agent_input.context or {}).get("skip_llm"):
await self._semantic_demote(text, findings, coverage)
confs = [f.confidence for f in findings if f.confidence] or [0.95]
overall = sum(confs) / len(confs)
return self._finalize(
start, findings, esc_logs, coverage,
confidence=overall, notes=" · ".join(notes_parts),
)
async def _semantic_demote(
self, text: str, findings: list[Finding],
coverage: list[McCoverage],
) -> None:
"""LLM-Layer für HIGH/MEDIUM-DB-MCs: Label-Mismatch-Check.
Bei Fund → HIGH/MEDIUM → LOW + Rename-Action."""
candidates = [
f for f in findings
if f.severity in (Severity.HIGH.value, Severity.MEDIUM.value)
and f.severity_reason in (
"db_mc_failed", "db_mc_failed_third_country_transfer")
]
if not candidates:
return
result = await validate_present(
text, [(f.field_id, f.title[:80]) for f in candidates],
)
if not result:
return
for finding in candidates:
row = result.get(finding.field_id)
if not row or not row.get("found"):
continue
if row.get("confidence", 0) < 0.6:
continue
label_used = row.get("label_used") or "abweichendes Label"
conf = float(row.get("confidence") or 0.8)
finding.severity = Severity.LOW.value
finding.severity_reason = "label_mismatch"
finding.title = (
f"Label '{label_used}' weicht von Standard ab"
)
finding.evidence = str(row.get("evidence") or "")[:200]
finding.action = build_rename_action(
finding.field_id, label_used,
)
finding.confidence = conf
finding.sources.append(EvidenceSource(
source_type=SourceType.LLM_LOCAL,
source_id="semantic_validator",
detail=f"LLM-confirmed: '{label_used}'",
confidence=conf,
))
for c in coverage:
if c.mc_id == finding.field_id:
c.status = "low"
c.reason = f"label_mismatch: '{label_used}'"
try:
record_pattern(
field_id=finding.field_id,
label_used=label_used,
confidence=conf,
agent_id=self.agent_id,
)
except Exception as e:
logger.warning("pattern-library record failed: %s", e)
def _finalize(
self, start: datetime, findings: list[Finding],
esc_logs: list[EscalationLog], coverage: list[McCoverage],
confidence: float, notes: str = "",
) -> AgentOutput:
end = datetime.now(timezone.utc)
recs = rollup(findings)
out = AgentOutput(
agent=self.agent_id,
agent_version=self.agent_version,
started_at=start,
finished_at=end,
duration_ms=int((end - start).total_seconds() * 1000),
findings=findings,
recommendations=recs,
mc_coverage=coverage,
escalation_log=esc_logs,
confidence=confidence,
notes=notes,
mc_total=len(coverage),
mc_ok=sum(1 for c in coverage if c.status == "ok"),
mc_na=sum(1 for c in coverage if c.status == "na"),
mc_high=sum(1 for c in coverage if c.status == "high"),
mc_medium=sum(1 for c in coverage if c.status == "medium"),
mc_low=sum(1 for c in coverage if c.status == "low"),
)
return lint_output(out)
@@ -0,0 +1,129 @@
"""DSE-Tiefenprüfung: LLM-Kaskade auf die UNSCHARFEN Findings.
User-Architektur (2026-06-18): die deterministische Engine (Keyword + Embedding)
triagiert. Eindeutige Fälle (sehr hoher/niedriger Embedding-Score) bleiben
deterministisch. Die UNSCHARFE Mitte + grenzwertig-Bestandene gehen durch die
Kaskade — denn dort entstehen sowohl 'verpasste Lücken' (schlimmster Fehler) als
auch Falsch-Findings (Rework).
Eskalation auf ANTWORT-UNSICHERHEIT (nicht JSON-Gültigkeit): jedes Tier liefert
{erfuellt, confidence, begruendung}. Confidence < Schwelle → nächstes Tier.
Tier 1: Qwen 35B (lokal, schnell, billig)
Tier 2: OVH gpt-oss-120B
Tier 3: Claude — NUR mit Freigabe (allow_claude), sonst 'needs_freigabe'.
Judging-Leitplanken (User-Vorgaben):
- Speicherdauer nur erfüllt bei konkreter Höchstdauer ODER echtem,
nachvollziehbarem Kriterium — NICHT zirkulär ('bis Zweck wegfällt').
- Ohne ausreichenden Kontext → eher nicht erfüllt (nichts fehlen lassen).
"""
from __future__ import annotations
import json
import logging
import os
from compliance.services.llm_cascade import (
_call_anthropic, _call_ollama, _call_ovh,
)
logger = logging.getLogger(__name__)
# Unscharfe Embedding-Zone (kalibriert an 5-Firmen-GT 2026-06-18): außerhalb ist
# die Engine sicher genug, innen entscheidet der LLM.
FUZZY_LO = float(os.getenv("DSE_FUZZY_LO", "0.50"))
FUZZY_HI = float(os.getenv("DSE_FUZZY_HI", "0.72"))
# Selbstkonfidenz-Schwelle: darunter → eskalieren.
ESC_CONF = float(os.getenv("DSE_ESC_CONF", "0.75"))
_JUDGE_SYS = (
"Du bist ein erfahrener DSGVO-Datenschutz-Auditor. Du prüfst, ob eine "
"konkrete Pflicht in einer Datenschutzerklärung (DSE) ERFÜLLT ist. "
"Sei streng wie ein Fachanwalt: lieber 'nicht erfüllt' wenn unklar — eine "
"übersehene Lücke ist schlimmer als ein Hinweis zu viel. "
"Speicherdauer ist NUR erfüllt bei konkreter Höchstdauer ODER einem echten, "
"nachvollziehbaren Kriterium; zirkuläre Formeln ('bis der Zweck wegfällt') "
"erfüllen die Pflicht NICHT. "
'Antworte AUSSCHLIESSLICH als JSON: '
'{"erfuellt": true|false, "confidence": 0.0-1.0, "begruendung": "kurz"}'
)
def _build_user(doc_text: str, title: str, criteria: list) -> str:
crit = "; ".join(str(c) for c in (criteria or []) if c)[:600]
return (
f"PFLICHT: {title}\n"
f"Erfüllt, wenn: {crit}\n\n"
f"DATENSCHUTZERKLÄRUNG (Auszug):\n{doc_text[:14000]}\n\n"
"Ist die Pflicht im Text inhaltlich erfüllt?"
)
def _parse(text: str) -> dict | None:
if not text:
return None
s, e = text.find("{"), text.rfind("}")
if s < 0 or e <= s:
return None
try:
o = json.loads(text[s:e + 1])
return {
"erfuellt": bool(o.get("erfuellt")),
"confidence": float(o.get("confidence") or 0.0),
"begruendung": str(o.get("begruendung") or "")[:300],
}
except Exception:
return None
async def judge_control(
doc_text: str, title: str, criteria: list, allow_claude: bool = False,
) -> dict:
"""Tiered judgment mit Selbstkonfidenz-Eskalation. Returns
{erfuellt, confidence, source, begruendung, needs_freigabe}."""
user = _build_user(doc_text, title, criteria)
tiers = [("qwen", _call_ollama), ("ovh_120b", _call_ovh)]
best: dict | None = None
for name, call in tiers:
try:
if name == "qwen":
txt = await call(_JUDGE_SYS, user, max_tokens=400,
timeout=60, think=False)
else:
txt = await call(_JUDGE_SYS, user, max_tokens=400)
except Exception as e:
logger.warning("deep_check tier %s failed: %s", name, e)
txt = ""
p = _parse(txt)
if p:
p["source"] = name
best = p
if p["confidence"] >= ESC_CONF:
return {**p, "needs_freigabe": False}
# Tier 3: Claude — nur mit Freigabe
if not allow_claude:
if best:
return {**best, "needs_freigabe": True}
return {"erfuellt": False, "confidence": 0.0, "source": "none",
"begruendung": "Unsicher — Anwalt/Claude-Freigabe nötig",
"needs_freigabe": True}
try:
txt = await _call_anthropic(_JUDGE_SYS, user, max_tokens=400)
p = _parse(txt)
if p:
return {**p, "source": "anthropic_claude", "needs_freigabe": False}
except Exception as e:
logger.warning("deep_check claude failed: %s", e)
if best:
return {**best, "needs_freigabe": False}
return {"erfuellt": False, "confidence": 0.0, "source": "none",
"begruendung": "Kein LLM-Ergebnis", "needs_freigabe": False}
def is_fuzzy(score: float, kw_pass: bool) -> bool:
"""Unscharf = im Embedding-Graubereich UND nicht durch Keyword klar bestätigt.
Klar-bestanden (kw) bleibt deterministisch; klar-hoch/niedrig auch."""
if kw_pass:
return False
return FUZZY_LO <= score <= FUZZY_HI
@@ -0,0 +1,78 @@
"""Machine-Check-Definitionen für den DSE-Agent (Layer-0 Regex-Boost).
Eine MC = ein abgegrenztes Art-13/14-DSGVO-Pflichtfeld mit deterministischen
Patterns. Quelle der Patterns ist die EINE kuratierte ART13_CHECKLIST
(doc_checks/dse_checks.py) — hier nur in das MC-Format gehoben, damit der
Regex-Boost (regex_boost.py) und die v3-Engine (v3_engine.py) dieselbe Struktur
nutzen wie impressum/ + cookie_policy/. KEINE Pattern-Duplikation: die Patterns
bleiben in dse_checks.py, dieses Modul kompiliert sie nur.
Owner = dse-agent.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Pattern
from compliance.services.doc_checks.dse_checks import ART13_CHECKLIST
@dataclass(frozen=True)
class MC:
"""Eine Machine-Check-Definition (Boost-Pattern für ein DSE-Feld)."""
mc_id: str # DSE-MC-001 ...
field_id: str # controller, legal_basis, third_country ...
label: str
norm: str
patterns: tuple[Pattern[str], ...] = field(default_factory=tuple)
severity_if_missing: str = "MEDIUM"
level: int = 1
_NORM_RE = re.compile(r"\((Art\.[^)]+|§\s*\d+[^)]*)\)")
def _norm_of(label: str) -> str:
m = _NORM_RE.search(label or "")
return m.group(1).strip() if m else "Art. 13/14 DSGVO"
def _compile(patterns: list[str]) -> tuple[Pattern[str], ...]:
out: list[Pattern[str]] = []
for p in patterns or ():
try:
out.append(re.compile(p, re.IGNORECASE | re.MULTILINE))
except re.error:
continue
return tuple(out)
def _build_mcs() -> tuple[MC, ...]:
"""Hebt die ART13_CHECKLIST in das MC-Format (Boost-Pattern pro Feld)."""
mcs: list[MC] = []
for i, c in enumerate(ART13_CHECKLIST, start=1):
mcs.append(MC(
mc_id=f"DSE-MC-{i:03d}",
field_id=c["id"],
label=c["label"],
norm=_norm_of(c["label"]),
patterns=_compile(c.get("patterns", [])),
severity_if_missing=c.get("severity", "MEDIUM"),
level=c.get("level", 1),
))
return tuple(mcs)
MCS: tuple[MC, ...] = _build_mcs()
# Public list of all MC-IDs for the Registry / owned_mc_ids.
MC_IDS: tuple[str, ...] = tuple(m.mc_id for m in MCS)
def scope_matches(mc: MC, scope: set[str]) -> bool:
"""Art-13/14-Pflichten gelten universell für jede DSE — keine Branchen-
Gating auf Boost-Ebene (anders als Impressum mit Kammerberufen). Das
Sektor-Gate über den control_id-Prefix passiert in der v3-Engine."""
return True
@@ -0,0 +1,179 @@
"""Layer-0 Regex-Boost für den DSE-Agent — die kuratierten Art-13/14-Patterns
als deterministische Vor-Stufe vor dem Keyword-Match aus doc_check_controls.
Analog zu impressum/regex_boost.py + cookie_policy/regex_boost.py:
- run_v3_pipeline lädt die 267 text-MCs (doc_type='dse') und macht
Keyword-Match aus deren pass_criteria.
- MEIN Beitrag (Layer 0): die präzisen Art-13-Patterns (mcs.py / aus
ART13_CHECKLIST) laufen ZUERST. Trifft ein Pattern → das thematisch
passende DB-MC wird zu PASS geboostet (auch wenn der Keyword-Match unklar
war). Mapping: field_id → typische Wörter in der pass_criteria der DB-MC.
Damit gehen die kuratierten DSE-Patterns nicht verloren, sondern boosten das
DB-Control-System (statt es zu ersetzen).
"""
from __future__ import annotations
import logging
from .mcs import MCS, scope_matches
logger = logging.getLogger(__name__)
# field_id (aus ART13_CHECKLIST) → Wörter, wie sie in der pass_criteria der
# zugehörigen DSE-DB-MCs vorkommen. Treffen ≥2 dieser Wörter in den criteria
# eines DB-MC, gehört es zu diesem Feld → Boost. Die Vokabeln sind an den
# real beobachteten DB-Kriterien ausgerichtet (DATA/SEC/AUTH-Controls).
BOOST_KEYWORDS: dict[str, tuple[str, ...]] = {
"controller": (
"verantwortlich", "verantwortliche stelle", "verantwortlichen",
"kontaktdaten des verantwortlichen", "name und kontaktdaten",
"firmenname", "rechtsform", "anschrift", "ladungsfähige",
"identität des verantwortlichen", "controller",
),
"dpo": (
"datenschutzbeauftragt", "datenschutzbeauftragter",
"datenschutzbeauftragte", "data protection officer",
"kontaktdaten des datenschutz", "benennung", "art. 37",
),
"purposes": (
"zweck", "zwecke", "verarbeitungszweck", "zweck der verarbeitung",
"zwecke der verarbeitung", "verarbeitungstätigkeit", "purpose",
"zweckbindung", "erhebung",
),
"legal_basis": (
"rechtsgrundlage", "berechtigte interesse", "berechtigtes interesse",
"einwilligung", "vertragserfüllung", "vertragserfuellung",
"interessenabwägung", "interessenabwaegung", "art. 6",
"rechtmäßigkeit", "rechtmaessigkeit", "erforderlich",
),
"recipients": (
"empfänger", "empfaenger", "empfängerkategorien",
"empfaengerkategorien", "weitergabe", "übermittlung an",
"auftragsverarbeit", "auftragsverarbeiter", "dienstleister",
"dritte", "drittempfänger", "kategorien von empfängern",
),
"third_country": (
"drittland", "drittstaat", "drittländer", "drittlaender",
"standardvertragsklausel", "angemessenheitsbeschluss",
"übermittlung in ein drittland", "geeignete garantien",
"schutzgarantien", "data privacy framework", "ewr",
"internationale übermittlung", "transfermechanismus",
),
"third_country_mechanism": (
"standardvertragsklausel", "angemessenheitsbeschluss",
"geeignete garantien", "data privacy framework",
"schutzgarantien", "art. 46", "art. 45",
),
"retention": (
"speicherdauer", "aufbewahrungsfrist", "aufbewahrungsdauer",
"löschfrist", "loeschfrist", "speicherbegrenzung", "löschung",
"loeschung", "dauer der speicherung", "kriterien für die festlegung",
"speicherfrist", "aufbewahrung",
),
"retention_periods": (
"aufbewahrungsfrist", "löschfrist", "speicherdauer",
"gesetzliche frist", "handelsrechtlich", "steuerrechtlich",
),
"rights": (
"betroffenenrecht", "betroffenenrechte", "recht auf auskunft",
"auskunft", "berichtigung", "löschung", "loeschung",
"einschränkung", "einschraenkung", "datenübertragbarkeit",
"datenuebertragbarkeit", "widerspruch", "widerruf",
"rechte der betroffenen", "art. 15", "art. 17", "art. 21",
),
"complaint": (
"beschwerderecht", "aufsichtsbehörde", "aufsichtsbehoerde",
"datenschutzbehörde", "datenschutzbehoerde", "beschwerde",
"recht auf beschwerde", "art. 77", "zuständige aufsichtsbehörde",
),
"rights_art22_profiling": (
"automatisierte entscheidung", "profiling", "art. 22",
"automatisierte einzelentscheidung", "scoring",
),
"dse_version_date": (
"stand", "letzte aktualisierung", "zuletzt geändert",
"gültig ab", "gueltig ab", "version", "versionsdatum",
"aktualität", "nachweisbarkeit",
),
}
def compute_regex_boosts(text: str, business_scope: set[str] | None = None) -> set[str]:
"""Welche DSE-field_ids haben die kuratierten Patterns erkannt?
Returns die Menge gehit'ter field_ids, über die später entschieden wird,
ob ein DB-MC darüber automatisch passed werden kann. business_scope wird
akzeptiert (Signatur-Parität mit impressum), für DSE aber nicht gegated —
Art-13-Pflichten sind universell.
"""
if not text or len(text) < 50:
return set()
scope = business_scope or set()
hits: set[str] = set()
for mc in MCS:
if not scope_matches(mc, scope):
continue
if any(p.search(text) for p in mc.patterns):
hits.add(mc.field_id)
return hits
def boost_matches_db_mc(
boosts: set[str],
pass_criteria: list,
fail_criteria: list | None = None,
) -> str | None:
"""Hat ein gebooster field_id ≥2 Keyword-Überlapp mit den pass/fail_criteria
eines DB-MC? Returns field_id (höchster Match-Count) oder None."""
if not boosts:
return None
crit_parts: list[str] = []
for c in (pass_criteria or []):
if c:
crit_parts.append(str(c).lower())
for c in (fail_criteria or []):
if c:
crit_parts.append(str(c).lower())
if not crit_parts:
return None
crit_text = " ".join(crit_parts)
best: tuple[int, str] | None = None
for field_id in boosts:
kws = BOOST_KEYWORDS.get(field_id) or ()
match_count = sum(1 for kw in kws if kw in crit_text)
if match_count >= 2:
if best is None or match_count > best[0]:
best = (match_count, field_id)
return best[1] if best else None
def criteria_on_topic(
pass_criteria: list | None,
fail_criteria: list | None = None,
min_hits: int = 2,
) -> bool:
"""Deterministischer Themen-Gate: gehört ein DB-MC überhaupt ins DSE-
Themenfeld (Art 13/14)? ≥min_hits unterschiedliche Schlüsselwörter aus
IRGENDEINEM DSE-Feld in den kombinierten criteria. Fängt fremd-getaggte
MCs ab. Leere Kriterien → on-topic behalten (konservativ)."""
crit_parts: list[str] = []
for c in (pass_criteria or []):
if c:
crit_parts.append(str(c).lower())
for c in (fail_criteria or []):
if c:
crit_parts.append(str(c).lower())
if not crit_parts:
return True
crit_text = " ".join(crit_parts)
hits: set[str] = set()
for kws in BOOST_KEYWORDS.values():
for kw in kws:
if kw in crit_text:
hits.add(kw)
if len(hits) >= min_hits:
return True
return False
@@ -0,0 +1,209 @@
"""v3-Engine: läuft die 4-Layer-Pipeline auf einem DSE-Text (Art. 13/14 DSGVO).
Layer 0 — Regex-Boost (die kuratierten Art-13-Patterns aus mcs.py)
Layer 1 — MC-Laden + Keyword-Match. Das LADEN delegiert an die Main-Tool-
Engine (rag_document_checker._load_controls, doc_type='dse'):
eine Quelle der Wahrheit inkl. P72-Scope, check_type='text'
(267 von 571) und fits_doc_type/scope_requires aus dem Sidecar.
Layer 2 — BGE-M3 Embedding-Match (mc_embedding_matcher, shared)
Layer 0 Override — failed MCs, deren criteria zu einem gebooster field_id
passen, werden zu PASS überschrieben.
Zusätzlich am Agent-Rand: subtraktives Sektor-/Themen-Gate (_filter_controls)
— das Sektor-Gate (Branchen-Prefix GOV/FIN/MED…) verwirft branchenfremde MCs,
das Themen-Gate fremd-getaggte. Analog impressum/v3_engine.py.
Output: Liste Result-Dicts kompatibel mit rag_document_checker. Der Agent
konvertiert sie zu Finding-Objekten.
"""
from __future__ import annotations
import logging
from typing import Any
from .regex_boost import (
compute_regex_boosts,
criteria_on_topic,
)
logger = logging.getLogger(__name__)
# Branchen-Prefix -> erwarteter Scope-Token. Reuse aus dem Mail-V2-Scope-
# Filter, damit Agent-Pfad und Report-Pfad dieselbe Quelle nutzen. Import
# defensiv: faellt der Mail-Pfad weg, bleibt der Agent lauffaehig.
try:
from compliance.services.mail_render_v2._scope_filter import (
SECTOR_PREFIXES,
)
except Exception: # pragma: no cover - defensiver Fallback
SECTOR_PREFIXES = {}
async def run_v3_pipeline(
text: str,
business_scope: set[str],
db_url: str = "",
skip_embedding: bool = False,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
"""Returns (results, telemetry).
results: pro DB-MC ein dict {control_id, passed, severity, ...}
telemetry: counters für Frontend-Anzeige (Layer-Aufschlüsselung)
skip_embedding: Layer-2 (BGE-M3 Recall) überspringen. Nur für Unit-Tests
ohne Embedding-Service. Im Betrieb läuft die Recall-Schicht: sie ist
gecacht (pro Doc-Hash) und Reachability-gegated, blockiert also nie.
"""
if not text or len(text) < 100:
return [], {"reason": "text too short"}
# Layer 0: kuratierte Art-13-Patterns
boosts = compute_regex_boosts(text, business_scope)
boost_field_ids = sorted(boosts)
logger.info("dse v3 Layer-0 boosts: %d hits — %s",
len(boost_field_ids), boost_field_ids)
# Layer 1: MC-Laden DELEGIERT an die Main-Tool-Engine (Scope-Schutz inkl.).
try:
from compliance.services.rag_document_checker import _load_controls
controls = await _load_controls(
"dse", db_url, 0, business_scope,
)
except Exception as e:
logger.warning("dse v3 load via main-tool engine failed: %s", e)
controls = []
_normalize_criteria(controls)
# Agent-Rand-Backstop: Sektor-Gate (Branchen-Prefix) + Themen-Gate.
controls, drop_stats = _filter_controls(controls, business_scope)
# Applicability-Gate: hochsichere organisatorische Controls (laut
# control_classification NICHT DSE, needs_review=false) aus dem
# FINDINGS-Scan nehmen -> organisatorische Checkliste statt False-FEHLT.
# Fail-safe: needs_review bleiben drin. Defensiv: fehlt die Tabelle, kein
# Filter (Prod-sicher). Siehe _classification_gate.
from ._classification_gate import apply_gate, load_dse_gate
gate = await load_dse_gate(db_url)
organizational: list[dict[str, Any]] = []
if gate:
controls, organizational = apply_gate(controls, gate)
results: list[dict[str, Any]] = []
if controls:
try:
from compliance.services.rag_document_checker import (
_check_mc_deterministic,
)
text_lower = text.lower().replace("\xad", "")
for mc in controls:
r = _check_mc_deterministic(text_lower, mc)
if r:
r["_pass_criteria"] = mc.get("pass_criteria")
r["_fail_criteria"] = mc.get("fail_criteria")
results.append(r)
except Exception as e:
logger.warning("layer-1 keyword check failed: %s", e)
results = []
layer_1_pass = sum(1 for r in results if r.get("passed"))
# Layer 2: DETERMINISTISCHE semantische Recall-Schicht (BGE-M3, gecacht).
# Ersetzt den früheren Regex-Boost, der auf vollständigen DSE-Dokumenten
# massiv über-passte (BMW: 71/94 Boost-Overrides → 49% Übereinstimmung).
# Embedding ist genauer (BMW-GT: KW|EMB@0.65 = 75%) UND deterministisch
# (feste Funktion, reproduzierbar — kein Keyword-Katalog, kein LLM).
embedding_passes = 0
if not skip_embedding:
failed_cids = [r.get("control_id") for r in results
if r and not r.get("passed") and r.get("control_id")]
if failed_cids:
try:
from ._embedding_recall import embedding_recall
semantic = await embedding_recall(text, failed_cids)
except Exception as e:
logger.warning("dse embedding recall failed: %s", e)
semantic = set()
for r in results:
if (r.get("control_id") in semantic
and not r.get("passed")):
r["passed"] = True
r["matched_text"] = "[semantisch erkannt — Embedding]"
r["source"] = (r.get("source") or "") + "+embedding"
embedding_passes += 1
telemetry = {
"layer_0_field_hits": len(boost_field_ids),
"layer_0_field_ids": boost_field_ids,
"layer_1_pass": layer_1_pass,
"embedding_passes": embedding_passes,
"total_mcs": len(results),
"sector_dropped": drop_stats.get("sector_dropped", 0),
"offtopic_dropped": drop_stats.get("offtopic_dropped", 0),
"gate_excluded": len(organizational),
"organizational_checklist": organizational,
}
logger.info("dse v3 telemetry: %s", telemetry)
return results, telemetry
def _filter_controls(
controls: list[dict[str, Any]],
business_scope: set[str],
) -> tuple[list[dict[str, Any]], dict[str, int]]:
"""Subtraktiver Scope-Filter VOR der Bewertung.
1. Sektor-Gate — MCs deren control_id-Prefix eine Branche bezeichnet
(FIN/GOV/MED/INS/EDU/LEG/REL/POL), die NICHT im business_scope liegt
UND die nicht on-topic ist, werden verworfen.
2. Themen-Gate — MCs ohne DSE-Themenüberlapp werden verworfen.
Rein subtraktiv: entfernt nur falsch-positive Kandidaten.
"""
scope_lc = {s.lower() for s in (business_scope or set())}
kept: list[dict[str, Any]] = []
sector_dropped = 0
offtopic_dropped = 0
for c in controls:
cid = c.get("control_id") or ""
prefix = cid.split("-")[0].upper() if "-" in cid else ""
on_topic = criteria_on_topic(c.get("pass_criteria"),
c.get("fail_criteria"))
required = SECTOR_PREFIXES.get(prefix)
# Sektor-Gate nur fuer NICHT-on-topic Controls: ein klar DSE-
# thematischer Control (z.B. GOV-Prefix aus der Domain-Erkennung)
# darf nicht am Branchen-Prefix scheitern.
if required and not (scope_lc & required) and not on_topic:
sector_dropped += 1
continue
if not on_topic:
offtopic_dropped += 1
continue
kept.append(c)
if sector_dropped or offtopic_dropped:
logger.info(
"dse v3 scope-filter: -%d Branchen-MCs, -%d themenfremde MCs "
"(scope=%s)", sector_dropped, offtopic_dropped,
sorted(scope_lc) or "leer",
)
return kept, {
"sector_dropped": sector_dropped,
"offtopic_dropped": offtopic_dropped,
}
def _normalize_criteria(controls: list[dict[str, Any]]) -> None:
"""asyncpg liefert JSONB-Spalten (pass_criteria/fail_criteria) als
Roh-String. In echte Listen parsen, damit Sektor-/Themen-Gate und der
Boost-Layer Element-weise iterieren."""
import json
for c in controls:
for key in ("pass_criteria", "fail_criteria"):
v = c.get(key)
if isinstance(v, list):
continue
if isinstance(v, str):
try:
parsed = json.loads(v)
c[key] = parsed if isinstance(parsed, list) else [v]
except Exception:
c[key] = [v] if v else []
else:
c[key] = []
@@ -1,12 +1,27 @@
"""AGBAgent — kuratierte §§-305-ff-BGB-Checkliste (ChecklistAgent-Subclass)."""
from __future__ import annotations
"""AGBAgent (v2, routed). Embedding/LLM offline-gestubbt → kein Netzwerk."""
import asyncio
import pytest
import compliance.services.specialist_agents.agb._pipeline as pipeline
from compliance.services.checkers.base import CheckResult
from compliance.services.specialist_agents import REGISTRY, AgentInput
class _Stub:
def __init__(self, present):
self._p = present
async def check(self, ctrl, doc):
return CheckResult(present=self._p)
@pytest.fixture(autouse=True)
def _offline(monkeypatch):
monkeypatch.setattr(pipeline, "_EMB", _Stub(None))
monkeypatch.setattr(pipeline, "_LLM", _Stub(None))
def _run(text: str):
return asyncio.run(
REGISTRY.get("agb").evaluate(AgentInput(doc_type="agb", text=text)))
@@ -0,0 +1,62 @@
"""AGB routed-Pipeline: Gate, Reference-/Embedding-Rescue, LLM-skip, Re-Tiering.
Embedding + LLM offline-gestubbt → deterministisch, kein Netzwerk (Reference = echtes Regex)."""
import asyncio
from types import SimpleNamespace
import pytest
import compliance.services.specialist_agents.agb._pipeline as pipeline
from compliance.services.checkers.base import CheckResult
from compliance.services.specialist_agents._base import AgentInput
from compliance.services.specialist_agents.agb.agent import AGBAgent
class _Stub:
def __init__(self, present):
self._p = present
async def check(self, ctrl, doc):
return CheckResult(present=self._p)
@pytest.fixture(autouse=True)
def _offline(monkeypatch):
monkeypatch.setattr(pipeline, "_EMB", _Stub(None))
monkeypatch.setattr(pipeline, "_LLM", _Stub(None))
def _routed(field_ids, text, context=None):
findings = [SimpleNamespace(field_id=fid) for fid in field_ids]
return asyncio.run(pipeline.run_routed(findings, text, context or {}))
def test_gate_termination_na_for_oneoff_shop():
text = "Widerrufsbelehrung: Sie koennen binnen 14 Tagen widerrufen. " * 5
kept, resolved, gated = _routed(["termination", "termination_form"], text)
assert set(gated) == {"termination", "termination_form"}
assert kept == []
def test_reference_rescues_data_protection():
text = "Einzelheiten zur Verarbeitung in unserer Datenschutzerklaerung. " * 5
kept, resolved, gated = _routed(["data_protection"], text)
assert "data_protection" in resolved and kept == []
def test_embedding_rescue_resolves(monkeypatch):
monkeypatch.setattr(pipeline, "_EMB", _Stub(True))
kept, resolved, gated = _routed(["scope"], "x" * 200)
assert "scope" in resolved
def test_llm_skipped_keeps_finding():
kept, resolved, gated = _routed(["delivery_timeframe"], "x" * 200, {"skip_llm": True})
assert [f.field_id for f in kept] == ["delivery_timeframe"] and resolved == []
def test_evaluate_retiers_low_out_of_findings():
text = ("Allgemeine Geschaeftsbedingungen. Vertragsschluss durch Bestellung. "
"Haftung beschraenkt. Gerichtsstand Muenchen. ") * 6
out = asyncio.run(AGBAgent().evaluate(AgentInput(doc_type="agb", text=text)))
assert out.agent == "agb" and out.agent_version == "2.0"
assert all(f.severity in ("HIGH", "MEDIUM") for f in out.findings)
@@ -0,0 +1,14 @@
"""AGB muss im LIVE-Pfad verdrahtet sein (_TOPIC_AGENTS), nicht nur per Snapshot."""
from compliance.api.agent_check._agent_outputs import _TOPIC_AGENTS
def test_agb_wired_into_live_topic_agents():
assert _TOPIC_AGENTS.get("agb") == "agb"
def test_dse_wired_into_live_topic_agents():
assert _TOPIC_AGENTS.get("dse") == "dse"
def test_impressum_still_wired():
assert _TOPIC_AGENTS.get("impressum") == "impressum"
@@ -0,0 +1,83 @@
"""Unit-Tests der Prüfer-Library. Embedding + LLM gemockt → kein Netzwerk."""
import asyncio
import compliance.services.llm_cascade as cascade_mod
import compliance.services.mc_embedding_matcher as emb_mod
from compliance.services.checkers.base import (
ControlSpec,
DecisionMethod,
DocContext,
VerificationMethod,
)
from compliance.services.checkers.embedding_checker import EmbeddingChecker
from compliance.services.checkers.llm_checker import LLMChecker
from compliance.services.checkers.reference_checker import ReferenceChecker
def _run(coro):
return asyncio.run(coro)
def test_reference_present_and_absent():
rc = ReferenceChecker()
spec = ControlSpec("data_protection", VerificationMethod.REFERENCE,
DecisionMethod.LINK_RESOLVER,
patterns=[r"datenschutz(erkl|bestimmung|hinweis)"])
r = _run(rc.check(spec, DocContext(
text="Details in unserer Datenschutzerklaerung: https://x.de/datenschutz")))
assert r.present is True
assert r.detail.get("link", "").startswith("https://")
r2 = _run(rc.check(spec, DocContext(text="Keine Angabe zum Datenschutz-Thema.")))
assert r2.present is False
def test_embedding_threshold(monkeypatch):
monkeypatch.setattr(emb_mod, "DIM", 3, raising=False)
monkeypatch.setattr(emb_mod, "_chunk_text", lambda t: [t], raising=False)
async def _embed(texts):
return [[1.0, 0.0, 0.0] for _ in texts]
monkeypatch.setattr(emb_mod, "_embed_texts", _embed, raising=False)
ec = EmbeddingChecker()
spec = ControlSpec("scope_t", VerificationMethod.CONTENT, DecisionMethod.EMBEDDING,
paraphrases=["Geltungsbereich"], embed_threshold=0.58)
monkeypatch.setattr(emb_mod, "_cosine", lambda a, b: 0.90, raising=False)
r = _run(ec.check(spec, DocContext(text="x" * 200)))
assert r.present is True and r.confidence >= 0.58
monkeypatch.setattr(emb_mod, "_cosine", lambda a, b: 0.20, raising=False)
r2 = _run(ec.check(spec, DocContext(text="x" * 200)))
assert r2.present is False
def test_embedding_offline_returns_none(monkeypatch):
async def _boom(texts):
raise ConnectionError("embedding-service down")
monkeypatch.setattr(emb_mod, "_embed_texts", _boom, raising=False)
ec = EmbeddingChecker()
spec = ControlSpec("scope_off", VerificationMethod.CONTENT, DecisionMethod.EMBEDDING,
paraphrases=["x"], embed_threshold=0.6)
r = _run(ec.check(spec, DocContext(text="y" * 200)))
assert r.present is None # fail-safe
def test_llm_present_and_absent(monkeypatch):
lc = LLMChecker()
spec = ControlSpec("delivery_timeframe", VerificationMethod.CONTENT, DecisionMethod.LLM,
topic_regex=r"liefer", question="Konkrete Lieferfrist?")
doc = DocContext(text=("1. Lieferung\nDie Ware wird innerhalb von 2 Werktagen "
"geliefert.\n") * 4)
async def _erfuellt(system, user, **kw):
return {"text": '{"verdict":"ERFUELLT","zitat":"2 Werktagen","begruendung":"x"}',
"source": "qwen", "confidence": 0.7}
monkeypatch.setattr(cascade_mod, "call_with_cascade", _erfuellt, raising=False)
assert _run(lc.check(spec, doc)).present is True
async def _fehlt(system, user, **kw):
return {"text": '{"verdict":"FEHLT"}', "source": "qwen"}
monkeypatch.setattr(cascade_mod, "call_with_cascade", _fehlt, raising=False)
assert _run(lc.check(spec, doc)).present is False
@@ -1,65 +1,96 @@
"""DSEAgent — kuratierte Art-13/14-Checkliste (kein Library-Firehose)."""
from __future__ import annotations
"""DSEAgent v3 (4-Layer: Regex-Boost / Keyword / BGE-M3-Recall / Semantic).
DB (_load_controls), Embedding-Service und LLM sind offline gestubbt → die Tests
sind deterministisch und brauchen kein Netzwerk. Die reinen Schichten
(compute_regex_boosts, embedding_recall-Reachability) werden direkt geprüft, die
Result→Finding-Konvertierung über einen gestubbten run_v3_pipeline.
"""
import asyncio
import compliance.services.specialist_agents.dse.agent as dse_agent
from compliance.services.specialist_agents import REGISTRY, AgentInput
from compliance.services.specialist_agents.dse._embedding_recall import (
embedding_recall,
)
from compliance.services.specialist_agents.dse.regex_boost import (
compute_regex_boosts,
)
_TELEMETRY = {
"layer_0_field_hits": 0, "layer_0_field_ids": [],
"layer_1_pass": 0, "embedding_passes": 0, "total_mcs": 1,
"sector_dropped": 0, "offtopic_dropped": 0,
"gate_excluded": 0, "organizational_checklist": [],
}
def _run(text: str):
return asyncio.run(
REGISTRY.get("dse").evaluate(AgentInput(doc_type="dse", text=text)))
def _pipeline_stub(results):
async def _stub(text, scope):
return results, dict(_TELEMETRY, total_mcs=len(results))
return _stub
def test_dse_agent_registered():
assert REGISTRY.get("dse") is not None
def _evaluate(text, context=None):
return asyncio.run(dse_agent.DSEAgent().evaluate(
AgentInput(doc_type="dse", text=text, context=context or {})))
def test_dse_detects_core_obligations():
text = (
"Datenschutzerklaerung. Verantwortlich im Sinne der DSGVO ist die "
"Muster GmbH, Musterstrasse 1, 12345 Berlin. E-Mail: info@muster.de. "
"Datenschutzbeauftragter: dsb@muster.de. Zwecke der Verarbeitung und "
"Rechtsgrundlage Art. 6 Abs. 1. Empfaenger Ihrer Daten. Speicherdauer "
"der Daten. Ihre Rechte: Auskunft, Loeschung, Widerspruch, Beschwerde "
"bei der Aufsichtsbehoerde. ") * 3
out = _run(text)
assert out.agent == "dse"
# 10 L1-Pflichtangaben immer + L2-Details deren Parent vorhanden ist
# (fehlende Parents → L2 übersprungen, kein 'na'-Rauschen).
assert 10 <= out.mc_total <= 33
ok = [c.label for c in out.mc_coverage if c.status == "ok"]
assert any("Verantwortlich" in lbl for lbl in ok)
assert any("Rechtsgrundlage" in lbl for lbl in ok)
def test_dse_missing_obligations_are_findings():
out = _run("Lorem ipsum dolor sit amet consectetur adipiscing elit. " * 6)
assert out.findings
assert any(f.severity == "HIGH" for f in out.findings)
def test_dse_agent_registered_is_v3():
agent = REGISTRY.get("dse")
assert agent is not None and agent.agent_version == "3.0"
def test_dse_short_text_skips():
out = _run("zu kurz")
out = _evaluate("zu kurz")
assert out.confidence == 0.0
assert all(c.status == "skipped" for c in out.mc_coverage)
def test_third_country_high_when_applicable_no_na_detail_short_action():
# Text ohne Drittland-Abschnitt + Scan-Kontext drittland=ja:
# - third_country (L1) fehlt → HIGH (nicht weiches MEDIUM)
# - Transfermechanismus (L2) → KEIN 'na' (übersprungen, Parent deckt ab)
# - Titel/Maßnahme kurz (kein 280-Zeichen-Hint als Recommendation-Titel)
text = ("Datenschutz. Verantwortlich ist die Muster GmbH, info@muster.de. "
"Zwecke und Rechtsgrundlage Art. 6. Speicherdauer. Ihre Rechte. ") * 4
out = asyncio.run(REGISTRY.get("dse").evaluate(AgentInput(
doc_type="dse", text=text,
context={"scan_context": {"third_country_transfer": "yes"}})))
tc = [f for f in out.findings if "Drittland" in f.title]
assert tc and tc[0].severity == "HIGH"
assert not any(c.status == "na" and "Transfermechanismus" in c.label
for c in out.mc_coverage)
assert all(len(f.action) < 110 for f in out.findings)
# Detail-Begründung bleibt als evidence erhalten
assert any(f.evidence for f in out.findings)
def test_regex_boost_detects_core_fields():
text = ("Verantwortlicher im Sinne der DSGVO ist die Muster GmbH. "
"Rechtsgrundlage ist Art. 6. Speicherdauer der Daten. Beschwerde "
"bei der Aufsichtsbehoerde. ") * 2
hits = compute_regex_boosts(text, set())
assert {"controller", "legal_basis", "retention", "complaint"} & hits
def test_embedding_recall_offline_returns_empty():
# Kein Embedding-Service (Unit) -> Reachability-Guard -> leeres Set, kein Hang.
got = asyncio.run(embedding_recall("x" * 200, ["DSE-X-1"]))
assert got == set()
def test_evaluate_builds_finding_from_failed_db_mc(monkeypatch):
monkeypatch.setattr(dse_agent, "run_v3_pipeline", _pipeline_stub([{
"control_id": "DATA-RETENTION-1", "passed": False, "severity": "MEDIUM",
"label": "Speicherdauer der Daten", "regulation": "DSGVO", "article": "13",
"source": "keyword_match",
}]))
out = _evaluate("Datenschutzerklaerung " + "x" * 200, context={"skip_llm": True})
f = next((f for f in out.findings if f.field_id == "DATA-RETENTION-1"), None)
assert f is not None and f.severity == "MEDIUM"
assert f.action and len(f.action) <= 400
def test_evaluate_passed_db_mc_no_finding(monkeypatch):
monkeypatch.setattr(dse_agent, "run_v3_pipeline", _pipeline_stub([{
"control_id": "PURPOSE-1", "passed": True, "severity": "MEDIUM",
"label": "Zwecke", "matched_text": "Zwecke der Verarbeitung",
}]))
out = _evaluate("Datenschutzerklaerung " + "x" * 200, context={"skip_llm": True})
assert "PURPOSE-1" not in [f.field_id for f in out.findings]
assert any(c.mc_id == "PURPOSE-1" and c.status == "ok" for c in out.mc_coverage)
def test_evaluate_third_country_high_on_documented_transfer(monkeypatch):
monkeypatch.setattr(dse_agent, "run_v3_pipeline", _pipeline_stub([{
"control_id": "TRANSFER-1", "passed": False, "severity": "MEDIUM",
"label": "Drittlanduebermittlung", "regulation": "DSGVO", "article": "13",
}]))
out = _evaluate(
"Datenschutzerklaerung " + "x" * 200,
context={"skip_llm": True,
"scan_context": {"third_country_transfer": "yes"}})
f = next((f for f in out.findings if f.field_id == "TRANSFER-1"), None)
assert f is not None and f.severity == "HIGH"
assert f.severity_reason == "db_mc_failed_third_country_transfer"
@@ -0,0 +1,43 @@
# Mapping: Nutzungsbedingungen & Shop-AGB auf die Prüfer-Matrix
> **Zweck:** Beleg der These *„neues Modul = Klassifizierung/Mapping, kein Forschungsprojekt"*. Keine neue Architektur, keine neuen Prüfertypen — nur Zuordnung vorhandener Controls + weniger neuer Items auf die bestehenden Prüfer.
## 1. Shop-AGB = 0 neue Arbeit
Der AGB-Korpus, an dem kalibriert wurde (Zalando, Otto, MediaMarkt, Tchibo, Lieferando), **sind** Shop-AGB. „Shop-AGB" ist damit kein neues Modul, sondern **das AGB-Modul selbst**. Aufwand: **null**.
## 2. Nutzungsbedingungen (Plattform/App-ToS) = Reuse + wenige neue Items
NB sind Vertragsprosa wie AGB, nur ohne Warenkauf-Pflichten und mit Plattform-spezifischen Pflichten. Mapping:
**Aus AGB wiederverwendet (gleiche Prüfer, ggf. neue Paraphrasen):**
| Item | verification_method | decision_method |
|---|---|---|
| scope (Geltungsbereich) | CONTENT | EMBEDDING |
| liability (Haftung) | CONTENT | EMBEDDING |
| jurisdiction / choice_of_law | CONTENT | EMBEDDING |
| data_protection (DSE-Verweis) | REFERENCE | LINK_RESOLVER |
| salvatory_clause | CONTENT | EMBEDDING |
| amendment_clause | CONTENT | EMBEDDING |
| termination (Konto/Account) | CONTENT | EMBEDDING |
| consumer_rights | CONTENT | EMBEDDING |
| dispute_odr_link (bei B2C) | REFERENCE | LINK_RESOLVER |
**Nicht anwendbar (Scope-Gate, Waren-Verkauf):** payment*, delivery*, warranty*, contract/incorporation (anderer Vertragsschluss).
**Neu (NB-spezifisch) — alle auf EXISTIERENDE Prüfertypen:**
| Item | verification_method | decision_method |
|---|---|---|
| Nutzungsrechte / zulässige Nutzung | CONTENT | EMBEDDING |
| Geistiges Eigentum / Schutzrechte | CONTENT | EMBEDDING |
| Verfügbarkeit (kein Anspruch auf ununterbrochenen Betrieb) | CONTENT | EMBEDDING |
| Account / Registrierung / Nutzerpflichten | CONTENT | EMBEDDING |
| Nutzergenerierte Inhalte / Verhaltensregeln | CONTENT | EMBEDDING |
| Haftung für Links / Drittinhalte | CONTENT | EMBEDDING |
## 3. Ergebnis
- **Shop-AGB:** 0 neue Items, 0 neue Prüfer.
- **Nutzungsbedingungen:** ~9 Items aus AGB wiederverwendet + ~6 neue Items — **alle auf bestehenden Prüfertypen** (CONTENT/EMBEDDING + REFERENCE). **0 neue Prüfertypen.**
Ein neues Web-Dokument ist damit ein **Mapping-/Klassifizierungs-** und Paraphrasen-Schreibproblem (StundenTage), kein Mess-/Forschungsprojekt (Wochen). Genau die These der Prüfer-Matrix.
@@ -0,0 +1,87 @@
# Prüfer-Matrix — Meta-Modell der Doc-Check-Plattform
> **Status:** Plattformkonzept, **eingefroren 2026-06-21**. Abgeleitet aus 4 kalibrierten Modulen (DSE, Cookie, Impressum, AGB). Erweitert `verification_method.md` (5→8 Klassen) und fügt die `decision_method`-Achse hinzu.
> **Kernsatz:** *Nicht jedes Control braucht denselben Richter.* Der **Kontrolltyp bestimmt den Prüfer** — nicht alles ist ein Text-/LLM-Problem.
## 0. Die Architektur-Verschiebung
**Vorher (implizit):** `Control → Embedding → LLM → Finding`.
**Jetzt (empirisch bewiesen):**
```
Control → [scope-gate] → artifact_type → verification_method → decision_method
→ passender Prüfer → Evidence → Finding (severity-getiert)
```
Vier strukturell verschiedene Dokumenttypen führten immer wieder auf dieselbe Meta-Struktur. Das ist größer als jeder Einzel-Fix: es ist mit hoher Wahrscheinlichkeit das Routing-Prinzip für alle ~14.000 Master Controls.
## 1. Empirische Basis (4 Module)
| Modul | dominanter Prüfer | Beleg |
|---|---|---|
| DSE | CONTENT (LLM/Embedding) | Kriterien-Kalibrierung, FP 11→6 % |
| Cookie-Banner | BEHAVIOR | Enforcement / Dark-Pattern (Playwright) |
| Cookie-Policy | CONTENT + REFERENCE | Inhalt + Verweise |
| Impressum | FIELD + PRESENTATION (+ SCOPE-Gate) | Feld-Matcher FP 0 %, Präsentation re-routed |
| AGB | CONTENT (KEYWORD→EMBEDDING→LLM) + REFERENCE (+ SCOPE-Gate) | 71 % FP → ~0; LLM nur 2/21 Items |
## 2. Achse 1 — `verification_method` (welcher Prüfer-TYP)
| verification_method | Prüfer | Leitfrage | Beleg | Reifegrad |
|---|---|---|---|---|
| **CONTENT** | Embedding + LLM-Kaskade | Was steht (als Offenlegung) im Text? | DSE, Cookie-Policy | kalibriert |
| **FIELD** | Regex / Extraktion (Feldmatrix) | Welche Pflichtfelder existieren + sind valide? | Impressum (HRB, USt-IdNr, Anschrift) | ✓ FP 0 % |
| **REFERENCE** | Link-Resolver | Gibt es einen klaren Verweis/Link, löst er auf? | AGB `data_protection` | ✓ 7/7 |
| **BEHAVIOR** | Playwright + API | Manipuliert die UI die Entscheidung? | Cookie-Banner (Reject=Accept, Pre-Consent-Cookies) | Matrix vorhanden |
| **PRESENTATION** | Playwright UI-Sensor | Auffindbar / sichtbar / erreichbar? | Impressum „leicht erkennbar" | re-routed |
| **PROCESS** | Audit / Evidence | Gibt es einen internen Nachweis? | VVT, TOM, interne Richtlinie | Checkliste |
| **TECHNICAL** | Scanner (Repo / Netz / Config) | Ist die technische Maßnahme implementiert? | geplant: CRA, NIS2, ISO 27001 | offen |
| **CONTRACTUAL** | Clause-Engine | Ist die Klausel vorhanden + rechtskonform? | AGB (delivery/warranty; Defekte → Stage 3) | teilweise |
**CONTENT vs CONTRACTUAL:** CONTENT = Offenlegungs-Prosa (DSE nennt Zwecke). CONTRACTUAL = Vertragsklauseln (AGB-Haftung/Lieferung). Beide können Embedding+LLM nutzen — die Trennung ist die Rechtsnatur + die spätere Defekt-Prüfung (Klausel rechtswidrig?).
**PRESENTATION ≠ BEHAVIOR:** beide Playwright, andere Rechtslogik. PRESENTATION = Auffindbarkeit/Sichtbarkeit; BEHAVIOR = Entscheidungs-Manipulation/Dark-Pattern.
## 3. Achse 2 — `decision_method` (WIE innerhalb CONTENT/CONTRACTUAL entschieden wird)
Die AGB-Entdeckung: **Controls INNERHALB eines Prüfer-Typs brauchen verschiedene Entscheider.** Eskalation nur bei Bedarf (Kostendisziplin):
| decision_method | Mechanismus | Wann | Beleg (AGB) |
|---|---|---|---|
| **KEYWORD** | Regex-Match | Pflicht eindeutig formuliert | Keyword-Layer |
| **EMBEDDING** | per-Item-Cosinus-Schwelle (Doc-Chunks × Item-Paraphrasen) | Prosa, semantisch trennbar | 13/21 Items, 0 Fehl-Rescue |
| **LLM** | Clause-Retrieval (**ganze §-Abschnitte**) + starkes Modell, present/absent | semantisch eng (Embedding trennt nicht) | 2/21 Items (delivery/warranty), 14/14 |
`CONTENT_SIMPLE` = KEYWORD/EMBEDDING reicht; `CONTENT_COMPLEX` = LLM nötig. AGB-Bilanz: **81 % deterministisch, 19 % LLM-fähig**, LLM real nur bei Keyword-Miss.
## 4. Durable Per-Control-Metadaten (das Routing-Vokabular)
| Feld | Zweck |
|---|---|
| `artifact_type` | gegen welches Artefakt geprüft wird → Scanner-Routing |
| `obligation_type` | Rechtsnatur: Pflicht / Empfehlung / Kann → Tier |
| `check_intent` | was die Prüfung bezweckt |
| `reference_allowed` | darf per Verweis erfüllt werden → REFERENCE statt CONTENT |
| `scope` / `scope_requires` | Applicability-Gate (Geschäftsmodell, Rechtsform) — **vor** allen Prüfern |
| `verification_method` | Achse 1 (Prüfer-Typ) |
| `decision_method` | Achse 2 (Entscheider innerhalb CONTENT/CONTRACTUAL) |
| `severity` | HIGH / MEDIUM / LOW → Finding vs Empfehlung |
## 5. Hart erarbeitete Plattform-Prinzipien
1. **Route, don't uniformly-LLM** — verschiedene Controls, verschiedene Prüfer.
2. Eskaliere **KEYWORD → EMBEDDING → LLM nur bei Bedarf** (AGB: 17/21 ohne LLM).
3. Embedding: **per-Item-Schwellen** (globale Schwelle scheitert bei juristischer Prosa — PASS/FAIL überlappen global, trennen per-Item).
4. LLM-Judge: **ganze §-Abschnitte** schlagen Top-k-Chunks; **starken Tier pinnen** (billig-zuerst-Kaskade eskaliert selbstbewusst-falsche Antworten NICHT, weil die Confidence-Heuristik genauigkeits-blind ist); **present/absent** trennen von der Defekt-Prüfung.
5. **REFERENCE (Link) ist ein eigener billiger Prüfer** — keinen „siehe Datenschutzerklärung"-Verweis durch ein LLM jagen.
6. **SCOPE-Gate (Applicability) ist vor allen Prüfern** — N/A-Controls werden nie geprüft.
7. **Severity → Finding vs Empfehlung** (Tier, nicht droppen).
8. *Was im Text nicht beweisbar ist, gehört nicht in den Text-Check.*
## 6. Schema-Status
Kein DB-Eingriff (DB eingefroren). `verification_method` + `decision_method` als **abgeleitete Tags** in `control_classification` (aus `artifact_type` / `obligation_type` / `check_intent` + Item-Kalibrierung). `canonical_controls.verification_method` existiert (~4 % befüllt, gröbere Enterprise-Taxonomie) — **nicht** das Doc-Check-Routing.
## 7. Verbindlichkeit
Dies ist der **Vertrag**, gegen den implementiert wird. Die AGB-Integration und die nächsten Module (Nutzungsbedingungen, Widerruf, CRA, MaschVO, DORA, NIS2, ISO 27001, AI-Act, VVT, TOM) bauen **dieselbe** Routing-Schicht — nicht modul-lokal. Reihenfolge: **(1) diese Matrix einfrieren → (2) AGB integrieren → (3) Nutzungsbedingungen → (4) Widerruf.**
@@ -0,0 +1,64 @@
# BreakPilot — Evidenz- & Qualitätsnachweis (Website-Compliance v1)
> **Status:** konsolidierter Freeze-Stand 2026-06-21. Belegbasis aus 4 kalibrierten Modulen (DSE, Cookie, Impressum, AGB). Dient als (a) technischer Freeze-Record und (b) Backbone für Sales/Investoren.
> **Hinweis:** Zahlen = *gemessene* Validierungsergebnisse gegen Opus-Ground-Truth. Tool-/Prod-Integrationsstand je Modul siehe §7 (validiert ≠ überall schon live).
## 1. Kernaussage
Die meisten Compliance-Tools machen: **Dokument → LLM → Finding** — ein Richter für alles. Das erzeugt systematische False Positives und hat *keine* belastbare Evidenzbasis.
BreakPilot macht: **Dokument → Control-Routing → spezialisierter Prüfer → Finding.**
> Wir haben **für jeden Kontrolltyp den optimalen Prüfer empirisch ermittelt** — mit echten Vorher/Nachher-Zahlen, nicht mit Marketing.
Das ist über 4 strukturell verschiedene Dokumenttypen reproduzierbar belegt — und damit voraussichtlich das Routing-Prinzip für alle ~14.000 Master Controls.
## 2. Die Architektur (zwei Routing-Achsen)
Vollständige Kette: **Regulation → Obligation → Control → verification_method → decision_method → Prüfer → Evidence → Finding → Ticket.**
- **`verification_method`** (Kategorie / welcher Prüfer-Typ): CONTENT · FIELD · REFERENCE · BEHAVIOR · PRESENTATION · PROCESS · TECHNICAL · CONTRACTUAL.
- **`decision_method`** (konkreter Mechanismus): REGEX · EMBEDDING · LLM · LINK_RESOLVER · PLAYWRIGHT · AUDIT · SCANNER.
Kernregel: *Was im Text nicht beweisbar ist, gehört nicht in den Text-Check.* Scope-Gate (Applicability) läuft vor allen Prüfern; Severity steuert Finding vs. Empfehlung.
## 3. Evidenz je Modul
| Modul | dominanter Prüfer | gemessenes Ergebnis | Hebel | Reife |
|---|---|---|---|---|
| **DSE** | CONTENT (Embedding+LLM) | False Positives **11 % → 6 %**; an **8 Firmen** validiert, Generalisierung nachgewiesen (kein Overfit auf einen Assessor); Claude-Tier-Pfad → ~2 % bekannt | Kriterien-Kalibrierung + LLM-Kaskade | **RC** |
| **Impressum** | FIELD + PRESENTATION (+ Scope-Gate) | **171 falsche Findings → 0** (Scope-Gate); Feldmatrix (Firma/Anschrift/HRB/USt-IdNr/Kontakt) **FP 0 %, Recall 1.0**; 5 Präsentations-Controls an Playwright re-routet | Scope-Gate + deterministischer Feld-Matcher schlägt LLM | **RC** |
| **Cookie** | BEHAVIOR + CONTENT | Artifact-Type-Trennung **Banner ≠ Richtlinie** validiert (Controls liefen am falschen Artefakt → re-routet); Browser-Verhaltens-Matrix (Enforcement, Dark-Pattern, Reject=Accept) | Artifact-Type-Routing + Playwright-Verhaltenssensor | Wave-1 (GT-Stab. offen) |
| **AGB** | CONTENT + REFERENCE + LLM | **71 % FP → ~0** (7-Firmen-Opus-GT): 49 Findings / 35 falsch → bereinigt; Embedding-Rescue **21 Recall-FP gekillt, 0 Fehl-Rescue**; LLM-Judge (ganze §-Abschnitte) **14/14**; Reference-Check **7/7** | **decision_method pro Item** (17 EMBEDDING, 2 LLM, 1 REFERENCE) | Architektur validiert |
## 4. Warum die Zahlen belastbar sind (Methodik-Rigor)
- **Ground Truth mit dem stärksten Modell** (Opus-4-8), nicht mit billigen Modellen.
- **Prove-don't-handwave:** echte FP/FN-Zählungen, Vorher/Nachher, keine Behauptungen.
- **Generalisierung statt Overfit:** Mehr-Firmen-GT (DSE 8, AGB 7) + explizite Leitplanken gegen Ein-Assessor-Overfit.
- **Mehrfach-Referenz-Validierung:** bei AGB 3-Wege (Opus-GT × Claude-Eigenbewertung × Laufzeit-Kaskade) — deckte sogar einen Fehler in der GT selbst auf.
- **Stichprobe vor Aufbau:** vor jeder teuren Klassifikation/Batch zuerst stratifizierte Stichprobe geprüft (verhinderte mehrfach Aufbau auf falschem Fundament).
## 5. Die Schlüssel-Entdeckung (AGB)
Verschiedene Controls **innerhalb desselben Moduls** brauchen verschiedene Richter. Belege:
- Eine **globale** Embedding-Schwelle scheitert bei juristischer Prosa; **per-Item-Schwellen** trennen sauber.
- **Whole-Section-Retrieval** (ganze §-Abschnitte) schlägt Top-k-Chunks für den LLM-Judge deutlich.
- Ein **billig-zuerst-Kaskaden-LLM** taugt nicht als Richter (eskaliert selbstbewusst-falsche Antworten nicht) — für harte Items starken Tier pinnen.
- Ein **Verweis** („siehe Datenschutzerklärung") ist ein REFERENCE/Link-Check, **kein** LLM-Fall.
## 6. Wettbewerbspositionierung
| | Typisches Tool | BreakPilot |
|---|---|---|
| Prüfansatz | ein LLM für alles | Control-Routing → spezialisierter Prüfer |
| False Positives | systematisch (LLM auf Nicht-Text-Pflichten) | je Kontrolltyp minimiert (gemessen) |
| Evidenzbasis | keine | Mehr-Firmen-GT, reproduzierbare Zahlen |
| Skalierung neuer Regulierungen | jedes Mal neu | Mapping auf bestehende Prüfer-Matrix |
## 7. Reifegrad, Ehrlichkeit & Roadmap
- **Validiert (Messung):** alle 4 Module oben.
- **Live im Tool:** DSE-Kriterien (prod). Impressum-Scope/Feldmatrix, Cookie-Artifact-Type und AGB-C-lean sind **validiert, aber noch nicht überall ins Produkt integriert** → Demo-Integration ist der nächste Schritt (Vorher/Nachher live zeigbar machen).
- **Website-/Marketing-Compliance: abgeschlossen** (DSE/Impressum/Cookie/AGB + Architektur). Restliche Web-Doc-Typen (Nutzungsbedingungen, Shop-AGB, Legal Notice, Social-Media) = **Mapping**, keine neue Architektur.
- **Nächste große Etappe (nach Sales):** industrielle Compliance (CRA, Maschinenverordnung, NIS2, DORA, ISO 27001, TISAX, AI Act) — neue Prüfertypen TECHNICAL/PROCESS/EVIDENCE/SYSTEM; die Prüfer-Matrix wird dort wiederverwendet.
@@ -0,0 +1,74 @@
# Plattform-Validierung der Doc-Check-Kalibrierung — `platform_validation_v1`
> **Status:** Plattform-Methodik validiert über 3 strukturell verschiedene Dokumentklassen (2026-06-19).
> **Zweck:** Nicht ein Modul dokumentieren, sondern den **Kalibrierungsprozess** und die **empirische Fehlerkarte** der Engine — damit die *Ursachen* erhalten bleiben (nicht nur die Messwerte). Erkenntnis > Metrik.
## 1. Was hier validiert wurde
Vor dieser Runde war unklar, ob der Restfehler der Doc-Check-Engine aus dem **LLM**, dem **Embedding**, dem **Prompt**, der **Applicability** oder dem **Control-Katalog** stammt — alles vermischt. Nach DSE + Cookie + Impressum existiert eine **belastbare Taxonomie der Fehlerursachen**, und der **Kalibrierungsprozess** hat in drei sehr unterschiedlichen Domänen geliefert. Das ist die eigentliche Errungenschaft — größer als jede einzelne Zahl.
## 2. Der Kalibrierungsprozess (wiederverwendbarer Kern)
1. **Opus-GT** je `(Firma × Control)` über 59 repräsentative Firmen (stärkstes Modell, NICHT Haiku).
2. **Engine-Messung** (Keyword → BGE-M3-Embedding → robuster LLM-Judge) vs GT.
3. **FP-Cluster** — wiederkehrende Controls statt Einzel-Findings (systematisch ≠ zufällig).
4. **Ursachen-Klassifikation** je FP: `SCOPE` / `ARTIFACT_TYPE` / `CRITERIA` / `JUDGE`.
5. **Fix** der dominanten Ursache (versioniert, mit Rechtsnotiz).
6. **Re-Messung** — Pflicht: FP↓ **und** FN stabil. Plus **Anti-Overfit** auf ungesehenen Firmen.
## 3. Plattform-Fehlerkarte (Kernergebnis)
| Modul | Dominante Ursache | Hebel | Ergebnis | Status |
|---|---|---|---|---|
| **DSE** | Kriterien zu streng | Kriterien-Kalibrierung (11 Controls) | FP 11 % → **6 %**, FN ~7 %; **generalisiert** (8 Firmen; fresh FP 7 % / FN 5 %) | Release-Candidate |
| **Cookie** | `artifact_type` (Banner ≠ Richtlinie) | 31 Banner-Controls → `COOKIE_BANNER`; 21 Kriterien (Kategorie statt Pro-Cookie, Zitat optional), Pro-Cookie = Best-Practice | Precision 0,81 → **0,95**, Recall 0,26 → **0,44**, verpasste Lücken → **0 %**, abs. FP 71 → 54 | Wave-1 (dev) |
| **Impressum** | **Scope** (GT-NA 48 %) + **Feld-Extraktion** + **Präsentation** | Scope-Gate (14 raus) + **Feldmatrix-Matcher** (Fakten) + **PRESENTATION_CHECK**-Re-Route (5) | roh: SCOPE-FP 105 / JUDGE-FP 66 → **Text-Check FP 0 % / FN 2 %** | Release-Candidate |
## 4. Meta-Befunde
- **Die generische Architektur bewährt sich.** Jede Domäne hat ein *anderes* dominantes Problem — `artifact_type` / `obligation_type` / `scope` tragen unterschiedlich stark. Eine gute generische Architektur erzeugt nicht überall denselben Effekt, sondern löst je Domäne ein anderes Problem. Genau das ist eingetreten.
- **Die Zielarchitektur ist domänen-adaptiv, nicht uniform.** „Embedding → OVH → Claude" ist nicht überall richtig: bei **Prosa** (DSE/Cookie) ist die LLM-Kaskade der Hebel; bei **strukturierten Faktendokumenten** (Impressum) ist das LLM sogar schwach (es verfehlt Adressen/Felder, die *dastehen*) → dort schlagen **Scope-Gate + deterministischer Feld-Matcher** den LLM-Judge.
- **Wiederkehrendes Anti-Muster:** „vermeintlicher Judge-Fehler → eigentlich Katalog-Fehler" (Scope, Präsentations-statt-Inhalt, Fehl-Typisierung). Erst NACH den Katalog-Fixes ist der Rest ein *echter* Judge-Fehler.
## 4b. Die `verification_method`-Achse (Synthese — die eigentliche Lehre)
Nicht jede Compliance-Pflicht ist ein Textproblem. Die 5 entdeckten Fehlerklassen mappen auf **5 Prüfer-Typen** — eine neue Routing-Metadaten-Achse `verification_method`, die einem Control sagt, *welcher Prüfer* zuständig ist (nicht alles an den LLM):
| `verification_method` | Prüfer | Frage | Beispiel | Status |
|---|---|---|---|---|
| **CONTENT** | Embedding + LLM-Kaskade (OVH→Claude) | Was steht da? | DSE nennt Zwecke; Cookie-Policy | DSE/Cookie kalibriert |
| **FIELD** | Regex/Parser (Feldmatrix) | Welche Felder existieren? | HRB, USt-IdNr, Adresse | Impressum-Fakten ✓ (FP 0 %) |
| **PRESENTATION** | Playwright (Sichtbarkeit/Erreichbarkeit) | Ist es auffindbar/wahrnehmbar? | Impressum leicht erkennbar, ständig verfügbar; Footer nicht verdeckt | Re-Route gemacht; Check offen |
| **BEHAVIOR** | Playwright + API (Interaktion) | Manipuliert es die Entscheidung? | Reject = Accept, Consent VOR Cookie, kein Dark Pattern | Cookie-Banner-Matrix existiert |
| **PROCESS** | Audit/Nachweis | Gibt es internen Nachweis? | VVT, interne Richtlinie, Audit-Entscheidung | Org-Checkliste |
**PRESENTATION ≠ BEHAVIOR** (beide Playwright, andere Rechtslogik): Präsentation = *Auffindbarkeit/Sichtbarkeit/Zugänglichkeit* (Impressum leicht erkennbar); Behavior = *Entscheidungs-Manipulation/Dark-Pattern* (Reject versteckt). Getrennt halten.
**Playwright wird damit vom Crawler zum Compliance-Sensor:** es prüft, was kein LLM kann — `display:none`, `font-size:4px`, Cookie-Layer verdeckt den Footer. LLM sieht `<a href="/impressum">` und sagt „erfüllt"; Playwright sieht die Verdeckung und sagt „nicht erfüllt".
**Kern-Regel der Architektur:** *Was im Text nicht beweisbar ist, gehört nicht in den Text-Check.* → route per `verification_method`. Sobald die Klassen sauber getrennt sind, sinken die FP fast automatisch (Impressum: SCOPE+JUDGE 171 → Text-Check-FP 0).
**Schema-Status:** `canonical_controls.verification_method` existiert (nur ~4 % befüllt, andere/gröbere Taxonomie document/code_review/tool/hybrid), `doc_check_controls` hat sie nicht. Die hier definierte Doc-Check-Routing-Achse ist **aus `control_classification` (artifact_type/obligation_type/check_intent) ableitbar** → kein Schema-Eingriff (eingefroren) nötig; als abgeleitetes Tag in `control_classification` führen.
## 5. Mess-Disziplin (prove-don't-handwave)
- GT mit dem stärksten Modell (`claude-opus-4-8`), nicht Haiku (zu lasch).
- Robust gegen LLM-Leerantworten: Retry + `INSUFFICIENT_EVIDENCE`/Eskalation statt FEHLT (ein realer Produktions-Bug, der die FP künstlich aufblähte).
- **Anti-Overfit:** Kriterien am Gesetz kalibrieren, dann auf *ungesehenen* Firmen gegenprüfen (DSE: 5 Original + 3 frische → stabile Zahlen = kein Overfit).
- OVH ist stochastisch (±~Rauschen je Lauf) und strenger als Opus → der Rest-FP konvergiert über Module auf **OVH-Über-Strenge**.
- **Zirkularitäts-Leitplanke:** Claude = Opus-GT-Modell → ein Claude-Tier-Sim misst die *Kaskaden-Reichweite* (erreicht Opus-Niveau), nicht eine unabhängige Validierung.
## 6. Offen (Reihenfolge)
1. **Claude-Tier-Sim (DSE + Cookie):** quantifiziert den verbleibenden **reinen** Judge-Fehler nach allen Katalog-Fixes — die letzte große unbekannte Variable. Erwartung: kleiner als roh, weil viel „Judge" sich als Katalog entpuppte.
2. **Impressum-Fix:** Rechtsform-Scope-Gate (#33) verdrahten + deterministischer Feld-Matcher + Re-Messung.
3. **Cookie Wave-2** (Cluster-E) + Produktions-Re-Route der 31 Banner-Controls (`control_classification`).
4. **Produktivschaltung** DSE + Cookie (zuletzt; verify-first DB-Write).
## 7. Artefakte
- DSE: `docs-src/development/dse_v1_validation.md`, `dse_criteria_changelog.json`/`dse_criteria_backup.json`.
- Cookie: `cookie_criteria_changelog.json`/`cookie_criteria_backup.json`/`cookie_best_practice.json` (Container `/tmp`), Cluster-Map.
- Impressum: `impressum_fp_by_cause.json` (SCOPE/JUDGE-Split).
- Gedächtnis: `project_engine_quality.md` (Detail je Modul). Werkzeuge: `cc_gt_opus_*`, `cc_engine_*`, `cc_*_candidates*` (alle macmini `/tmp`).
- **Alle Control-Änderungen nur auf macmini-dev**, versioniert, reversibel; Prod-Schaltung ausstehend.
@@ -0,0 +1,59 @@
# `verification_method` — die Prüfer-Routing-Achse
> **Status:** Architektur-Achse (2026-06-19), abgeleitet aus der 3-Modul-Kalibrierung (DSE / Cookie / Impressum).
> **Kernsatz:** *Nicht jede Compliance-Pflicht ist ein Textproblem.* `verification_method` sagt einem Control, **welcher Prüfer** zuständig ist — damit nicht alles am LLM hängt.
## 1. Warum diese Achse existiert
Die Kalibrierung von drei strukturell verschiedenen Dokumentklassen zeigte drei **verschiedene** dominante Fehlerursachen — und alle ließen sich auf die *Wahl des falschen Prüfers* zurückführen:
- **DSE** (Prosa): LLM-Urteil zu streng → Kriterien-Kalibrierung. Prüfer war richtig (LLM), Kriterien falsch.
- **Cookie** (Banner ≠ Richtlinie): Controls am falschen Artefakt geprüft → `artifact_type`-Re-Route.
- **Impressum** (Faktendokument): LLM verfehlt Felder, die *dastehen* (Adresse, HRB) → deterministischer Feld-Matcher schlägt den LLM. Und 5 Controls waren **gar nicht im Text beweisbar** (Erreichbarkeit/Verfügbarkeit) → gehören an Playwright, nicht an den Text-Check.
**Regel:** *Was im Text nicht beweisbar ist, gehört nicht in den Text-Check.* Sobald die Klassen sauber getrennt sind, sinken die False Positives fast automatisch (Beleg Impressum: SCOPE+JUDGE 171 Roh-FP → Text-Check-FP 0).
## 2. Die fünf Klassen
| `verification_method` | Prüfer | Leitfrage | Beispiel | Reifegrad |
|---|---|---|---|---|
| **CONTENT** | Embedding-Recall + LLM-Kaskade (OVH→Claude) | Was steht da? | DSE nennt Verarbeitungszwecke; Cookie-Richtlinie | DSE/Cookie kalibriert |
| **FIELD** | Regex / Parser (Feldmatrix) | Welche Felder existieren + sind valide? | HRB, USt-IdNr, Anschrift, E-Mail+Telefon | Impressum-Fakten ✓ (FP 0 %) |
| **PRESENTATION** | Playwright (Rendering-Sensor) | Ist es auffindbar / wahrnehmbar / erreichbar? | Impressum „leicht erkennbar", ständig verfügbar, Footer nicht verdeckt | Re-Route gemacht, Checker offen |
| **BEHAVIOR** | Playwright + API (Interaktion) | Manipuliert die UI die Entscheidung? | Reject = Accept, Cookies VOR Consent, Dark Pattern | Cookie-Banner-Matrix vorhanden |
| **PROCESS** | Audit / Nachweis | Gibt es einen internen Nachweis? | VVT, interne Richtlinie, Audit-Entscheidung | Org-Checkliste |
## 3. PRESENTATION ≠ BEHAVIOR
Beide nutzen Playwright, prüfen aber **verschiedene Rechtslogik** — getrennt halten:
- **PRESENTATION** = Auffindbarkeit / Sichtbarkeit / Zugänglichkeit. Beispiel: Impressum-Link erreichbar, nicht in 4px-Schrift, nicht hinter `display:none`, nicht dauerhaft vom Cookie-Layer verdeckt.
- **BEHAVIOR** = Entscheidungs-Manipulation / Dark-Pattern. Beispiel: „Ablehnen" versteckt, Vorauswahl gesetzt, Consent technisch ignoriert.
## 4. Playwright als Compliance-Sensor (nicht Crawler)
Playwright prüft, was **kein** LLM kann: Der LLM sieht `<a href="/impressum">` und urteilt „erfüllt"; der Sensor sieht, dass das Element verdeckt / unsichtbar / unerreichbar ist und urteilt „nicht erfüllt". Drei technische Prüfer langfristig:
- **Content-Checker** → LLM (CONTENT)
- **Structure-Checker** → Regex/Parser (FIELD)
- **Presentation-Checker** → Playwright (PRESENTATION + BEHAVIOR)
## 5. Schema-Status & Verortung
- `canonical_controls.verification_method` **existiert**, aber nur ~4 % befüllt und mit *anderer*, gröberer Taxonomie (`document` / `code_review` / `tool` / `hybrid` — generische Enterprise-Verifikation, nicht das Doc-Check-Routing).
- `doc_check_controls` hat **keine** `verification_method`-Spalte.
- → Die hier definierte Doc-Check-Routing-Achse ist **neu**, aber **ableitbar** aus den schon vorhandenen `control_classification`-Achsen (`artifact_type` / `obligation_type` / `check_intent`). **Kein** Schema-Eingriff nötig (DB ist eingefroren) — als abgeleitetes Tag in `control_classification` führen.
Heuristik für die Ableitung (Startpunkt, nicht final):
| Signal | → verification_method |
|---|---|
| `artifact_type = COOKIE_BANNER`, Interaktionspflicht | BEHAVIOR |
| Pflicht zu Erreichbarkeit / Sichtbarkeit / „ständig verfügbar" | PRESENTATION |
| Faktenfeld (Anschrift, Register, Kennung) | FIELD |
| `obligation_type` Prozess / Nachweis ohne Außenwirkung | PROCESS |
| sonst (inhaltliche Offenlegung in Prosa) | CONTENT |
## 6. Warum das über die 3 Module hinaus zählt
Für die nächsten Module (CRA, Maschinenverordnung, NIS2, TISAX, ISO 27001) ist diese Achse vermutlich fast so wichtig wie `artifact_type`: viele dieser Pflichten sind **PROCESS** oder **BEHAVIOR**, kein Textinhalt. Wer sie an den LLM-Text-Check hängt, erzeugt systematische False Positives. Das ist die eigentliche Erkenntnis der Kalibrierung: **nicht** dass DSE/Cookie/Impressum funktionieren, sondern dass klar wurde, *welcher Prüfer für welche Art von Pflicht zuständig ist*.