Compare commits

..

8 Commits

Author SHA1 Message Date
Benjamin Admin 5f8009e844 fix(security): remove hardcoded Qdrant key + allowlist doc false-positives
CI / detect-changes (pull_request) Successful in 8s
CI / branch-name (pull_request) Successful in 1s
CI / guardrail-integrity (pull_request) Successful in 5s
CI / secret-scan (pull_request) Successful in 6s
CI / dep-audit (pull_request) Failing after 54s
CI / sbom-scan (pull_request) Failing after 1m3s
CI / build-sha-integrity (pull_request) Successful in 5s
CI / validate-canonical-controls (pull_request) Successful in 4s
CI / loc-budget (pull_request) Successful in 17s
CI / go-lint (pull_request) Failing after 13s
CI / python-lint (pull_request) Failing after 13s
CI / nodejs-lint (pull_request) Failing after 1m8s
CI / nodejs-build (pull_request) Successful in 3m0s
CI / test-go (pull_request) Successful in 1m0s
CI / iace-gt-coverage (pull_request) Successful in 22s
CI / test-python-backend (pull_request) Successful in 30s
CI / test-python-document-crawler (pull_request) Successful in 13s
CI / test-python-dsms-gateway (pull_request) Successful in 16s
secret-scan (gitleaks) had never run on a PR (broken checkout). A real Qdrant dev API key was hardcoded in 4 pre-existing files; removed in favour of env / gitea-secret references (scripts read QDRANT_API_KEY from os.environ; rag-ingest workflow references a gitea Actions secret). The remaining ~52 findings are doc curl examples + .env.example placeholders + a rule_key identifier, allowlisted in .gitleaks.toml (default ruleset kept). gitleaks now reports 0 findings.

ACTION REQUIRED: rotate the Qdrant dev API key — the leaked value is in git history.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 14:37:54 +02:00
Benjamin Admin 079bb56922 fix(ci): clone PR head branch, not the unbuildable merge-ref
CI / detect-changes (pull_request) Successful in 6s
CI / branch-name (pull_request) Successful in 1s
CI / guardrail-integrity (pull_request) Successful in 11s
CI / secret-scan (pull_request) Failing after 6s
CI / dep-audit (pull_request) Failing after 56s
CI / sbom-scan (pull_request) Failing after 1m9s
CI / build-sha-integrity (pull_request) Successful in 14s
CI / validate-canonical-controls (pull_request) Successful in 8s
CI / loc-budget (pull_request) Successful in 18s
CI / go-lint (pull_request) Failing after 26s
CI / python-lint (pull_request) Failing after 13s
CI / nodejs-lint (pull_request) Failing after 1m15s
CI / nodejs-build (pull_request) Successful in 3m9s
CI / test-go (pull_request) Successful in 1m7s
CI / iace-gt-coverage (pull_request) Successful in 22s
CI / test-python-backend (pull_request) Successful in 30s
CI / test-python-document-crawler (pull_request) Successful in 18s
CI / test-python-dsms-gateway (pull_request) Successful in 12s
All 17 checkout blocks cloned via --branch GITHUB_REF_NAME; on pull_request that is a merge ref git clone --branch cannot resolve, so every checkout-based gate (detect-changes, guardrail-integrity, secret-scan, sbom-scan, dep-audit, build-sha-integrity, validate-canonical-controls) failed before running. Now clone GITHUB_HEAD_REF with GITHUB_REF_NAME fallback: PR uses its source branch, push keeps prior behaviour. Additive.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 14:16:29 +02:00
Benjamin Admin 24bb449a79 fix(ci): detect-changes.sh always emits outputs (kill the cascade)
CI / detect-changes (pull_request) Failing after 2s
CI / branch-name (pull_request) Successful in 1s
CI / guardrail-integrity (pull_request) Failing after 2s
CI / secret-scan (pull_request) Failing after 2s
CI / dep-audit (pull_request) Failing after 1s
CI / sbom-scan (pull_request) Failing after 2s
CI / build-sha-integrity (pull_request) Failing after 3s
CI / validate-canonical-controls (pull_request) Failing after 1s
CI / loc-budget (pull_request) Has been skipped
CI / go-lint (pull_request) Has been skipped
CI / python-lint (pull_request) Has been skipped
CI / nodejs-lint (pull_request) Has been skipped
CI / iace-gt-coverage (pull_request) Has been skipped
CI / test-python-document-crawler (pull_request) Has been skipped
CI / nodejs-build (pull_request) Has been skipped
CI / test-go (pull_request) Has been skipped
CI / test-python-backend (pull_request) Has been skipped
CI / test-python-dsms-gateway (pull_request) Has been skipped
detect-changes used set -e; an aborting git/grep killed it before writing GITHUB_OUTPUT -> the job outputs mapping evaluated to %!t(string=) and failed detect-changes + every job that needs it. Drop set -e, treat base/diff failure as rebuild-all, add an EXIT trap that emits rebuild-all + exit 0 on any early exit. Verified locally: empty/unreachable BASE_SHA + real diff all emit a full 11-key set.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 13:15:54 +02:00
Benjamin Admin 8af9584d09 test(dse): adopt canonical v3 tests + criteria/GT/validation
CI / detect-changes (pull_request) Failing after 5s
CI / branch-name (pull_request) Successful in 2s
CI / guardrail-integrity (pull_request) Failing after 4s
CI / secret-scan (pull_request) Failing after 4s
CI / dep-audit (pull_request) Failing after 2s
CI / sbom-scan (pull_request) Failing after 2s
CI / build-sha-integrity (pull_request) Failing after 3s
CI / validate-canonical-controls (pull_request) Failing after 3s
CI / loc-budget (pull_request) Has been skipped
CI / go-lint (pull_request) Has been skipped
CI / python-lint (pull_request) Has been skipped
CI / nodejs-lint (pull_request) Has been skipped
CI / nodejs-build (pull_request) Has been skipped
CI / test-go (pull_request) Has been skipped
CI / iace-gt-coverage (pull_request) Has been skipped
CI / test-python-backend (pull_request) Has been skipped
CI / test-python-document-crawler (pull_request) Has been skipped
CI / test-python-dsms-gateway (pull_request) Has been skipped
Replace the reconstructed test_dse_agent.py with the canonical version and add
the companion unit tests (classification_gate, embedding_recall) covering the
recovered v3 modules. Include the curated DSE criteria backup + changelog
(legal-note rationale per control), the v1 validation writeup, and the
multi-company DSE ground-truth fulltexts (elli/eto/mercedes/safetykon) used
for threshold calibration.

18 DSE tests green offline (DB/embedding/LLM stubbed).

dev-only, no deploy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 12:35:16 +02:00
Benjamin Admin ce6b4c58e3 feat(agent-ui): add Architektur tab explaining the doc-check pipeline
Mirror the CE module's /sdk/iace/.../architektur tab for /sdk/agent: a
hand-authored schema (data-flow lanes, step-by-step pipeline accordion,
module-engine cards, Pruefer-Matrix) explaining orchestrator phases A-F,
the parallel specialist agents (Impressum/AGB/DSE), the 4-layer DSE engine,
and the verification/decision-method meta-model. Adds a page-level
Check | Architektur tab toggle (the page was flat).

Static content (the Python doc-check has no architecture endpoint, unlike
the Go IACE module); can be data-fed later.

NOTE: not yet lint/type/browser-verified -- the worktree has no node_modules.
Needs a visual check + next lint / tsc in an env with the toolchain.

dev-only, no deploy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 11:25:28 +02:00
Benjamin Admin f6d018234b feat(dse): recover v3 DSE engine from container + wire into live check path
The calibrated DSE engine (4-layer: regex-boost / keyword / BGE-M3 embedding
recall @0.65 / semantic-validator) existed ONLY in the running macmini
container (docker cp'd, never committed) — at risk of loss on any container
rebuild. This recovers it into git and wires it into the live check path.

- Recover dse/{agent,v3_engine,_embedding_recall,_classification_gate,
  regex_boost,mcs,deep_check}.py. DSEAgent (v3, BaseSpecialistAgent) replaces
  the keyword-only stub: delegates MC-loading to the main engine
  (rag_document_checker._load_controls), deterministic cached embedding recall
  (reachability-gated), semantic-validator LLM layer honoring skip_llm,
  third-country -> HIGH on documented transfer.
- Wire "dse" into _agent_outputs._TOPIC_AGENTS -> live check emits a validated
  DSE tab (was snapshot/legacy-only).
- Tests rewritten for v3 (DB/embedding/LLM stubbed offline): regex-boost
  detection, embedding-recall reachability guard, result->Finding conversion,
  third-country HIGH; topic-wiring asserts "dse".
- deep_check.py recovered for preservation (alternate LLM-judge path, unwired).

Runtime data deps for full live behavior (note for prod): doc_check_controls
in DB + /data/mc_classification.db embedding sidecar + embedding-service; all
degrade gracefully (keyword layer carries) if absent.

dev-only, no deploy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 11:15:52 +02:00
Benjamin Admin 32e45f0797 feat(agb): wire validated routed AGB engine into live check path
Consolidate the AGB C-lean engine (71% FP -> ~0, validated vs 7-company
Opus GT) onto the canonical checker library and into the live check path.

- AGBAgent.evaluate now runs routed C-lean: keyword (L1/L2) -> business-
  model gate -> per-item decision_method routing (embedding/reference/llm
  via services/checkers/) -> severity re-tiering (LOW -> recommendation),
  honoring context.skip_llm.
- New agb/_pipeline.py orchestrates the routing; agent.py stays thin.
- Remove the 3 AGB-local checker duplicates (_reference_check,
  _embedding_rescue, _llm_judge); services/checkers/ is now canonical.
- Wire "agb" into _agent_outputs._TOPIC_AGENTS so the live check emits a
  validated AGB tab (was snapshot-only).
- Run topic agents concurrently (asyncio.gather) + emit each tab via SSE
  as it finishes -> progressive results, no wait on the slowest agent.
- Tests: checker units (mocked), routed agent (gate/rescue/re-tier),
  topic wiring; existing AGB tests made offline-safe.

dev-only, no deploy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 10:40:08 +02:00
Benjamin Admin 9d79cf1576 docs+feat(platform): Pruefer-Matrix-Foundation einfrieren (Evidenz, Mapping, Checker-Library, AGB-Kalibrierung)
Know-how-Freeze der Website-Compliance-Runde (DSE/Cookie/Impressum/AGB). docs: platform_evidence_v1 (Evidenz-/Qualitaetsnachweis, echte Zahlen), nutzungsbedingungen_mapping (neues Modul = Mapping, empirisch belegt), platform_checker_matrix (Meta-Modell verification_method x decision_method), verification_method, platform_validation_v1. code: checkers/ (reusable Pruefer-Library base+reference+embedding+llm, im Container validiert), agb/ (decision_method-Routing + Checker-Prototypen, 71% FP -> ~0 validiert). Dev-only, kein Prod-Push; Benchmark-GTs/Korpora im internen Archiv (data-retention).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 09:23:21 +02:00
23 changed files with 14 additions and 1239 deletions
+2 -3
View File
@@ -130,11 +130,10 @@ rsync -avz --exclude node_modules --exclude .next --exclude .git \
**breakpilot-core MUSS laufen!** Dieses Projekt nutzt Core-Services:
- Valkey (Session-Cache)
- Vault (Secrets)
- RAG-Service (Vektorsuche fuer Compliance-Dokumente)
- Nginx (Reverse Proxy)
Secrets liegen in Infisical (`secrets.meghsakha.com`); die Projektverknuepfung steht in `.infisical.json`. Lokal mit `infisical run --env=dev -- docker compose up` (oder `make dev`) starten — `.env`/`.env.local` werden nicht mehr verwendet.
**Externe Services (Production):**
- PostgreSQL 17 (sslmode=require) — Schemas: `compliance`, `public`
- Qdrant @ `qdrant-dev.breakpilot.ai` (HTTPS, API-Key)
@@ -317,7 +316,7 @@ ssh macmini "/usr/local/bin/docker compose -f /Users/benjaminadmin/Projekte/brea
### 5. Sensitive Dateien
**NIEMALS aendern oder committen:**
- `.env`, `.env.local`, Infisical-Tokens, SSL-Zertifikate
- `.env`, `.env.local`, Vault-Tokens, SSL-Zertifikate
- `*.pdf`, `*.docx`, kompilierte Binaries, grosse Medien
---
+1 -1
View File
@@ -92,7 +92,7 @@ Wenn Hochrisiko:
- [ ] **Transit:** TLS 1.3 für alle Verbindungen
- [ ] **Rest:** Datenbank-Verschlüsselung
- [ ] **Secrets:** Infisical (`secrets.meghsakha.com`) für Credentials
- [ ] **Secrets:** Vault für Credentials
### Zugriffskontrollen
-5
View File
@@ -1,5 +0,0 @@
{
"workspaceId": "996bda36-9e01-4071-ae8d-69a9f9ff5a23",
"defaultEnvironment": "",
"gitBranchToEnvironmentMapping": null
}
-157
View File
@@ -1,157 +0,0 @@
# Infisical Setup for Local Development
This is the per-developer onboarding for accessing the `breakpilot-compliance` secrets while developing locally. Once this is done, **everything you launch through `make dev` (or `infisical run …`) gets the dev secrets injected as environment variables** — including any Claude Code session that spawns those commands.
Secrets live in the self-hosted Infisical instance at **`secrets.meghsakha.com`**. The project link is committed in `.infisical.json`, so you don't need to know the project ID.
---
## 1. Install the Infisical CLI
**macOS (recommended):**
```bash
brew install infisical/get-cli/infisical
```
**Other platforms / manual install:**
See <https://infisical.com/docs/cli/overview>. Verify with:
```bash
infisical --version
# infisical version 0.43.x (or newer)
```
---
## 2. Log in to the self-hosted instance
```bash
infisical login --domain https://secrets.meghsakha.com
```
This opens a browser for SSO. The login is persisted to your OS keychain — you only do this once per machine.
Sanity check:
```bash
cd ~/projects/breakpilot-compliance # wherever you cloned the repo
infisical --domain https://secrets.meghsakha.com secrets --env=dev
```
You should see a table of secret names + values. If you get an auth error, re-run `infisical login`.
---
## 3. Verify the project link
The repo already contains `.infisical.json` pointing at the `breakpilot-compliance` project:
```bash
cat .infisical.json
# { "workspaceId": "996bda36-9e01-4071-ae8d-69a9f9ff5a23", ... }
```
If the file is missing (rare — only if you reset the repo), recreate it:
```bash
infisical init --domain https://secrets.meghsakha.com
```
Pick the `breakpilot-compliance` project from the picker.
---
## 4. Launch the stack
```bash
make dev
```
This runs `infisical run --env=dev -- docker compose up`. Every service in the compose stack sees its secrets as normal env vars — no `.env` file ever touches disk.
Other targets:
| Target | What it does |
|--------|--------------|
| `make dev-build` | Same as `make dev` but rebuilds images first |
| `make dev-down` | Stop the stack (no secrets needed) |
| `make dev-logs` | Tail logs |
| `make dev-ps` | List running containers |
| `make secrets` | Print all secrets in `dev` (read-only) |
| `make secrets-set KEY=FOO VALUE=bar` | Add or update a secret in `dev` |
To target a different environment:
```bash
make dev ENV=staging
make secrets ENV=prod
```
---
## 5. Using secrets from Claude Code
When Claude Code runs commands in this repo via its Bash tool, the commands inherit your shell's environment. Two patterns:
**Pattern A — let Claude launch the stack normally**
Claude just runs `make dev`. The Infisical CLI inside that command resolves secrets at run time and passes them to docker compose. Claude doesn't see plaintext secrets in its context, but the running services do.
**Pattern B — let Claude run a one-off script with secrets**
If Claude needs to execute a Python/Go script that requires secrets, wrap the command:
```bash
infisical run --env=dev -- python scripts/some_one_off.py
```
This works for any subprocess: pytest, alembic, go run, npm scripts. If Claude proposes a command that reads env vars and runs raw, ask it to wrap it in `infisical run --env=dev --` first.
**What Claude should not do:**
- `infisical export --env=dev > .env` — defeats the whole point and the `.gitignore` will still try to keep the file out.
- `infisical secrets get KEY --env=dev --raw` and pasting the value into a code edit — secrets must stay out of the repo.
If you want Claude to never accidentally dump secrets, add this to your `.claude/settings.json` permissions (project-level or user-level):
```json
{
"permissions": {
"deny": [
"Bash(infisical export*)",
"Bash(infisical secrets get*)"
]
}
}
```
---
## Troubleshooting
| Symptom | Fix |
|---------|-----|
| `please either run infisical init or pass --projectId` | `.infisical.json` is missing or unreadable — re-run `infisical init` |
| `unauthorized` / `please log in` | Re-run `infisical login --domain https://secrets.meghsakha.com` |
| `make dev` says secret is empty | Check the name in `make secrets` matches what docker-compose expects, then update the service config or rename the secret in Infisical |
| Browser SSO doesn't open | Use `infisical login --domain https://secrets.meghsakha.com --method=user` and paste the URL manually |
---
## What the dev env contains
Run `make secrets` to see the live list. As of this writing the dev env includes (at minimum):
- `BREAKPILOT_DB_PASSWORD`
- `BREAKPILOT_QDRANT_API_KEY`
- `LITELLM_API_KEY`
Every other variable in `.env.example` either has a sane default in `docker-compose.yml` or needs to be added to Infisical. To add one:
```bash
make secrets-set KEY=ANTHROPIC_API_KEY VALUE=sk-ant-xxxx
```
Or via the web UI: <https://secrets.meghsakha.com>.
-57
View File
@@ -1,57 +0,0 @@
# breakpilot-compliance — developer workflow
#
# Secrets are managed in Infisical (secrets.meghsakha.com). The project
# link lives in .infisical.json. To get started:
# 1) infisical login --domain https://secrets.meghsakha.com (once per machine)
# 2) make dev
#
# .env / .env.local are NOT used in this repo anymore. Anything that needs
# secrets MUST be launched through `infisical run` so the values come from
# the secrets store instead of disk.
INFISICAL ?= infisical
INFISICAL_DOMAIN ?= https://secrets.meghsakha.com
ENV ?= dev
INFISICAL_RUN := $(INFISICAL) --domain $(INFISICAL_DOMAIN) run --env=$(ENV) --
INFISICAL_SECRETS := $(INFISICAL) --domain $(INFISICAL_DOMAIN) secrets --env=$(ENV)
.PHONY: help dev dev-build dev-down dev-logs dev-ps secrets secrets-set check-loc
help:
@echo "Targets:"
@echo " dev Start the full compose stack with secrets injected from Infisical"
@echo " dev-build Same as dev, but force a rebuild first"
@echo " dev-down Stop the compose stack (no secrets needed)"
@echo " dev-logs Tail logs from all services"
@echo " dev-ps Show running containers"
@echo " secrets List all secrets in the current env ($(ENV))"
@echo " secrets-set Set a secret (KEY=... VALUE=...)"
@echo " check-loc Run the 500-line LOC guard"
dev:
$(INFISICAL_RUN) docker compose up
dev-build:
$(INFISICAL_RUN) docker compose up --build
dev-down:
docker compose down
dev-logs:
docker compose logs -f
dev-ps:
docker compose ps
secrets:
$(INFISICAL_SECRETS)
secrets-set:
@if [ -z "$(KEY)" ] || [ -z "$(VALUE)" ]; then \
echo "Usage: make secrets-set KEY=MY_KEY VALUE=my_value"; exit 1; \
fi
$(INFISICAL) --domain $(INFISICAL_DOMAIN) secrets set $(KEY)=$(VALUE) --env=$(ENV)
check-loc:
bash scripts/check-loc.sh
+6 -9
View File
@@ -42,26 +42,23 @@ All containers share the external `breakpilot-network` Docker network and depend
## Quick Start
**Prerequisites:** Docker, Go 1.24+, Python 3.12+, Node.js 20+, [Infisical CLI](https://infisical.com/docs/cli/overview)
**Prerequisites:** Docker, Go 1.24+, Python 3.12+, Node.js 20+
```bash
git clone ssh://git@gitea.meghsakha.com:22222/Benjamin_Boenisch/breakpilot-compliance.git
cd breakpilot-compliance
# One-time per machine: log in to the self-hosted Infisical instance
infisical login --domain https://secrets.meghsakha.com
# Copy and populate secrets (never commit .env)
cp .env.example .env
# Start the full stack with secrets injected from Infisical (env=dev)
make dev
# Start all services
docker compose up -d
```
Secrets are pulled from Infisical (`secrets.meghsakha.com`) at runtime; `.env` files are not used. See [INFISICAL_SETUP.md](./INFISICAL_SETUP.md) for full onboarding, and `make help` for the rest of the targets (`dev-build`, `dev-down`, `secrets`, `secrets-set`).
For the Orca/Hetzner production target (x86_64), use the override:
```bash
make dev ENV=prod # or:
infisical run --env=prod -- docker compose -f docker-compose.yml -f docker-compose.hetzner.yml up -d
docker compose -f docker-compose.yml -f docker-compose.hetzner.yml up -d
```
---
@@ -35,25 +35,6 @@ Dies ist ein **Legal RAG**. Eine falsch zitierte Fundstelle ist schlimmer als ga
- **Interne IDs** (Control-IDs wie SEC-xxxx, MC-/M-Nummern) gehoeren NICHT in die Nutzerantwort
als Hauptaussage — fuehre die Pflicht im Klartext, eine ID hoechstens in Klammern nachgestellt.
## Korpus-Autoritaet & Aktualitaet — der Kontext schlaegt dein Gedaechtnis (KRITISCH)
Gesetze aendern sich nach deinem Trainingsstand. Der bereitgestellte RAG-/Controls-Kontext bildet
den AKTUELLEN Rechtsstand ab — dein Trainingswissen kann veraltet sein. Diese Regel gilt fuer
FAKTEN, nicht nur fuer Fundstellen (ergaenzt **Quellentreue**).
- Rechtliche **Fakten** (Schwellenwerte, Fristen, Zahlen, ob/ab-wann eine Pflicht gilt,
Zustaendigkeiten) nimmst du AUSSCHLIESSLICH aus dem bereitgestellten Kontext. Dein Trainingswissen
dient nur fuer Sprache, Struktur und Schlussfolgerung — **niemals als Rechtsquelle**.
- Steht ein gefragter Fakt NICHT im Kontext: gib KEINE aus dem Gedaechtnis erinnerte Zahl/Frist/
Schwelle aus — auch nicht beilaeufig im Fliesstext ohne Fundstelle. Sag offen, dass du ihn aus
deinen geprueften Quellen nicht belegen kannst, nenne Pflicht/Thema allgemein, und biete den
naechsten Schritt an (gezielt nachschlagen / mit DSB oder Anwalt verifizieren).
- **Konflikt-Transparenz**: Weicht der Kontext von dem ab, was dir "gelaeufig" vorkommt, gewinnt
IMMER der Kontext. Mach es ruhig transparent — z.B. "Die aktuelle Quelle nennt 20; eine evtl.
aeltere, gelaeufige Annahme (10) gilt hier nicht."
- **Co-Pilot-Ton, keine Roboter-Verweigerung**: formuliere "Aus meinen geprueften Quellen kann ich
X nicht belegen — ich kann es gezielt nachschlagen, oder du klaerst es mit deinem DSB/Anwalt"
statt eines harten "Nein". Du bleibst hilfreicher Begleiter, gibst dem Nutzer aber keine
ungesicherte Rechtsangabe als Tatsache mit.
## Kompetenzbereich
- DSGVO Art. 1-99 + Erwaegsgruende
- BDSG (Bundesdatenschutzgesetz)
@@ -80,7 +80,7 @@ export async function POST(request: NextRequest) {
let systemContent = soulPrompt || FALLBACK_SYSTEM_PROMPT
if (validCountry) systemContent += countryBlock(validCountry)
if (ragContext) {
systemContent += `\n\n## Relevanter Kontext aus dem RAG-System (deine EINZIGEN Rechtsquellen)\n\nDies sind deine einzigen zulaessigen Rechtsquellen. Triff keine konkrete Rechtsaussage (Zahl, Frist, Schwelle, Pflicht, Fundstelle), die nicht hier oder im Controls-Block belegt ist — sonst sage offen, dass du sie aus deinen Quellen nicht belegen kannst. Verweise in deiner Antwort auf die jeweilige Quelle:\n\n${ragContext}`
systemContent += `\n\n## Relevanter Kontext aus dem RAG-System\n\nNutze die folgenden Quellen fuer deine Antwort. Verweise in deiner Antwort auf die jeweilige Quelle:\n\n${ragContext}`
}
if (controlsContext) systemContent += `\n\n${controlsContext}`
systemContent += `\n\n## Aktueller SDK-Schritt\nDer Nutzer befindet sich im SDK-Schritt: ${currentStep}`
@@ -45,11 +45,6 @@ class LLMChecker:
text = doc.text or ""
if len(text) < 50:
return CheckResult(present=None, source="llm")
# decision_method=LLM mit judge='haiku': Sufficiency-Pfad (validiert
# P0.89/R0.91). Der Qwen-first-Cascade ist als Sufficiency-Judge
# widerlegt -> hier Haiku direkt, kriteriengeführte Subsumtion.
if (ctrl.extra or {}).get("judge") == "haiku":
return await self._haiku(ctrl, text)
secs = _sections(text)
if ctrl.topic_regex:
rel = [s for s in secs if re.search(ctrl.topic_regex, s, re.I)][:6] or secs[:6]
@@ -76,31 +71,3 @@ class LLMChecker:
except Exception as e:
logger.info("llm checker fail %s: %s", ctrl.control_id, str(e)[:80])
return CheckResult(present=None, source="error")
async def _haiku(self, ctrl: ControlSpec, text: str) -> CheckResult:
"""Sufficiency via Haiku direkt (validierter Judge). Kriteriengeführt:
die Rechts-Elemente stehen in ctrl.paraphrases; wiederverwendet den
validierten deep_check-Sufficiency-Prompt."""
try:
from compliance.services.llm_cascade import _call_anthropic
from compliance.services.specialist_agents.dse.deep_check import (
_JUDGE_SYS, _build_user, _parse as _parse_judge,
)
crit = ctrl.paraphrases or [ctrl.label or ctrl.control_id]
user = _build_user(text, ctrl.label or ctrl.control_id, crit)
obj = None
for _ in range(2):
obj = _parse_judge(await _call_anthropic(_JUDGE_SYS, user, max_tokens=400))
if obj:
break
if not obj:
return CheckResult(present=None, source="haiku")
return CheckResult(
present=bool(obj.get("erfuellt")),
evidence=(obj.get("begruendung") or "")[:120],
confidence=float(obj.get("confidence") or 0.0),
source="haiku",
)
except Exception as e:
logger.info("llm haiku checker fail %s: %s", ctrl.control_id, str(e)[:80])
return CheckResult(present=None, source="error")
@@ -1,68 +0,0 @@
"""Prüfer-Router — method-agnostischer Dispatch.
control → sensor_classification (verification_method + decision_method) → Checker.
Ein neues Modul liefert nur ControlSpecs; der Router wählt den Prüfer. Damit wird
der „Embedding findet, Claude entscheidet"-Pfad EIN gemeinsamer CONTENT/LLM-Prüfer
statt Cookie-Sonderlogik. Nicht-gebaute Prüfer (PLAYWRIGHT/AUDIT/SCANNER/REGEX-
FIELD) → present=None (fail-safe: Aufrufer behält sein deterministisches Ergebnis).
"""
from __future__ import annotations
from typing import Any, Optional
from .base import CheckResult, ControlSpec, DecisionMethod, DocContext
from .embedding_checker import EmbeddingChecker
from .llm_checker import LLMChecker
from .reference_checker import ReferenceChecker
_LLM = LLMChecker()
_EMB = EmbeddingChecker()
_REF = ReferenceChecker()
# decision_method → Checker. Fehlende Mechanismen bewusst None (noch nicht gebaut).
_BY_DECISION: dict[str, Any] = {
DecisionMethod.LLM: _LLM,
DecisionMethod.EMBEDDING: _EMB,
DecisionMethod.LINK_RESOLVER: _REF,
}
async def route_and_check(ctrl: ControlSpec, doc: DocContext) -> CheckResult:
checker = _BY_DECISION.get((ctrl.decision_method or "").upper())
if checker is None:
return CheckResult(present=None,
source=f"no_checker:{ctrl.decision_method}")
return await checker.check(ctrl, doc)
def build_spec(
control_id: str,
sensor_classification: Optional[dict[str, Any]],
*,
label: str = "",
criteria: Optional[list] = None,
question: str = "",
patterns: Optional[list[str]] = None,
embed_threshold: Optional[float] = None,
) -> ControlSpec:
"""Baut ein ControlSpec aus der GESPEICHERTEN sensor_classification
(canonical_controls.generation_metadata.sensor_classification) + den
Control-Kriterien. CONTENT/LLM → judge='haiku' (validierter Sufficiency-
Judge; Default für Sufficiency lt. Entscheidung 2026-06-22)."""
sc = sensor_classification or {}
vm = (sc.get("verification_method") or "").upper()
dm = (sc.get("decision_method") or "").upper()
extra: dict[str, Any] = {}
if vm == "CONTENT" and dm == "LLM":
extra["judge"] = "haiku"
return ControlSpec(
control_id=control_id,
verification_method=vm,
decision_method=dm,
label=label,
paraphrases=[str(c) for c in (criteria or []) if c],
question=question,
patterns=patterns or [],
embed_threshold=embed_threshold,
extra=extra,
)
@@ -142,26 +142,19 @@ async def _call_ovh(system: str, user: str, max_tokens: int = 6000) -> str:
headers = {"Content-Type": "application/json"}
if key:
headers["Authorization"] = f"Bearer {key}"
# gpt-oss-120b is a REASONING model: it spends output tokens on
# chain-of-thought before emitting the answer. A low cap (e.g. deep_check's
# max_tokens=400) makes it hit the length limit mid-reasoning and return
# content=null — the whole tier then silently yields nothing. Floor the
# budget so the reasoning AND the JSON answer fit.
payload = {
"model": model, "temperature": 0.05, "max_tokens": max(max_tokens, 2000),
"model": model, "temperature": 0.05, "max_tokens": max_tokens,
"messages": [{"role": "system", "content": system},
{"role": "user", "content": user}],
"response_format": {"type": "json_object"},
}
try:
async with httpx.AsyncClient(timeout=90.0) as c:
async with httpx.AsyncClient(timeout=45.0) as c:
r = await c.post(f"{base.rstrip('/')}/v1/chat/completions",
json=payload, headers=headers)
r.raise_for_status()
msg = (r.json().get("choices") or [{}])[0].get("message") or {}
# Answer is normally in content; if the model was length-capped the
# JSON can land in reasoning_content instead — fall back to it.
return (msg.get("content") or "") or (msg.get("reasoning_content") or "")
choice = (r.json().get("choices") or [{}])[0]
return (choice.get("message") or {}).get("content", "") or ""
except Exception as e:
logger.warning("ovh cascade tier 2 failed: %s", e)
return ""
@@ -1,78 +0,0 @@
"""Applicability-Gate fuer den Cookie-Policy-Scan.
Schliesst Controls aus dem Cookie-Findings-Scan aus, die laut
`compliance.control_classification` NICHT gegen eine Cookie-Policy laufen
('COOKIE_POLICY' nicht in applicable_artifacts). Diese gehoeren zu einem
anderen Artefakt/Pruefer — Banner (BEHAVIOR/Playwright), Security/TOM/Audit
(PROCESS) — und erzeugen sonst Unsinn-Findings (z.B. 'TOMs nicht dokumentiert'
gegen eine Cookie-Richtlinie). Sie werden NICHT geloescht, sondern als
Routing-Liste zurueckgegeben.
Anders als das DSE-Gate OHNE needs_review-Ausnahme: das Artefakt-Signal ist
hier entscheidend und per Inventar (2026-06-21) belegt; die mis-scopeten 11
sind geprueft. Fail-safe: fehlt die Tabelle / DB nicht erreichbar -> leeres
Dict -> es wird NICHT gefiltert (kein stiller Recall-Verlust).
"""
from __future__ import annotations
import logging
import os
from typing import Any
logger = logging.getLogger(__name__)
async def load_cookie_gate(db_url: str = "") -> dict[str, dict[str, Any]]:
"""Liefert {control_id: meta} fuer Controls, die aus dem Cookie-Findings-
Scan auszuschliessen sind (kein COOKIE_POLICY-Artefakt). Leeres Dict =
kein Filter."""
dsn = (db_url or os.getenv("DATABASE_URL")
or os.getenv("COMPLIANCE_DATABASE_URL") or "")
if not dsn:
return {}
try:
import asyncpg
conn = await asyncpg.connect(dsn)
try:
rows = await conn.fetch(
"""SELECT control_id, obligation_type, check_intent,
applicable_artifacts
FROM compliance.control_classification
WHERE is_active
AND NOT ('COOKIE_POLICY' = ANY(applicable_artifacts))""")
finally:
await conn.close()
except Exception as e: # Tabelle fehlt / DB weg -> kein Filter
logger.info("cookie classification gate inaktiv: %s", str(e)[:90])
return {}
return {
r["control_id"]: {
"obligation_type": r["obligation_type"],
"check_intent": r["check_intent"],
"applicable_artifacts": list(r["applicable_artifacts"] or []),
}
for r in rows if r["control_id"]
}
def apply_gate(
controls: list[dict[str, Any]],
gate: dict[str, dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
"""Teilt geladene Controls in (kept, routed_out).
kept: laufen normal durch den Cookie-Scan.
routed_out: aus dem Scan genommen (control_id + title + Klassifikations-
Metadaten fuer das Routing zu Banner/Security/Audit).
"""
kept: list[dict[str, Any]] = []
routed_out: list[dict[str, Any]] = []
for c in controls:
cid = c.get("control_id")
meta = gate.get(cid) if cid else None
if meta:
routed_out.append({"control_id": cid, "title": c.get("title"), **meta})
else:
kept.append(c)
return kept, routed_out
@@ -1,63 +0,0 @@
"""Layer-3 Sufficiency-Judge fuer Cookie-Policy.
Das Embedding/Boost-Auto-Rescue (Layer 0/2) ist BEWUSST optimistisch — es findet
das Thema, beweist aber nicht die Erfuellung. Messung (2026-06-22): 159 FN
(Over-Rescue) gegen Opus-GT, weil 'Thema erwaehnt' als 'erfuellt' durchgewunken
wurde. Diese Schicht prueft GENAU die rescued Controls mit dem validierten
Haiku-Judge (Cohort cookie_sufficiency_v1: P0.89/R0.91) — NICHT die Qwen-first-
Kaskade (lokal ist als Sufficiency-Judge widerlegt) — und nimmt 'passed' zurueck,
wenn die konkrete Pflicht nicht erfuellt ist. 'Embedding findet, Claude entscheidet.'
Nur fuer den NICHT-skip_llm-Pfad (voller Check); der schnelle/interaktive Pfad
behaelt das deterministische Rescue.
"""
from __future__ import annotations
import logging
from typing import Any
logger = logging.getLogger(__name__)
_RESCUE_MARKERS = ("+embedding", "+regex_boost")
def _is_rescued(r: dict[str, Any]) -> bool:
src = r.get("source") or ""
return r.get("passed") and any(m in src for m in _RESCUE_MARKERS)
async def judge_rescued(text: str, results: list[dict[str, Any]]) -> int:
"""Prueft alle rescued (embedding/boost) passed-Controls mit Haiku.
Nimmt passed zurueck, wenn der Judge die Pflicht als NICHT erfuellt sieht.
Gibt die Anzahl zurueckgenommener (korrigierter) Rescues zurueck.
"""
# Über den gemeinsamen Prüfer-Router (kein Cookie-Sonderfall mehr):
# CONTENT/LLM → build_spec setzt judge='haiku' → LLMChecker (validierter
# Sufficiency-Judge). Damit ist Cookie der erste echte Router-Consumer.
from compliance.services.checkers.base import DocContext
from compliance.services.checkers.router import build_spec, route_and_check
candidates = [r for r in results if _is_rescued(r)]
if not candidates:
return 0
doc = DocContext(text=text)
sc = {"verification_method": "CONTENT", "decision_method": "LLM"}
corrected = 0
for r in candidates:
crit = r.get("_pass_criteria") or [r.get("label") or r.get("hint") or ""]
if not isinstance(crit, list):
crit = [str(crit)]
label = r.get("label") or r.get("hint") or r.get("control_id") or ""
spec = build_spec(r.get("control_id") or "", sc, label=label, criteria=crit)
res = await route_and_check(spec, doc)
if res.present is False:
r["passed"] = False
r["source"] = (r.get("source") or "") + "+llm_failed"
r["matched_text"] = "[layer-3 sufficiency-judge: nicht erfuellt]"
r["_judge_reason"] = (res.evidence or "")[:200]
corrected += 1
if corrected:
logger.info("cookie layer-3 sufficiency-judge: %d/%d rescues zurueckgenommen",
corrected, len(candidates))
return corrected
@@ -96,22 +96,6 @@ class CookiePolicyAgent(BaseSpecialistAgent):
"Branchen-MCs entfernt"
)
# Layer 3 — Sufficiency-Judge (Haiku) auf die embedding/boost-rescued
# Controls: Embedding findet das Thema, Claude entscheidet ob die Pflicht
# konkret erfuellt ist. Nur im vollen Check (nicht skip_llm).
skip_llm = bool((agent_input.context or {}).get("skip_llm"))
if not skip_llm:
try:
from ._sufficiency_judge import judge_rescued
corrected = await judge_rescued(text, results)
if corrected:
notes_parts.append(
f"layer-3 sufficiency-judge: {corrected} Rescues "
"zurückgenommen"
)
except Exception as e:
logger.warning("cookie layer-3 judge skipped: %s", e)
seen: set[str] = set()
for r in results:
mc_id = r.get("control_id") or ""
@@ -45,15 +45,6 @@ async def run_v3_pipeline(
controls = []
_normalize_criteria(controls)
controls, sector_dropped = _filter_sector(controls, business_scope)
# Artefakt-Gate: Controls ohne COOKIE_POLICY-Artefakt (Security/TOM/Audit,
# Banner) raus — sie gehoeren zu anderem Pruefer/Artefakt und erzeugen sonst
# Unsinn-Findings. Siehe _classification_gate.
routed_out: list[dict[str, Any]] = []
try:
from ._classification_gate import apply_gate, load_cookie_gate
controls, routed_out = apply_gate(controls, await load_cookie_gate(db_url))
except Exception as e:
logger.warning("cookie classification gate skipped: %s", e)
results: list[dict[str, Any]] = []
if controls:
try:
@@ -120,7 +111,6 @@ async def run_v3_pipeline(
"layer_0_boost_overrides": boost_overrides,
"total_mcs": len(results),
"sector_dropped": sector_dropped,
"artifact_gated": len(routed_out),
}
return results, telemetry
@@ -1,183 +0,0 @@
"""Getierte 3-Status-Auswertung für DSE-Controls mit `tiered_criteria`.
Pro Kriterium wird nach `decision_method` bewertet:
- EMBEDDING (Präsenz): deterministisch (festes Modell), Doc EINMAL pro Scan
eingebettet → reproduzierbar, kein LLM. Trägt den GROSSTEIL.
- LLM (Sufficiency): Haiku-Judge, GECACHT pro (doc_hash, control_id#idx,
PROMPT_VERSION, criterion) → gleicher Scan = gleiches Ergebnis. Löst die
empirisch gemessene Judge-Varianz (ein Live-Call ist NICHT reproduzierbar).
Status NUR aus LEGAL_MINIMUM:
ERFÜLLT (alle LM erfüllt ODER kein LM) · FEHLT (kein LM erfüllt) ·
TEILWEISE (Teil der LM erfüllt) · UNBESTIMMT (LM nicht bewertbar, z. B.
Embedding-Service down → Aufrufer behält sein Legacy-Ergebnis).
BEST_PRACTICE/OPTIONAL fließen NIE in den Status, nur in `recommendations`.
Siehe docs-src/development/criterion_meta_model.md.
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
import os
import sqlite3
from typing import Any, Optional
logger = logging.getLogger(__name__)
PROMPT_VERSION = "dse-tier-v1"
_CACHE_DB = os.getenv("TIERED_JUDGE_CACHE", "/data/tiered_judge_cache.db")
_EMBED_THR = float(os.getenv("DSE_CRITERION_EMBED_THRESHOLD", "0.62"))
LM = "LEGAL_MINIMUM"
def _doc_hash(text: str) -> str:
return hashlib.sha256(text.encode("utf-8", "ignore")).hexdigest()[:20]
def _ckey(dh: str, cid: str, idx: int, crit: str) -> str:
ch = hashlib.sha256(crit.encode("utf-8", "ignore")).hexdigest()[:12]
return f"{dh}|{cid}#{idx}|{PROMPT_VERSION}|{ch}"
def _cache_get(key: str) -> Optional[bool]:
try:
with sqlite3.connect(_CACHE_DB) as c:
c.execute("create table if not exists judge(k text primary key, met int)")
row = c.execute("select met from judge where k=?", (key,)).fetchone()
return None if row is None else bool(row[0])
except Exception:
return None
def _cache_put(key: str, met: bool) -> None:
try:
with sqlite3.connect(_CACHE_DB) as c:
c.execute("create table if not exists judge(k text primary key, met int)")
c.execute("insert or replace into judge values(?,?)", (key, int(met)))
except Exception as e:
logger.warning("tiered judge cache put: %s", e)
async def prepare_doc(text: str) -> dict[str, Any]:
"""Doc EINMAL pro Scan einbetten. Liefert {hash, chunk_vecs}. Bei Embedding-
Ausfall: chunk_vecs=None → EMBEDDING-Kriterien werden UNBESTIMMT (Fallback)."""
ctx: dict[str, Any] = {"hash": _doc_hash(text or ""), "chunk_vecs": None}
if not text or len(text) < 100:
return ctx
try:
from compliance.services.mc_embedding_matcher import DIM, _chunk_text, _embed_texts
vecs = await asyncio.wait_for(_embed_texts(_chunk_text(text)), timeout=90.0)
ctx["chunk_vecs"] = [v for v in vecs if v and len(v) == DIM]
except (Exception, asyncio.TimeoutError) as e:
logger.warning("tiered prepare_doc embedding inaktiv: %s", e)
return ctx
async def _embed_present(crits: list[str], ctx: dict, thr: float) -> dict[str, Optional[bool]]:
cvecs = ctx.get("chunk_vecs")
if not cvecs:
return {c: None for c in crits}
try:
from compliance.services.mc_embedding_matcher import DIM, _cosine, _embed_texts
pv = await _embed_texts(crits)
out: dict[str, Optional[bool]] = {}
for crit, v in zip(crits, pv):
if not v or len(v) != DIM:
out[crit] = None
else:
out[crit] = max((_cosine(v, cv) for cv in cvecs), default=0.0) >= thr
return out
except Exception as e:
logger.warning("tiered embed present: %s", e)
return {c: None for c in crits}
async def _llm_met(cid: str, idx: int, crit: str, doc, dh: str) -> Optional[bool]:
key = _ckey(dh, cid, idx, crit)
cached = _cache_get(key)
if cached is not None:
return cached
from compliance.services.checkers.router import build_spec, route_and_check
spec = build_spec(cid, {"verification_method": "CONTENT", "decision_method": "LLM"},
label=crit, criteria=[crit])
res = await route_and_check(spec, doc)
if res.present is None:
return None
_cache_put(key, bool(res.present))
return bool(res.present)
def _status(lm_vals: list[Optional[bool]]) -> str:
if not lm_vals:
return "ERFÜLLT" # kein gesetzliches Minimum → nie rot
if any(m is None for m in lm_vals):
return "UNBESTIMMT" # Aufrufer behält Legacy
n = sum(1 for m in lm_vals if m)
if n == len(lm_vals):
return "ERFÜLLT"
return "FEHLT" if n == 0 else "TEILWEISE"
async def evaluate_tiered(control_id: str, tiered_criteria: list[dict],
ctx: dict, doc) -> dict[str, Any]:
dh = ctx.get("hash") or _doc_hash(getattr(doc, "text", "") or "")
emb_texts = [c["criterion"] for c in (tiered_criteria or [])
if c.get("criterion")
and (c.get("decision_method") or "EMBEDDING").upper() != "LLM"]
emb_res = await _embed_present(emb_texts, ctx, _EMBED_THR) if emb_texts else {}
lm_vals: list[Optional[bool]] = []
recs: list[dict] = []
detail: list[dict] = []
for idx, c in enumerate(tiered_criteria or []):
crit = c.get("criterion") or ""
if not crit:
continue
tier = (c.get("compliance_tier") or "").upper()
if (c.get("decision_method") or "EMBEDDING").upper() == "LLM":
met = await _llm_met(control_id, idx, crit, doc, dh)
src = "haiku-cache"
else:
met = emb_res.get(crit)
src = "embedding"
detail.append({"criterion": crit, "tier": tier, "met": met, "source": src})
if tier == LM:
lm_vals.append(met)
elif met is False:
recs.append({"criterion": crit, "tier": tier or "OPTIONAL",
"legal_basis": c.get("legal_basis")})
return {"status": _status(lm_vals), "lm_met": sum(1 for m in lm_vals if m),
"lm_total": len(lm_vals), "recommendations": recs, "detail": detail}
async def fetch_tiered_criteria(cids: list[str], db_url: str = "") -> dict[str, list]:
"""tiered_criteria der angegebenen Controls aus canonical_controls laden.
Leeres Dict bei Fehler/keiner DB (Fallback: kein Tiering, Legacy trägt)."""
cids = [c for c in cids if c]
if not cids:
return {}
import json
dsn = db_url or os.getenv("DATABASE_URL") or os.getenv("COMPLIANCE_DATABASE_URL")
if not dsn:
return {}
try:
import asyncpg
conn = await asyncpg.connect(dsn)
rows = await conn.fetch(
"select control_id, generation_metadata->'tiered_criteria' tc "
"from compliance.canonical_controls "
"where control_id = any($1::text[]) "
"and generation_metadata ? 'tiered_criteria'", cids)
await conn.close()
except Exception as e:
logger.warning("fetch_tiered_criteria failed: %s", e)
return {}
out: dict[str, list] = {}
for r in rows:
tc = r["tc"]
tc = json.loads(tc) if isinstance(tc, str) else tc
if tc:
out[r["control_id"]] = tc
return out
@@ -129,41 +129,11 @@ async def run_v3_pipeline(
r["source"] = (r.get("source") or "") + "+embedding"
embedding_passes += 1
# Layer 3: getierte 3-Status-Auswertung (nur Controls mit tiered_criteria).
# Reproduzierbar: EMBEDDING-Präsenz (deterministisch) + GECACHTER Haiku-Judge
# nur für Sufficiency. UNBESTIMMT → Legacy-Pass bleibt. Gated + fail-safe.
tiered_evaluated = 0
try:
from compliance.services.checkers.base import DocContext
from ._tiered_eval import (
evaluate_tiered, fetch_tiered_criteria, prepare_doc,
)
result_cids = [r.get("control_id") for r in results if r.get("control_id")]
tiered_map = await fetch_tiered_criteria(result_cids, db_url)
if tiered_map:
ctx = await prepare_doc(text)
doc_ctx = DocContext(text=text)
for r in results:
tc = tiered_map.get(r.get("control_id"))
if not tc:
continue
ev = await evaluate_tiered(r["control_id"], tc, ctx, doc_ctx)
if ev["status"] == "UNBESTIMMT":
continue
r["compliance_status"] = ev["status"]
r["recommendations"] = ev["recommendations"]
r["tier_lm"] = f"{ev['lm_met']}/{ev['lm_total']}"
r["passed"] = ev["status"] == "ERFÜLLT"
tiered_evaluated += 1
except Exception as e:
logger.warning("dse tiered eval skipped: %s", e)
telemetry = {
"layer_0_field_hits": len(boost_field_ids),
"layer_0_field_ids": boost_field_ids,
"layer_1_pass": layer_1_pass,
"embedding_passes": embedding_passes,
"tiered_evaluated": tiered_evaluated,
"total_mcs": len(results),
"sector_dropped": drop_stats.get("sector_dropped", 0),
"offtopic_dropped": drop_stats.get("offtopic_dropped", 0),
@@ -1,51 +0,0 @@
"""Prüfer-Router: build_spec aus sensor_classification + method-agnostischer
Dispatch. CONTENT/LLM -> Haiku-Sufficiency-Tier (validiert), unbekannte
decision_methods -> fail-safe present=None."""
import pytest
from unittest.mock import AsyncMock, patch
from compliance.services.checkers.base import DocContext
from compliance.services.checkers.router import build_spec, route_and_check
_ANTHROPIC = "compliance.services.llm_cascade._call_anthropic"
def test_build_spec_content_llm_uses_haiku():
s = build_spec("X", {"verification_method": "CONTENT", "decision_method": "LLM"},
label="L", criteria=["a", "b"])
assert s.verification_method == "CONTENT" and s.decision_method == "LLM"
assert s.extra.get("judge") == "haiku"
assert s.paraphrases == ["a", "b"]
def test_build_spec_embedding_no_haiku():
s = build_spec("X", {"verification_method": "CONTENT", "decision_method": "EMBEDDING"})
assert s.extra.get("judge") is None
@pytest.mark.asyncio
async def test_route_unknown_decision_is_failsafe():
s = build_spec("X", {"verification_method": "BEHAVIOR", "decision_method": "PLAYWRIGHT"})
r = await route_and_check(s, DocContext(text="x" * 200))
assert r.present is None and "no_checker" in r.source
@pytest.mark.asyncio
async def test_route_content_llm_haiku_fehlt():
s = build_spec("X", {"verification_method": "CONTENT", "decision_method": "LLM"},
label="Speicherdauer", criteria=["Höchstdauer pro Kategorie"])
fake = AsyncMock(return_value='{"erfuellt": false, "confidence": 0.9, "begruendung": "fehlt"}')
with patch(_ANTHROPIC, new=fake):
r = await route_and_check(s, DocContext(text="Wir nutzen Cookies. " * 30))
assert r.present is False and r.source == "haiku"
assert fake.call_count >= 1
@pytest.mark.asyncio
async def test_route_content_llm_haiku_erfuellt():
s = build_spec("X", {"verification_method": "CONTENT", "decision_method": "LLM"},
label="L", criteria=["x"])
fake = AsyncMock(return_value='{"erfuellt": true, "confidence": 0.8}')
with patch(_ANTHROPIC, new=fake):
r = await route_and_check(s, DocContext(text="text " * 40))
assert r.present is True
@@ -1,42 +0,0 @@
"""Tests for the cookie-policy applicability gate: controls without a
COOKIE_POLICY artifact are routed out of the findings scan (not deleted),
and the gate is fail-safe (no DSN -> no filter)."""
import pytest
from compliance.services.specialist_agents.cookie_policy._classification_gate import (
apply_gate, load_cookie_gate,
)
def test_apply_gate_splits_kept_and_routed():
controls = [
{"control_id": "COOK-1", "title": "Kategorien"},
{"control_id": "TOM-1", "title": "Verschlüsselung"},
{"control_id": "BAN-1", "title": "Consent vor Setzen"},
]
gate = {
"TOM-1": {"obligation_type": "TECHNICAL", "check_intent": "DIRECT_TECHNICAL",
"applicable_artifacts": ["TOM", "AUDIT"]},
"BAN-1": {"obligation_type": "TECHNICAL", "check_intent": "DIRECT_TECHNICAL",
"applicable_artifacts": ["COOKIE_BANNER", "SYSTEMSCAN"]},
}
kept, routed = apply_gate(controls, gate)
assert [c["control_id"] for c in kept] == ["COOK-1"]
assert {c["control_id"] for c in routed} == {"TOM-1", "BAN-1"}
# routed entries carry title + classification metadata for downstream routing
tom = next(c for c in routed if c["control_id"] == "TOM-1")
assert tom["title"] == "Verschlüsselung"
assert tom["applicable_artifacts"] == ["TOM", "AUDIT"]
def test_apply_gate_empty_gate_keeps_all():
controls = [{"control_id": "A"}, {"control_id": "B"}]
kept, routed = apply_gate(controls, {})
assert len(kept) == 2 and routed == []
@pytest.mark.asyncio
async def test_load_cookie_gate_no_dsn_is_failsafe(monkeypatch):
monkeypatch.delenv("DATABASE_URL", raising=False)
monkeypatch.delenv("COMPLIANCE_DATABASE_URL", raising=False)
assert await load_cookie_gate("") == {}
@@ -1,68 +0,0 @@
"""Layer-3 cookie sufficiency-judge: only embedding/boost-RESCUED passes are
re-judged by Haiku; keyword passes are untouched; a FEHLT verdict un-passes."""
import pytest
from unittest.mock import AsyncMock, patch
from compliance.services.specialist_agents.cookie_policy._sufficiency_judge import (
judge_rescued,
)
_ANTHROPIC = "compliance.services.llm_cascade._call_anthropic"
_DOC = "Volltext der Cookie-Richtlinie mit ausreichend Inhalt. " * 4
def _r(cid, source, passed=True):
return {"control_id": cid, "source": source, "passed": passed,
"label": cid, "_pass_criteria": ["konkrete Angabe nötig"]}
@pytest.mark.asyncio
async def test_rescued_unpassed_when_judge_fehlt():
results = [_r("A", "keyword+embedding")]
fake = AsyncMock(return_value='{"erfuellt": false, "confidence": 0.9, "begruendung": "fehlt"}')
with patch(_ANTHROPIC, new=fake):
n = await judge_rescued(_DOC, results)
assert n == 1
assert results[0]["passed"] is False
assert "+llm_failed" in results[0]["source"]
@pytest.mark.asyncio
async def test_rescued_kept_when_judge_erfuellt():
results = [_r("A", "keyword+embedding")]
fake = AsyncMock(return_value='{"erfuellt": true, "confidence": 0.9}')
with patch(_ANTHROPIC, new=fake):
n = await judge_rescued(_DOC, results)
assert n == 0
assert results[0]["passed"] is True
@pytest.mark.asyncio
async def test_keyword_pass_not_judged():
"""Deterministisch (keyword) bestandene Controls werden NICHT befragt."""
results = [_r("A", "keyword")]
fake = AsyncMock(return_value='{"erfuellt": false}')
with patch(_ANTHROPIC, new=fake):
n = await judge_rescued(_DOC, results)
assert n == 0
assert results[0]["passed"] is True
assert fake.call_count == 0
@pytest.mark.asyncio
async def test_boost_rescue_is_judged():
results = [_r("A", "keyword+regex_boost")]
fake = AsyncMock(return_value='{"erfuellt": false}')
with patch(_ANTHROPIC, new=fake):
n = await judge_rescued(_DOC, results)
assert n == 1 and results[0]["passed"] is False
@pytest.mark.asyncio
async def test_failed_controls_ignored():
"""Nicht-bestandene (failed) Controls sind nicht Sache dieser Schicht."""
results = [_r("A", "keyword+embedding", passed=False)]
fake = AsyncMock(return_value='{"erfuellt": false}')
with patch(_ANTHROPIC, new=fake):
n = await judge_rescued(_DOC, results)
assert n == 0 and fake.call_count == 0
@@ -1,77 +0,0 @@
"""Regression tests for the OVH (gpt-oss-120b) tier of the LLM cascade.
gpt-oss-120b is a reasoning model: it spends output tokens on chain-of-thought
before the answer. Two bugs this pins:
1. A small max_tokens (deep_check passed 400) length-caps it mid-reasoning →
content=null → the tier silently returns nothing. _call_ovh must floor the
budget so reasoning + the JSON answer fit.
2. When length-capped, the JSON can land in reasoning_content, not content →
_call_ovh must fall back to reasoning_content.
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from compliance.services import llm_cascade
def _resp(data):
r = MagicMock()
r.raise_for_status = MagicMock()
r.json = MagicMock(return_value=data)
return r
def _client(resp):
inst = AsyncMock()
inst.post.return_value = resp
inst.__aenter__ = AsyncMock(return_value=inst)
inst.__aexit__ = AsyncMock(return_value=False)
return inst
class TestCallOvhReasoning:
@pytest.mark.asyncio
async def test_reasoning_content_used_when_content_null(self, monkeypatch):
monkeypatch.setenv("OVH_LLM_URL", "https://llm.example.com")
monkeypatch.setenv("OVH_LLM_MODEL", "gpt-oss-120b")
monkeypatch.setenv("OVH_LLM_KEY", "k")
resp = _resp({"choices": [{"message": {
"content": None,
"reasoning_content": '{"erfuellt": true, "confidence": 0.9}'}}]})
with patch("httpx.AsyncClient", return_value=_client(resp)):
out = await llm_cascade._call_ovh("sys", "user", max_tokens=400)
assert '"erfuellt": true' in out
@pytest.mark.asyncio
async def test_small_budget_is_floored(self, monkeypatch):
monkeypatch.setenv("OVH_LLM_URL", "https://llm.example.com")
monkeypatch.setenv("OVH_LLM_MODEL", "gpt-oss-120b")
inst = _client(_resp({"choices": [{"message": {"content": "{}"}}]}))
with patch("httpx.AsyncClient", return_value=inst):
await llm_cascade._call_ovh("sys", "user", max_tokens=400)
assert inst.post.call_args.kwargs["json"]["max_tokens"] >= 2000
@pytest.mark.asyncio
async def test_large_budget_is_preserved(self, monkeypatch):
monkeypatch.setenv("OVH_LLM_URL", "https://llm.example.com")
monkeypatch.setenv("OVH_LLM_MODEL", "gpt-oss-120b")
inst = _client(_resp({"choices": [{"message": {"content": "{}"}}]}))
with patch("httpx.AsyncClient", return_value=inst):
await llm_cascade._call_ovh("sys", "user", max_tokens=6000)
assert inst.post.call_args.kwargs["json"]["max_tokens"] == 6000
@pytest.mark.asyncio
async def test_content_preferred_when_present(self, monkeypatch):
monkeypatch.setenv("OVH_LLM_URL", "https://llm.example.com")
monkeypatch.setenv("OVH_LLM_MODEL", "gpt-oss-120b")
resp = _resp({"choices": [{"message": {
"content": '{"erfuellt": false}', "reasoning_content": "noise"}}]})
with patch("httpx.AsyncClient", return_value=_client(resp)):
out = await llm_cascade._call_ovh("sys", "user")
assert out == '{"erfuellt": false}'
@pytest.mark.asyncio
async def test_unconfigured_returns_empty(self, monkeypatch):
monkeypatch.delenv("OVH_LLM_URL", raising=False)
monkeypatch.delenv("OVH_LLM_MODEL", raising=False)
assert await llm_cascade._call_ovh("sys", "user") == ""
@@ -1,102 +0,0 @@
"""Unit-Tests für die getierte 3-Status-Auswertung (_tiered_eval).
Deckt ab: Status-Logik (inkl. kein-LM → ERFÜLLT, UNBESTIMMT bei nicht bewertbar),
Empfehlungs-Sammlung, EMBEDDING/LLM-Routing (gemockt) und den Reproduzierbarkeits-
Cache. Embedding/LLM werden gemockt — kein Netzwerk."""
import asyncio
from compliance.services.specialist_agents.dse import _tiered_eval as te
# ---- reine Status-Logik -------------------------------------------------
def test_status_no_lm_is_erfuellt():
assert te._status([]) == "ERFÜLLT"
def test_status_all_met_erfuellt():
assert te._status([True, True]) == "ERFÜLLT"
def test_status_none_met_fehlt():
assert te._status([False, False]) == "FEHLT"
def test_status_partial_teilweise():
assert te._status([True, False]) == "TEILWEISE"
def test_status_any_none_unbestimmt():
assert te._status([True, None]) == "UNBESTIMMT"
# ---- evaluate_tiered (Embedding/LLM gemockt) ----------------------------
def _crit(text, tier, dm="EMBEDDING"):
return {"criterion": text, "compliance_tier": tier,
"decision_method": dm, "legal_basis": "x"}
class _Doc:
def __init__(self, text):
self.text = text
def test_evaluate_partial_with_recommendation(monkeypatch):
crits = [_crit("Zwecke genannt", "LEGAL_MINIMUM"),
_crit("Speicherdauer genannt", "LEGAL_MINIMUM"),
_crit("tabellarisch ausgewiesen", "BEST_PRACTICE")]
async def fake_embed(texts, ctx, thr):
return {"Zwecke genannt": True, "Speicherdauer genannt": False,
"tabellarisch ausgewiesen": False}
monkeypatch.setattr(te, "_embed_present", fake_embed)
out = asyncio.run(te.evaluate_tiered("C1", crits, {"hash": "h"}, _Doc("x" * 200)))
assert out["status"] == "TEILWEISE"
assert out["lm_met"] == 1 and out["lm_total"] == 2
assert len(out["recommendations"]) == 1
assert out["recommendations"][0]["tier"] == "BEST_PRACTICE"
def test_evaluate_no_lm_is_erfuellt_with_recs(monkeypatch):
crits = [_crit("Bildsymbole", "OPTIONAL"), _crit("Legende", "OPTIONAL")]
async def fake_embed(texts, ctx, thr):
return {t: False for t in texts}
monkeypatch.setattr(te, "_embed_present", fake_embed)
out = asyncio.run(te.evaluate_tiered("C2", crits, {"hash": "h"}, _Doc("x" * 200)))
assert out["status"] == "ERFÜLLT"
assert out["lm_total"] == 0
assert len(out["recommendations"]) == 2
def test_evaluate_llm_criterion_routed(monkeypatch):
crits = [_crit("Speicherdauer hinreichend nachvollziehbar", "LEGAL_MINIMUM", dm="LLM")]
async def fake_llm(cid, idx, crit, doc, dh):
return True
monkeypatch.setattr(te, "_llm_met", fake_llm)
out = asyncio.run(te.evaluate_tiered("C3", crits, {"hash": "h"}, _Doc("x" * 200)))
assert out["status"] == "ERFÜLLT" and out["lm_total"] == 1
def test_evaluate_unbestimmt_when_embed_unavailable(monkeypatch):
crits = [_crit("Zwecke genannt", "LEGAL_MINIMUM")]
async def fake_embed(texts, ctx, thr):
return {t: None for t in texts} # Embedding-Service down
monkeypatch.setattr(te, "_embed_present", fake_embed)
out = asyncio.run(te.evaluate_tiered("C4", crits, {"hash": "h"}, _Doc("x" * 200)))
assert out["status"] == "UNBESTIMMT"
# ---- Reproduzierbarkeits-Cache -----------------------------------------
def test_cache_roundtrip(monkeypatch, tmp_path):
monkeypatch.setattr(te, "_CACHE_DB", str(tmp_path / "cache.db"))
assert te._cache_get("k1") is None
te._cache_put("k1", True)
te._cache_put("k2", False)
assert te._cache_get("k1") is True
assert te._cache_get("k2") is False
@@ -1,155 +0,0 @@
# Kriterien-Meta-Modell & Compliance-Tier-Architektur
> **Status: EINGEFROREN 2026-06-22.** Änderungen an diesem Modell sind
> Architekturentscheidungen und erfordern eine bewusste Freigabe (DB-Owner /
> Produktverantwortung). Verwandt: [`platform_checker_matrix.md`](platform_checker_matrix.md),
> [`verification_method.md`](verification_method.md), [`platform_validation_v1.md`](platform_validation_v1.md).
## 1. Motivation
Die Kalibrierung der vier Website-Compliance-Module deckte vier **verschiedene**
dominante Fehlerursachen auf:
| Modul | Dominanter Hebel |
|-------|------------------|
| Cookie-Policy | Sufficiency (Judge) |
| Impressum | Scope / Routing |
| AGB | Decision-Method / Routing |
| DSE | **Überladene Controls + Vermischung „gesetzliches Minimum vs. Best Practice"** |
Die DSE-Untersuchung (Adjudikation von 13 Judge↔GT-Disagreements) ergab: **85 % der
Restfehler sind Katalog-Defekte, 15 % Prüfer.** Der größte Einzeldefekt: ein Control
bündelt mehrere Anforderungen **unterschiedlicher Verbindlichkeit** und wird nur dann
als ERFÜLLT gewertet, wenn *alle* erfüllt sind. Folge: gesetzlich konforme Dokumente
werden als „FEHLT" gemeldet, weil eine Best-Practice-Empfehlung fehlt.
Dieses Modell behebt das **im Katalog** — ohne den Prüfer zu ändern und ohne Controls
physisch aufzuspalten.
## 2. Datenmodell
Ein Control bleibt **stabil** (UUID, Citations, GT-Historie, Kalibrierung,
Statistiken). Seine `pass_criteria` werden von einer Stringliste zu **atomaren,
getypten Kriterien-Objekten**:
```
Control (stabile control_uuid — NICHT splitten)
└─ criteria: Criterion[]
Criterion
├─ criterion (Text der Einzelanforderung)
├─ legal_basis (z. B. "Art. 13(1)(c) DSGVO")
├─ verification_method (Achse 1 — WAS wird geprüft)
├─ decision_method (Achse 2 — WIE wird entschieden)
├─ compliance_tier (Achse 3 — WIE VERBINDLICH)
└─ weight (reserviert für Reifegrad, s. §6 — heute NICHT gating)
```
**Speicherort:** `canonical_controls.generation_metadata->'tiered_criteria'` (jsonb).
**Keine Schema-Änderung.** Kein physischer Control-Split (Variante A wurde verworfen:
neue UUIDs → Verlust von Benchmarks/Kalibrierung/Citation/GT = Migrationsprojekt).
## 3. Die drei Achsen
Jedes Kriterium trägt drei **unabhängige** Klassifikationen:
1. **`verification_method`** — artefakt-abhängig: CONTENT · FIELD · REFERENCE ·
BEHAVIOR · PRESENTATION · PROCESS · TECHNICAL · CONTRACTUAL. Siehe
[`verification_method.md`](verification_method.md).
2. **`decision_method`** — welcher Prüfer: REGEX · EMBEDDING · LLM · LINK_RESOLVER ·
PLAYWRIGHT · AUDIT · SCANNER. Siehe [`platform_checker_matrix.md`](platform_checker_matrix.md).
3. **`compliance_tier`** *(neu, dieses Dokument)* — Verbindlichkeit:
- **`LEGAL_MINIMUM`** — gesetzlich erforderlich. Beeinflusst den Compliance-Status.
- **`BEST_PRACTICE`** — empfehlenswert, gesetzlich nicht erforderlich. Erscheint als
Empfehlung. Beeinflusst den Status **nie**.
- **`OPTIONAL`** — Komfort/Detailtiefe. Empfehlung. Beeinflusst den Status **nie**.
Achse 1 + 2 sind primär **per Kriterium** (atomar); ein Control kann Kriterien
verschiedener Methoden mischen.
## 4. Status-Berechnung (3 Zustände) — Gating NUR auf LEGAL_MINIMUM
Sei `LM` die Menge der `LEGAL_MINIMUM`-Kriterien eines Controls und `met(LM)` die
erfüllten darunter:
```
ERFÜLLT := |LM| > 0 und met(LM) == |LM| (alle Pflicht-Kriterien erfüllt)
TEILWEISE := 0 < met(LM) < |LM| (mind. eines erfüllt, mind. eines fehlt)
FEHLT := |LM| > 0 und met(LM) == 0 (kein Pflicht-Kriterium erfüllt)
```
`BEST_PRACTICE`/`OPTIONAL`-Kriterien gehen **nicht** in diese Berechnung ein. Sie
werden separat als Empfehlungen ausgewiesen (§5, Ebene 2).
> **Invariante:** Ein erfülltes gesetzliches Minimum darf NIE durch fehlende
> Best-Practice-/Optional-Kriterien auf FEHLT/Rot gezogen werden.
## 5. Reporting — drei Ebenen
| Ebene | Inhalt | Quelle |
|-------|--------|--------|
| **1 — Compliance-Status (rechtlich)** | ERFÜLLT / TEILWEISE / FEHLT | NUR `LEGAL_MINIMUM` |
| **2 — Optimierungspotenzial** | „Empfehlungen: N · Best-Practice-Abdeckung X %" | `BEST_PRACTICE` + `OPTIONAL` |
| **3 — Risiko-Reifegrad** *(optional, später)* | „Reifegrad Y %" für CRA/NIS2/ISO 27001/TOM | gewichtet, s. §6 |
**Anti-Pattern (verboten):** kein „Compliance-Score = 72 %", wenn alle gesetzlichen
Anforderungen erfüllt sind. Das erzeugt „welche 28 % fehlen?" → „eigentlich keine
Pflicht" → der Score wird wertlos.
### Farb-Semantik (Bedeutung, nicht Wertung)
- **Grün** = gesetzliche Anforderungen erfüllt (Pflicht erfüllt)
- **Blau** = empfohlene Verbesserungen vorhanden (Optimierung möglich)
- **Rot** = gesetzliche Anforderungen fehlen (Pflichtverletzung)
`TEILWEISE` ist visuell ein eigener Zustand (z. B. Gelb/Amber): Pflicht teilweise
erfüllt. Verbindet sich mit der BreakPilot-Tonalität (kein Panik-Rot) und dem
3-Tier-Obligation-Modell (Pflicht/Empfehlung/Kann).
## 6. `weight`
Wird heute **gespeichert, aber nicht für das Gating verwendet** (bewusste
Entscheidung: Gewichte erzeugen sofort „warum 0.3 und nicht 0.4?"-Diskussionen). Es
ist die Reserve für **Ebene 3 (Reifegrad)**: später lässt sich daraus ein gewichteter
Best-Practice-/Reifegrad-Prozentwert berechnen. Richtwerte: LEGAL_MINIMUM 1.0 ·
BEST_PRACTICE ~0.3 · OPTIONAL ~0.1.
## 7. compliance_tier ist eine PLATTFORM-Achse
Nicht nur ein DSE-Fix. Dasselbe Muster tritt überall auf — DSE (Minimum vs. BP),
Cookie (Offenlegung vs. Transparenz), Impressum (Pflicht- vs. Komfortfelder), AGB
(erforderlich vs. empfehlenswert) und perspektivisch CRA/NIS2/Maschinenverordnung.
Ein einzelnes Kriterium trägt überall `compliance_tier`; die Plattform wertet
**Compliance / Empfehlungen / Reifegrad** regulierungsunabhängig aus.
## 8. Validierungsnachweis (Pilot, 2026-06-22)
Geschrieben auf macmini (`generation_metadata.tiered_criteria`, prod-guarded), gemessen
gegen Opus-GT (ikea/ob/teamviewer):
- **5 Pilot-Controls** (SEC-7285-A03, SEC-3257-A01, Portabilitäts-Cluster
DATA-1613/DATA-2552/COMP-2087): alle **6 Disagreement-Fälle** (vormals falsch-FEHLT)
wandern zu **ERFÜLLT + Empfehlungen**; echte Lücken bleiben korrekt FEHLT — ohne
Prüfer-Änderung.
- **TEILWEISE-Validierung** (DATA-1445-A02, SEC-4752-A02): der 3. Status tritt real auf
(1 ERFÜLLT / 5 TEILWEISE), Splitter durchgängig „Speicherdauer pro Zweck"
(Art. 13(2)(a)).
- Lehre: selbst Pilot-Kriterien können Minimum + Best-Practice vermischen
(„Speicherdauer *pro Zweck*"). Die LM/BP-Linie ist eine **Produktpolitik-Entscheidung
(Mensch)**, kein NLP-Problem. Das Modell ist korrekt; die Kriterien-Schärfe ist
Kurationsarbeit.
## 9. Invarianten (nicht verletzen)
1. Control-UUID bleibt stabil — **kein** physischer Split.
2. Status (Grün/Gelb/Rot) hängt **ausschließlich** an `LEGAL_MINIMUM`.
3. `BEST_PRACTICE`/`OPTIONAL` erzeugen Empfehlungen, **nie** einen FEHLT-Status.
4. Kein Prozent-Compliance-Score, wenn alle gesetzlichen Anforderungen erfüllt sind.
5. Speicherung in `generation_metadata` (jsonb) — keine Schema-Migration.
## 10. Rollout (nach diesem Freeze)
1. **1015** der schlimmsten überladenen DSE-Controls tiern (nicht alle 49 auf einmal).
2. 3-Status-Logik in die Live-DSE-Engine verdrahten (heute nur Mess-Harness).
3. Benchmark erneut: FP / FN / Precision / Recall + Status-Verteilung.
4. Erst bei stabilem Effekt: Rollout auf alle 49 überladenen Controls.