Compare commits

..

13 Commits

Author SHA1 Message Date
Benjamin Admin d5925e57af feat(ai-sdk): pin accepted proposer decisions into the GT gate (P3)
CI / detect-changes (push) Successful in 12s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 9s
CI / validate-canonical-controls (push) Successful in 8s
CI / loc-budget (push) Successful in 21s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Successful in 59s
CI / iace-gt-coverage (push) Successful in 19s
CI / test-python-backend (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
When a human accepts a proposer proposal, an AcceptedPin records a machine-scoped
invariant — a pattern MUST fire (coverage/vocab→tag) or must NOT fire
(dedup/framing) — that a test re-checks on every run. This makes the library's
growth COMPOUND into the gate instead of eroding it: a change that re-introduces
a dropped duplicate, un-gates a foreign pattern, or removes a coverage hazard
breaks a pin and fails CI. One boolean covers all four proposal types.

Seeded testdata/accepted_pins_warewashing.json with the accepted P1 supersessions
(HP016/HP018/HP013 must NOT fire; their clean equivalents HP2201/HP144 must fire).
TestWarewashing_AcceptedPins re-checks 5/5 against the live engine output;
GenerateDedupPin turns an accepted dedup verdict into its pin.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-30 09:42:31 +02:00
Benjamin Admin 1877829b1d Merge remote-tracking branch 'gitea/main' into reconcile-dev
CI / detect-changes (push) Successful in 10s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 8s
CI / validate-canonical-controls (push) Successful in 5s
CI / loc-budget (push) Successful in 22s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 3m3s
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 26s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
2026-06-30 09:04:58 +02:00
Benjamin_Boenisch 866889b453 Merge pull request 'feat(ucca): Multi-Regulation-Retrieval (Cross-Regulation-Fragen)' (#43) from fix/multi-regulation-retrieval into main
CI / detect-changes (push) Successful in 12s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 7s
CI / validate-canonical-controls (push) Successful in 6s
CI / loc-budget (push) Successful in 21s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Successful in 1m0s
CI / iace-gt-coverage (push) Successful in 20s
CI / test-python-backend (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
2026-06-30 06:46:21 +00:00
Benjamin Admin 9760dca443 feat(ucca): Multi-Regulation-Retrieval für Cross-Regulation-Fragen
CI / detect-changes (pull_request) Successful in 10s
CI / branch-name (pull_request) Successful in 1s
CI / guardrail-integrity (pull_request) Successful in 8s
CI / secret-scan (pull_request) Successful in 9s
CI / dep-audit (pull_request) Failing after 56s
CI / sbom-scan (pull_request) Failing after 58s
CI / build-sha-integrity (pull_request) Successful in 9s
CI / validate-canonical-controls (pull_request) Successful in 7s
CI / loc-budget (pull_request) Successful in 24s
CI / go-lint (pull_request) Successful in 54s
CI / python-lint (pull_request) Failing after 16s
CI / nodejs-lint (pull_request) Failing after 1m9s
CI / nodejs-build (pull_request) Successful in 3m6s
CI / test-go (pull_request) Successful in 1m3s
CI / iace-gt-coverage (pull_request) Successful in 19s
CI / test-python-backend (pull_request) Successful in 26s
CI / test-python-document-crawler (pull_request) Successful in 15s
CI / test-python-dsms-gateway (pull_request) Successful in 12s
Nennt eine Query EXPLIZIT >=2 Regelwerke ("Wie greifen CRA und Maschinen-
verordnung ineinander?"), retrievt searchInternal pro Regelwerk separat
(regulation_code/regulation_id-Filter) und merged — damit BEIDE Domänen im
Prompt landen statt nur der keyword-dominanten. Generisch (Query->Regelwerke,
KEINE doc-spezifische Logik), gegated auf >=2 erkannte Regelwerke; sonst
unveränderter Single-Domain-Pfad.

Behebt GQ-0070: vorher CRA x8 / null MaschVO -> Modell halluzinierte
MaschVO=2019/2144 + falsche "CRA ausgenommen"-Konklusion. Nachher CRA + MaschVO
im Prompt -> korrekt "beide gleichzeitig anwendbar" + Art. 20(9)
Konformitätsvermutung, gegroundet.

Validierung (Build-Collection, echtes SearchCollection):
- Unit: detectRegulations-Scoping (>=2 -> multi, 1/0 -> single)
- 5 Cross-Reg-Fälle (0070 + DSGVO+TDDDG/CRA+NIS2/DORA+NIS2/AI Act+DSGVO):
  beide Regelwerke in Top-8
- CB-100 Freeze-Regression: NUR GQ-0070 + GQ-0095 geändert (beide echte
  Cross-Reg, beide verbessert), 98/100 byte-identisch
- 10 Hard Cases: 9 Single-Domain unverändert, 0070 behält CRA Rang 1

Filter erweitert auf regulation_id UND regulation_code (rückwärtskompatibel,
aktiviert die re-ingestierte Build-Collection).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-30 08:18:06 +02:00
Benjamin_Boenisch e5e7b825af Merge pull request 'fix(ucca): Guidance-Intent für direkt benannte WP/GL-Dokumente' (#42) from fix/legal-rag-guidance-intent into main
CI / branch-name (push) Has been skipped
CI / detect-changes (push) Successful in 7s
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 6s
CI / validate-canonical-controls (push) Successful in 5s
CI / loc-budget (push) Successful in 20s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Successful in 1m0s
CI / iace-gt-coverage (push) Successful in 17s
CI / test-python-backend (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
2026-06-29 18:42:27 +00:00
pilotadmin f0da86ca19 Merge pull request 'feat(onboarding): advisor responsiveness — moving headline + auto-recompute' (#54) from feat/advisor-ux-responsiveness into main 2026-06-28 19:31:20 +02:00
Benjamin Admin 867f8c3854 feat(onboarding): make the advisor visibly responsive — headline leads with the moving number + auto-recompute
Testing surfaced that toggling certifications appeared to "do nothing": the headline led with the TOTAL
requirement count (constant per target, e.g. 17 for CRA), and the page only recomputed on an explicit
button click. Both fixed:
  - engine.py headline now leads with the number that actually moves: "11 von 17 Anforderungen offen ·
    6 wahrscheinlich (Zertifikate) · 5 zu klären" (was "17 Anforderungen erkannt · …"). Keeps the
    "automatisch erkannt (Intake)" substring.
  - frontend auto-recomputes on certifications / target / scanner-signal change (no button needed).

Now ISO27001 alone -> "13 von 17 offen · 4 wahrscheinlich"; + ISO9001+TISAX+IEC62443 -> "11 von 17 offen ·
6 wahrscheinlich". (Domain truth stays visible: CRA's product-cyber gaps barely move with management-system
certs.) 28 onboarding+transition tests pass, check-loc 0.
2026-06-28 19:31:15 +02:00
pilotadmin 26a8518107 Merge pull request 'feat(onboarding): surface curated expert text + human labels' (#53) from feat/advisor-human-text into main 2026-06-28 18:47:07 +02:00
Benjamin Admin 807a7002b2 feat(onboarding): surface curated expert text + human capability labels (advisor was showing snake_case)
The advisor was structurally correct but unusable: every question showed a snake_case capability id plus a
single generic fallback reason ("Keine Anhaltspunkte im Unternehmensprofil — klären"). The expert text
already EXISTED in the transition patterns (why_asked / reviewable_claim) — the pipeline just dropped it.

  - transition_reasoning: TargetRequirement gains `rationale`; assess_transition uses it as the request
    reason when present, else the generic fallback (additive, backward-compatible for all consumers).
  - onboarding_service._target carries the pattern's why_asked (delta) and reviewable_claim (likely_covered)
    into the requirement rationale -> the question's `why`.
  - knowledge/onboarding/capability_labels.yaml: curated DE labels (id -> human), reusable across targets;
    labels_for() + response.capability_labels expose them; the frontend renders label || prettified id.

Now ISO27001->TISAX reads "Auftragsverarbeitung (Art. 28 DSGVO) — If a TISAX data label is in scope, you
must show Art. 28 GDPR processing-on-behalf controls; ISO 27001 does not establish these." instead of
"data_protection_processing_on_behalf — klären". why_asked text is still EN (existing knowledge; translation
is curation). 34 onboarding+transition tests pass, mypy --strict clean (13 modules), check-loc 0.
2026-06-28 18:46:56 +02:00
pilotadmin 5beb5a319a Merge pull request 'feat(admin): ETO / Onboarding-Advisor test page' (#52) from feat/onboarding-advisor-frontend into main 2026-06-28 17:12:44 +02:00
Benjamin Admin 239702fdca feat(admin): ETO / Onboarding-Advisor test page (thin operator surface over the advisor endpoint)
A focused client page at /sdk/onboarding-advisor that exercises POST /api/compliance/onboarding/
advisor-start through the existing compliance proxy: pick certifications + target + scanner findings
(observation / partial / requirement) and render the result — headline, silent-intake summary,
auto-detected (green), indications (amber), next-best questions with WHY, inferred (Welt-1) vs rejected
assumptions, capability delta, evidence requests, completeness. NOT the regulation gap engine
(/sdk/gap-analysis is a different flow). No new backend; calls only the existing endpoint. 195 lines.
2026-06-28 17:12:40 +02:00
pilotadmin d1a5fc7205 Merge pull request 'feat(onboarding): Observation Log — append-only JSONL calibration store (59b/c)' (#51) from feat/observation-log into main 2026-06-28 16:29:58 +02:00
Benjamin Admin 7df15010ff feat(onboarding): Observation Log — append-only JSONL calibration store (Task 59b/c v1)
Per the user's decision (2026-06-28): observations are CALIBRATION data for the knowledge base, NOT
business data and NOT product-DB data. So they live with the other versioned knowledge artifacts as an
append-only JSONL log under knowledge/observations/ — NO migration, NO DB. (A real persistence layer is
only warranted once thousands of onboardings exist; not before.)

  - ObservationRecord = Observation + log metadata (observation_id, timestamp [caller-stamped, no hidden
    clock], customer_archetype [anonymised — NEVER a real name], evidence, provenance, knowledge_version).
  - append_observation() writes one JSON line; append-only, lines are never rewritten. A later review is a
    NEW line with the same observation_id; load_observations(reconcile=True) keeps the latest per id.
  - load_observations() reads a single .jsonl or a directory of monthly .jsonl files.
  - aggregate_by_hypothesis() (59c) -> per-hypothesis distribution + confidence, COMPUTED from the log
    (computed-not-stored); the review gate (reviewed-only) is enforced in empirical_distribution/confidence.
  - review_queue() -> the unreviewed worklist. Observation -> Review -> Accepted -> recompute, never
    Observation -> confidence++. Nothing is ever written back to a hypothesis.

You can `rm` the log and recompute, `git diff` it over months, or rebuild confidence under a new policy —
fully consistent with computed-not-stored and the product/knowledge data separation.

Non-runtime (module + tests only, no endpoint) -> origin/main, NO dev deploy. 5 new tests (append-only,
review supersession, review-gate statistics, queue, monthly-file load); 27 onboarding tests pass, mypy
--strict clean (9 modules), check-loc 0. 59d (surface computed confidence at runtime) stays a later step.
2026-06-28 16:29:54 +02:00
19 changed files with 891 additions and 58 deletions
@@ -0,0 +1,200 @@
'use client'
// ETO / Onboarding-Advisor — thin operator surface over POST /api/compliance/onboarding/advisor-start.
// Certifications + target + scanner findings -> Silent Pass -> Advisor. NOT the regulation gap engine
// (/sdk/gap-analysis is a different flow: product -> applicable regulations). This tests the cert->delta
// case: "TISAX/ISO27001 -> CRA, what is auto-detected, what stays an open question?". No new backend.
import React, { useEffect, useState } from 'react'
const CERTS = ['ISO27001', 'TISAX', 'ISO9001', 'IEC62443', 'ISO13485', 'ISO14001', 'ASPICE', 'IATF16949']
// label -> {signal_id, source_type} — demonstrates all three signal KINDS (observation / partial / requirement)
const FINDINGS: Array<{ label: string; signal_id: string; source_type: string; kind: string }> = [
{ label: 'SBOM im Repo (CycloneDX/SPDX)', signal_id: 'cyclonedx_found', source_type: 'repository', kind: 'observation' },
{ label: 'security.txt / CVD-Policy veröffentlicht', signal_id: 'security_txt', source_type: 'website', kind: 'observation' },
{ label: 'Signierte Releases', signal_id: 'signed_releases', source_type: 'repository', kind: 'observation' },
{ label: 'Produkt-Risikobewertung (Dokument)', signal_id: 'risk_assessment_pdf', source_type: 'document', kind: 'observation' },
{ label: 'CI-Pipeline vorhanden (nur Indikation)', signal_id: 'github_actions_ci', source_type: 'repository', kind: 'partial' },
{ label: 'Cloud-/vernetztes Produkt', signal_id: 'cloud_hosted', source_type: 'product', kind: 'observation' },
{ label: 'Ausschreibung FORDERT SBOM (Requirement)', signal_id: 'requires_sbom', source_type: 'tender', kind: 'requirement' },
{ label: 'OEM FORDERT PSIRT (Requirement)', signal_id: 'supplier_requires_psirt', source_type: 'oem', kind: 'requirement' },
]
interface Question { capability_id: string; question_intent: string; why: string; information_value: number; priority: string }
interface Inferred { certification: string; capabilities: string[]; statement: string }
interface Rejected { certification?: string; statement: string; reason: string }
interface Measure { capability_id: string; leverage: number; closes: string[] }
interface AdvisorResponse {
silent_intake_summary: string; headline: string; auto_detected: string[]; indications: string[]
inferred_assumptions: Inferred[]; rejected_assumptions: Rejected[]; top_5_questions: Question[]
capability_delta: string[]; top_measures: Measure[]; evidence_requests: string[]
unsupported_domains: string[]; completeness_summary: string; capability_labels: Record<string, string>
}
const PROXY = '/api/sdk/v1/compliance/onboarding'
function Chips({ items, tone }: { items: string[]; tone: string }) {
if (!items.length) return <span className="text-gray-400 text-sm"></span>
return (
<div className="flex flex-wrap gap-2">
{items.map(c => <span key={c} className={`px-2.5 py-1 rounded-full text-xs font-medium ${tone}`}>{c}</span>)}
</div>
)
}
function Section({ title, hint, children }: { title: string; hint?: string; children: React.ReactNode }) {
return (
<div className="bg-white rounded-xl border border-gray-200 p-5">
<h3 className="font-semibold text-gray-900">{title}</h3>
{hint && <p className="text-xs text-gray-500 mt-0.5 mb-2">{hint}</p>}
<div className="mt-2">{children}</div>
</div>
)
}
export default function OnboardingAdvisorPage() {
const [targets, setTargets] = useState<string[]>([])
const [company, setCompany] = useState('Beispiel Maschinenbau')
const [industry, setIndustry] = useState('machine_builder')
const [certs, setCerts] = useState<string[]>(['ISO27001', 'ISO9001'])
const [target, setTarget] = useState('CRA')
const [findings, setFindings] = useState<string[]>(['cyclonedx_found', 'github_actions_ci', 'requires_sbom'])
const [knownEvidence, setKnownEvidence] = useState('CE-Prozess')
const [result, setResult] = useState<AdvisorResponse | null>(null)
const [loading, setLoading] = useState(false)
const [error, setError] = useState('')
useEffect(() => {
fetch(`${PROXY}/targets`).then(r => r.json()).then(d => {
if (Array.isArray(d.targets)) { setTargets(d.targets); if (!d.targets.includes('CRA') && d.targets[0]) setTarget(d.targets[0]) }
}).catch(() => {})
}, [])
const toggle = (list: string[], set: (v: string[]) => void, v: string) =>
set(list.includes(v) ? list.filter(x => x !== v) : [...list, v])
const lbl = (id: string) => result?.capability_labels?.[id] || id.replace(/_/g, ' ')
const run = async () => {
setLoading(true); setError(''); setResult(null)
try {
const scanner_findings = FINDINGS.filter(f => findings.includes(f.signal_id))
.map(f => ({ signal_id: f.signal_id, source_type: f.source_type }))
const res = await fetch(`${PROXY}/advisor-start`, {
method: 'POST', headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
company, industry, products: [], markets: ['EU'], certifications: certs,
known_evidence: knownEvidence ? knownEvidence.split(',').map(s => s.trim()).filter(Boolean) : [],
target, scanner_findings,
}),
})
if (!res.ok) throw new Error(await res.text())
setResult(await res.json())
} catch (e) {
setError(e instanceof Error ? e.message : 'Advisor fehlgeschlagen')
} finally { setLoading(false) }
}
// auto-recompute when certifications / target / scanner signals change (no button click needed)
useEffect(() => { if (certs.length) run() }, [certs, target, findings]) // eslint-disable-line react-hooks/exhaustive-deps
return (
<div className="min-h-screen bg-gray-50 py-8">
<div className="max-w-5xl mx-auto px-4">
<h1 className="text-3xl font-bold text-gray-900">ETO / Onboarding-Advisor</h1>
<p className="text-gray-600 mt-2 mb-6">
Zertifikate + Ziel + Scanner-Signale Silent Pass Capability-Delta + nächste beste Fragen.
Welt-1: ein Zertifikat <em>legt nahe</em>, beweist nichts (Verifikation erforderlich).
</p>
<div className="grid md:grid-cols-2 gap-4 mb-6">
<Section title="Unternehmen & Ziel">
<label className="block text-sm text-gray-600">Unternehmen
<input value={company} onChange={e => setCompany(e.target.value)} className="mt-1 w-full border rounded-lg px-3 py-2" /></label>
<label className="block text-sm text-gray-600 mt-3">Branche
<input value={industry} onChange={e => setIndustry(e.target.value)} className="mt-1 w-full border rounded-lg px-3 py-2" /></label>
<label className="block text-sm text-gray-600 mt-3">Ziel
<select value={target} onChange={e => setTarget(e.target.value)} className="mt-1 w-full border rounded-lg px-3 py-2">
{(targets.length ? targets : ['CRA']).map(t => <option key={t} value={t}>{t}</option>)}
</select></label>
<label className="block text-sm text-gray-600 mt-3">Vorhandene Nachweise (kommagetrennt)
<input value={knownEvidence} onChange={e => setKnownEvidence(e.target.value)} className="mt-1 w-full border rounded-lg px-3 py-2" /></label>
</Section>
<Section title="Zertifizierungen">
<div className="flex flex-wrap gap-2">
{CERTS.map(c => (
<button key={c} onClick={() => toggle(certs, setCerts, c)}
className={`px-3 py-1.5 rounded-lg text-sm border ${certs.includes(c) ? 'bg-blue-600 text-white border-blue-600' : 'bg-white text-gray-700 border-gray-300'}`}>{c}</button>
))}
</div>
</Section>
</div>
<Section title="Scanner-Signale (Silent Pass)" hint="observation = gesehen · partial = Indikation · requirement = gefordert (≠ vorhanden)">
<div className="grid sm:grid-cols-2 gap-2">
{FINDINGS.map(f => (
<label key={f.signal_id} className="flex items-center gap-2 text-sm text-gray-700">
<input type="checkbox" checked={findings.includes(f.signal_id)} onChange={() => toggle(findings, setFindings, f.signal_id)} />
<span>{f.label}</span>
<span className={`ml-auto text-[10px] px-1.5 py-0.5 rounded ${f.kind === 'requirement' ? 'bg-purple-100 text-purple-700' : f.kind === 'partial' ? 'bg-amber-100 text-amber-700' : 'bg-emerald-100 text-emerald-700'}`}>{f.kind}</span>
</label>
))}
</div>
</Section>
<button onClick={run} disabled={loading || !certs.length}
className="mt-6 w-full py-3 bg-blue-600 text-white rounded-xl font-medium hover:bg-blue-700 disabled:opacity-50">
{loading ? 'Analysiere…' : 'Advisor starten'}
</button>
{error && <div className="mt-6 bg-red-50 border border-red-200 rounded-lg p-4 text-red-700 text-sm whitespace-pre-wrap">{error}</div>}
{result && (
<div className="mt-8 space-y-4">
<div className="bg-blue-600 text-white rounded-xl p-5">
<div className="text-lg font-semibold">{result.headline}</div>
<div className="text-blue-100 text-sm mt-1">{result.silent_intake_summary}</div>
</div>
<div className="grid md:grid-cols-2 gap-4">
<Section title="Automatisch erkannt" hint="konkrete Artefakte nicht mehr gefragt"><Chips items={result.auto_detected.map(lbl)} tone="bg-emerald-100 text-emerald-800" /></Section>
<Section title="Indikationen" hint="erhöht Annahmestärke trotzdem gefragt"><Chips items={result.indications.map(lbl)} tone="bg-amber-100 text-amber-800" /></Section>
</div>
<Section title="Nächste beste Fragen" hint="max 5, jede erklärt sich selbst">
{result.top_5_questions.length ? (
<ol className="space-y-3">
{result.top_5_questions.map((q, i) => (
<li key={q.capability_id} className="border-l-2 border-blue-300 pl-3">
<div className="font-medium text-gray-900">{i + 1}. {lbl(q.capability_id)}</div>
<div className="text-sm text-gray-600">{q.why}</div>
</li>
))}
</ol>
) : <span className="text-gray-400 text-sm"></span>}
</Section>
<div className="grid md:grid-cols-2 gap-4">
<Section title="Wahrscheinlich abgedeckt (Welt-1)" hint="Zertifikat legt nahe Verifikation erforderlich">
{result.inferred_assumptions.length ? result.inferred_assumptions.map(a => (
<div key={a.certification} className="mb-2"><span className="font-medium">{a.certification}</span>: {a.capabilities.map(lbl).join(', ')}</div>
)) : <span className="text-gray-400 text-sm"></span>}
</Section>
<Section title="Nicht relevant" hint="relevance(evidence, target) = 0">
{result.rejected_assumptions.length ? result.rejected_assumptions.map((a, i) => (
<div key={i} className="mb-1 text-sm text-gray-700">{a.statement}</div>
)) : <span className="text-gray-400 text-sm"></span>}
</Section>
</div>
<div className="grid md:grid-cols-2 gap-4">
<Section title="Offene Lücken (Delta)"><Chips items={result.capability_delta.map(lbl)} tone="bg-gray-100 text-gray-700" /></Section>
<Section title="Geforderte Nachweise"><Chips items={result.evidence_requests} tone="bg-gray-100 text-gray-700" /></Section>
</div>
<Section title="Vollständigkeit" hint={result.unsupported_domains.length ? `nicht abgedeckt: ${result.unsupported_domains.join(', ')}` : undefined}>
<span className="text-sm text-gray-700">{result.completeness_summary || '—'}</span>
</Section>
</div>
)}
</div>
</div>
)
}
@@ -0,0 +1,73 @@
package iace
// P3: pin accepted proposer decisions into the GT gate.
//
// When a human accepts a proposal from the offline proposer (a dedup
// supersession, a foreign-framing gate, a vocab→tag mapping, a coverage hazard),
// they record an AcceptedPin. A pin is a tiny, machine-scoped invariant — "this
// pattern MUST (or must NOT) fire for this machine" — that a test re-checks on
// every run. This is what makes the library's growth COMPOUND into the gate
// instead of silently eroding it: a future change that re-introduces a dropped
// duplicate, un-gates a foreign pattern, or removes a coverage hazard breaks the
// pin and fails CI.
//
// A single boolean covers all four proposal types:
// - dedup supersession accepted → DropPattern MustFire=false
// - foreign-framing gate accepted → foreign pattern MustFire=false
// - vocab→tag / coverage hazard accepted → the enabled pattern MustFire=true
// AcceptedPin is one regression invariant for an accepted proposal.
type AcceptedPin struct {
Pattern string `json:"pattern"`
MustFire bool `json:"must_fire"`
Reason string `json:"reason"`
FromProposal string `json:"from_proposal,omitempty"`
}
// PinSet is the accepted-pin registry for one machine (testdata/accepted_pins_*.json).
type PinSet struct {
Machine string `json:"machine"`
Pins []AcceptedPin `json:"pins"`
}
// PinResult is the verdict for one pin against an engine run.
type PinResult struct {
Pin AcceptedPin
OK bool
Detail string
}
// VerifyPins checks every pin against the set of pattern IDs the engine actually
// fired for the machine. A pin holds iff the pattern's presence equals MustFire.
func VerifyPins(pins []AcceptedPin, firedPatternIDs []string) []PinResult {
fired := make(map[string]bool, len(firedPatternIDs))
for _, id := range firedPatternIDs {
fired[id] = true
}
out := make([]PinResult, 0, len(pins))
for _, p := range pins {
got := fired[p.Pattern]
ok := got == p.MustFire
detail := "ok"
if !ok {
if p.MustFire {
detail = "expected to fire but did NOT — coverage/mapping regressed"
} else {
detail = "expected to be suppressed but FIRED — gate/supersession regressed"
}
}
out = append(out, PinResult{Pin: p, OK: ok, Detail: detail})
}
return out
}
// GenerateDedupPin turns an accepted (verdict=duplicate) dedup candidate into the
// pin that protects the supersession: the dropped pattern must no longer fire.
func GenerateDedupPin(c DedupCandidate) AcceptedPin {
return AcceptedPin{
Pattern: c.DropPattern,
MustFire: false,
Reason: "accepted duplicate of " + c.KeepPattern + " (" + c.Category + ")",
FromProposal: "dedup " + c.DropPattern + " -> " + c.KeepPattern,
}
}
@@ -0,0 +1,63 @@
package iace
import (
"encoding/json"
"os"
"path/filepath"
"testing"
)
func TestVerifyPins(t *testing.T) {
pins := []AcceptedPin{
{Pattern: "HPa", MustFire: true},
{Pattern: "HPb", MustFire: false},
}
res := VerifyPins(pins, []string{"HPa", "HPb"})
if !res[0].OK {
t.Errorf("HPa must_fire=true and it fired -> should be OK")
}
if res[1].OK {
t.Errorf("HPb must_fire=false but it fired -> should be VIOLATED")
}
res2 := VerifyPins(pins, []string{})
if res2[0].OK || !res2[1].OK {
t.Errorf("expected HPa violated + HPb ok, got %+v", res2)
}
}
func TestGenerateDedupPin(t *testing.T) {
pin := GenerateDedupPin(DedupCandidate{KeepPattern: "HP144", DropPattern: "HP013", Category: "electrical_hazard"})
if pin.Pattern != "HP013" || pin.MustFire {
t.Fatalf("want pin {HP013, must_fire=false}, got %+v", pin)
}
}
// TestWarewashing_AcceptedPins re-checks every accepted P1 supersession against the
// live warewashing engine output. A future change that un-suppresses HP013/016/018
// or drops HP2201/HP144 breaks a pin here — the gate compounds, not erodes.
func TestWarewashing_AcceptedPins(t *testing.T) {
raw, err := os.ReadFile(filepath.Join("testdata", "accepted_pins_warewashing.json"))
if err != nil {
t.Fatalf("read pins: %v", err)
}
var ps PinSet
if err := json.Unmarshal(raw, &ps); err != nil {
t.Fatalf("parse pins: %v", err)
}
_, _, kept := warewashingEngineOutput()
firedIDs := make([]string, 0, len(kept))
for _, pm := range kept {
firedIDs = append(firedIDs, pm.PatternID)
}
ok := 0
for _, r := range VerifyPins(ps.Pins, firedIDs) {
if r.OK {
ok++
continue
}
t.Errorf("PIN VIOLATED: %s (must_fire=%v) — %s [%s]", r.Pin.Pattern, r.Pin.MustFire, r.Detail, r.Pin.Reason)
}
t.Logf("accepted pins for %q: %d/%d hold", ps.Machine, ok, len(ps.Pins))
}
@@ -0,0 +1,10 @@
{
"machine": "Gewerbliche Untertisch-Geschirrspuelmaschine (vernetzt)",
"pins": [
{"pattern": "HP016", "must_fire": false, "reason": "generic hot-surface (Formwerkzeuge/Auspuffleitung framing) superseded by HP2201", "from_proposal": "P1 thermal supersession"},
{"pattern": "HP018", "must_fire": false, "reason": "actuator-burn superseded by HP2201", "from_proposal": "P1 thermal supersession"},
{"pattern": "HP013", "must_fire": false, "reason": "stored-energy Batterie/USV framing superseded by HP144", "from_proposal": "P1 stored-energy supersession"},
{"pattern": "HP2201", "must_fire": true, "reason": "warewashing hot-surface (Boiler/Tank/Spuelkammer) must remain — it is the clean equivalent that replaces HP016/HP018", "from_proposal": "P1 thermal supersession"},
{"pattern": "HP144", "must_fire": true, "reason": "residual-voltage (Frequenzumrichter/Zwischenkreis) must remain — clean equivalent that replaces HP013", "from_proposal": "P1 stored-energy supersession"}
]
}
@@ -78,6 +78,19 @@ func (c *LegalRAGClient) Search(ctx context.Context, query string, regulationIDs
// If hybrid search is enabled, it uses the Qdrant Query API with RRF fusion
// (dense + full-text). Falls back to dense-only /points/search on failure.
func (c *LegalRAGClient) searchInternal(ctx context.Context, collection string, query string, regulationIDs []string, topK int) ([]LegalSearchResult, error) {
// Multi-Regulation-Retrieval: nennt die Query EXPLIZIT >=2 Regelwerke (z.B. "CRA und
// Maschinenverordnung"), wird pro Regelwerk separat retrieved + gemergt, damit BEIDE
// Domaenen im Prompt landen statt nur der keyword-dominanten. Generisch (Query->Regelwerke,
// keine doc-spezifische Logik); nur wenn der Caller nicht ohnehin schon auf Regulierungen
// filtert. Best-effort: leeres/fehlerhaftes Multi-Ergebnis faellt auf die Standardsuche zurueck.
if len(regulationIDs) == 0 {
if regs := detectRegulations(query); len(regs) >= 2 {
if mr, mErr := c.searchMultiRegulation(ctx, collection, query, regs, topK); mErr == nil && len(mr) > 0 {
return mr, nil
}
}
}
embedding, err := c.generateEmbedding(ctx, query)
if err != nil {
return nil, fmt.Errorf("failed to generate embedding: %w", err)
@@ -123,43 +136,7 @@ func (c *LegalRAGClient) searchInternal(ctx context.Context, collection string,
hits = c.expandViaGraph(ctx, collection, hits)
}
results := make([]LegalSearchResult, len(hits))
for i, hit := range hits {
// Legal-Metadaten nach rag_reingest_spec.md §2: bevorzugt die normalisierten Felder
// (article_label/regulation_code/article/...); Fallback auf alte Feldnamen, solange der
// Korpus noch nicht re-ingestiert ist (regulation_id, section="§ 38").
regCode := getString(hit.Payload, "regulation_code")
if regCode == "" {
regCode = getString(hit.Payload, "regulation_id")
}
article := getString(hit.Payload, "article")
if article == "" {
article = getString(hit.Payload, "section")
}
results[i] = LegalSearchResult{
Text: getString(hit.Payload, "chunk_text"),
RegulationCode: regCode,
RegulationName: getString(hit.Payload, "regulation_name_de"),
RegulationShort: getString(hit.Payload, "regulation_short"),
Category: getString(hit.Payload, "category"),
ArticleLabel: getString(hit.Payload, "article_label"),
Article: article,
Paragraph: getString(hit.Payload, "paragraph"),
Sub: getString(hit.Payload, "sub"),
IsRecital: getBool(hit.Payload, "is_recital"),
CitationStyle: getString(hit.Payload, "citation_style"),
Pages: getIntSlice(hit.Payload, "pages"),
SourceURL: getString(hit.Payload, "source"),
Score: hit.Score,
AuthorityWeight: getInt(hit.Payload, "authority_weight"),
SourceClass: getString(hit.Payload, "source_class"),
Jurisdiction: getString(hit.Payload, "jurisdiction"),
CitationUnit: getString(hit.Payload, "citation_unit"),
ReferencesOut: getStringSlice(hit.Payload, "references_out"),
ReferencesIn: getStringSlice(hit.Payload, "references_in"),
Superseded: getString(hit.Payload, "status") == "superseded",
}
}
results := hitsToResults(hits)
// Authority-aware Re-Ranking: bindendes Recht der passenden Jurisdiktion/Domaene nach
// oben, Guidance/Fremdrecht/Off-Domain runter (nichts wird geloescht). Reihenfolge only,
@@ -122,12 +122,14 @@ func (c *LegalRAGClient) searchHybrid(ctx context.Context, collection string, em
}
if len(regulationIDs) > 0 {
conditions := make([]qdrantCondition, len(regulationIDs))
for i, regID := range regulationIDs {
conditions[i] = qdrantCondition{
Key: "regulation_id",
Match: qdrantMatch{Value: regID},
}
// Match BOTH the legacy field (regulation_id) and the normalized field
// (regulation_code) so per-regulation filtering works on the re-ingested corpus too.
conditions := make([]qdrantCondition, 0, len(regulationIDs)*2)
for _, regID := range regulationIDs {
conditions = append(conditions,
qdrantCondition{Key: "regulation_id", Match: qdrantMatch{Value: regID}},
qdrantCondition{Key: "regulation_code", Match: qdrantMatch{Value: regID}},
)
}
queryReq.Filter = &qdrantFilter{Should: conditions}
}
@@ -175,12 +177,14 @@ func (c *LegalRAGClient) searchDense(ctx context.Context, collection string, emb
}
if len(regulationIDs) > 0 {
conditions := make([]qdrantCondition, len(regulationIDs))
for i, regID := range regulationIDs {
conditions[i] = qdrantCondition{
Key: "regulation_id",
Match: qdrantMatch{Value: regID},
}
// Match BOTH the legacy field (regulation_id) and the normalized field
// (regulation_code) so per-regulation filtering works on the re-ingested corpus too.
conditions := make([]qdrantCondition, 0, len(regulationIDs)*2)
for _, regID := range regulationIDs {
conditions = append(conditions,
qdrantCondition{Key: "regulation_id", Match: qdrantMatch{Value: regID}},
qdrantCondition{Key: "regulation_code", Match: qdrantMatch{Value: regID}},
)
}
searchReq.Filter = &qdrantFilter{Should: conditions}
}
@@ -0,0 +1,143 @@
package ucca
import (
"context"
"fmt"
"strings"
)
// multiRegMinPerRegulation is the minimum number of hits fetched per named regulation, so
// each domain is fairly represented even when topK/len(regs) would be tiny.
const multiRegMinPerRegulation = 3
// regulationCatalog maps a regulation to (a) the aliases that signal it is EXPLICITLY named
// in a query and (b) the regulation_code/regulation_id values used to filter the corpus.
// Deterministic + generic: a query naming >=2 regulations triggers per-regulation retrieval
// so a cross-regulation question returns every named domain — NOT a doc-specific rule.
var regulationCatalog = []struct {
Canonical string
Aliases []string
CodeValues []string
}{
{"CRA", []string{"cra", "cyber resilience"}, []string{"CRA"}},
{"MaschVO", []string{"maschinenverordnung", "maschvo", "machinery regulation"}, []string{"MASCHVO", "MaschVO"}},
{"NIS2", []string{"nis2", "nis-2", "nis 2"}, []string{"NIS2"}},
{"DORA", []string{"dora"}, []string{"DORA"}},
{"Data Act", []string{"data act", "datengesetz"}, []string{"DATA ACT", "DataAct"}},
{"AI Act", []string{"ai act", "ki-vo", "ki-verordnung", "ai-verordnung"}, []string{"AI ACT", "AIAct"}},
{"DSGVO", []string{"dsgvo", "gdpr"}, []string{"DSGVO"}},
{"TDDDG", []string{"tdddg"}, []string{"TDDDG"}},
{"BDSG", []string{"bdsg"}, []string{"BDSG"}},
}
type detectedRegulation struct {
Canonical string
CodeValues []string
}
// detectRegulations returns the DISTINCT regulations explicitly named in the query. >=2 of
// them is the trigger for multi-regulation retrieval. Pure + deterministic, no LLM.
func detectRegulations(query string) []detectedRegulation {
q := strings.ToLower(query)
var out []detectedRegulation
for _, r := range regulationCatalog {
for _, a := range r.Aliases {
if strings.Contains(q, a) {
out = append(out, detectedRegulation{Canonical: r.Canonical, CodeValues: r.CodeValues})
break
}
}
}
return out
}
func hitID(h qdrantSearchHit) string { return fmt.Sprintf("%v", h.ID) }
// searchMultiRegulation retrieves each explicitly-named regulation SEPARATELY (per-regulation
// filter) and merges, so a cross-regulation query ("Wie greifen CRA und MaschVO ineinander?")
// returns BOTH domains in the prompt instead of only the keyword-dominant one. Generic over any
// named pair (DSGVO+TDDDG, CRA+NIS2, DORA+NIS2, AI Act+DSGVO, ...). The merged pool is
// authority-reranked once. Pure pool-construction; topK contract preserved.
func (c *LegalRAGClient) searchMultiRegulation(ctx context.Context, collection, query string, regs []detectedRegulation, topK int) ([]LegalSearchResult, error) {
embedding, err := c.generateEmbedding(ctx, query)
if err != nil {
return nil, fmt.Errorf("failed to generate embedding: %w", err)
}
perReg := topK / len(regs)
if perReg < multiRegMinPerRegulation {
perReg = multiRegMinPerRegulation
}
var merged []qdrantSearchHit
seen := make(map[string]bool)
for _, r := range regs {
var hits []qdrantSearchHit
if c.hybridEnabled {
if h, hErr := c.searchHybrid(ctx, collection, embedding, r.CodeValues, perReg); hErr == nil {
hits = h
}
}
if hits == nil {
if h, dErr := c.searchDense(ctx, collection, embedding, r.CodeValues, perReg); dErr == nil {
hits = h
}
}
for _, h := range hits {
id := hitID(h)
if seen[id] {
continue
}
seen[id] = true
merged = append(merged, h)
}
}
if len(merged) == 0 {
return nil, fmt.Errorf("multi-regulation search returned no hits")
}
results := hitsToResults(merged)
results = rerankByAuthority(query, results)
if topK > 0 && len(results) > topK {
results = results[:topK]
}
return results, nil
}
// hitsToResults maps raw Qdrant hits to LegalSearchResult, preferring the normalized payload
// fields (regulation_code/article_label/...) with fallback to the legacy names (regulation_id,
// section) while the corpus is mid-re-ingestion. Shared by searchInternal + searchMultiRegulation.
func hitsToResults(hits []qdrantSearchHit) []LegalSearchResult {
results := make([]LegalSearchResult, len(hits))
for i, hit := range hits {
regCode := getString(hit.Payload, "regulation_code")
if regCode == "" {
regCode = getString(hit.Payload, "regulation_id")
}
article := getString(hit.Payload, "article")
if article == "" {
article = getString(hit.Payload, "section")
}
results[i] = LegalSearchResult{
Text: getString(hit.Payload, "chunk_text"),
RegulationCode: regCode,
RegulationName: getString(hit.Payload, "regulation_name_de"),
RegulationShort: getString(hit.Payload, "regulation_short"),
Category: getString(hit.Payload, "category"),
ArticleLabel: getString(hit.Payload, "article_label"),
Article: article,
Paragraph: getString(hit.Payload, "paragraph"),
Sub: getString(hit.Payload, "sub"),
IsRecital: getBool(hit.Payload, "is_recital"),
CitationStyle: getString(hit.Payload, "citation_style"),
Pages: getIntSlice(hit.Payload, "pages"),
SourceURL: getString(hit.Payload, "source"),
Score: hit.Score,
AuthorityWeight: getInt(hit.Payload, "authority_weight"),
SourceClass: getString(hit.Payload, "source_class"),
Jurisdiction: getString(hit.Payload, "jurisdiction"),
CitationUnit: getString(hit.Payload, "citation_unit"),
ReferencesOut: getStringSlice(hit.Payload, "references_out"),
ReferencesIn: getStringSlice(hit.Payload, "references_in"),
Superseded: getString(hit.Payload, "status") == "superseded",
}
}
return results
}
@@ -0,0 +1,92 @@
package ucca
import (
"context"
"fmt"
"os"
"strings"
"testing"
)
// TestDetectRegulations is a pure unit test of the multi-regulation TRIGGER (no Qdrant):
// only an explicit naming of >=2 regulations enables multi-regulation retrieval. A single
// named regulation, or a topical question that doesn't name one, stays single-domain.
func TestDetectRegulations(t *testing.T) {
cases := []struct {
q string
want int
}{
{"Welche neun Kriterien nennt WP248 fuer ein voraussichtlich hohes Risiko?", 0},
{"Welche Anforderungen gelten fuer wesentliche Veraenderungen einer Maschine?", 0}, // "Maschine" != MaschVO
{"Benoetigt eine SPS ohne Netzwerkanschluss eine CRA-Bewertung?", 1}, // 1 -> single
{"Wie greifen CRA und Maschinenverordnung bei einer vernetzten Maschine ineinander?", 2},
{"Wie greifen DSGVO und TDDDG bei der Nutzung von Cookies ineinander?", 2},
{"Wie verhalten sich DORA und NIS2 fuer ein Finanzunternehmen?", 2},
{"Wie greifen AI Act und DSGVO bei einem KI-System ineinander?", 2},
}
for _, c := range cases {
if got := len(detectRegulations(c.q)); got != c.want {
t.Errorf("detectRegulations(%q) = %d, want %d", c.q, got, c.want)
}
}
}
// TestMultiRegE2E (RUN_E2E=1) verifies against the build collection that an explicit
// cross-regulation query returns BOTH named domains in the top-K — the core acceptance
// gate for multi-regulation retrieval.
func TestMultiRegE2E(t *testing.T) {
if os.Getenv("RUN_E2E") != "1" {
t.Skip("set RUN_E2E=1 + QDRANT_URL/OLLAMA_URL")
}
c := NewLegalRAGClient()
coll := os.Getenv("E2E_COLLECTION")
if coll == "" {
coll = "bp_compliance_kb_2026_1_build"
}
cases := []struct {
id string
q string
want []string
}{
{"GQ-0070 CRA+MaschVO", "Wie greifen CRA und Maschinenverordnung bei einer vernetzten Maschine ineinander?", []string{"CRA", "MASCH"}},
{"DSGVO+TDDDG", "Wie greifen DSGVO und TDDDG bei der Nutzung von Cookies und Tracking-Technologien ineinander?", []string{"DSGVO", "TDDDG"}},
{"CRA+NIS2", "Wie verhalten sich CRA und NIS2 bei einem vernetzten Produkt eines wichtigen Unternehmens zueinander?", []string{"CRA", "NIS2"}},
{"DORA+NIS2", "Wie greifen DORA und NIS2 bei einem Finanzunternehmen ineinander?", []string{"DORA", "NIS2"}},
{"AI Act+DSGVO", "Wie greifen AI Act und DSGVO bei einem KI-System ineinander, das personenbezogene Daten verarbeitet?", []string{"AI ACT", "DSGVO"}},
}
for _, tc := range cases {
res, err := c.SearchCollection(context.Background(), coll, tc.q, nil, 8)
if err != nil {
t.Fatalf("%s: %v", tc.id, err)
}
present := map[string]bool{}
for _, r := range res {
present[strings.ToUpper(r.RegulationCode)] = true
}
ok := true
for _, w := range tc.want {
found := false
for cd := range present {
if strings.Contains(cd, w) {
found = true
break
}
}
if !found {
ok = false
}
}
codes := make([]string, 0, len(present))
for cd := range present {
codes = append(codes, cd)
}
status := "OK"
if !ok {
status = "FAIL"
}
fmt.Printf("%-22s want=%v present=%v %s\n", tc.id, tc.want, codes, status)
if !ok {
t.Errorf("%s: not all named regulations in top-8 (want %v, got %v)", tc.id, tc.want, codes)
}
}
}
@@ -8,7 +8,7 @@ This adds NO new reasoning logic. It exposes the already-built, tested orchestra
"""
import logging
from typing import List, Optional
from typing import Dict, List, Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
@@ -20,7 +20,7 @@ from compliance.onboarding import (
ProducedSignal,
RejectedAssumption,
)
from compliance.services.onboarding_service import run_advisor, supported_targets
from compliance.services.onboarding_service import labels_for, run_advisor, supported_targets
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/onboarding", tags=["onboarding"])
@@ -50,6 +50,7 @@ class AdvisorResponse(BaseModel):
evidence_requests: List[str] = Field(default_factory=list)
unsupported_domains: List[str] = Field(default_factory=list)
completeness_summary: str = ""
capability_labels: Dict[str, str] = Field(default_factory=dict) # capability_id -> human label (DE)
@router.get("/targets")
@@ -65,10 +66,17 @@ def advisor_start_endpoint(req: OnboardingAdvisorRequest) -> AdvisorResponse:
company=req.company, certifications=req.certifications, target=req.target,
signals=req.scanner_findings, known_evidence=req.known_evidence,
products=req.products, markets=req.markets, industry=req.industry or "")
surfaced = [
*result.auto_detected, *result.indications, *result.capability_delta,
*(q.capability_id for q in result.next_best_questions),
*(c for a in result.inferred_assumptions for c in a.capabilities),
*(m.capability_id for m in result.top_measures),
]
return AdvisorResponse(
silent_intake_summary=si_summary, headline=result.headline, auto_detected=result.auto_detected,
indications=result.indications,
inferred_assumptions=result.inferred_assumptions, rejected_assumptions=result.rejected_assumptions,
top_5_questions=result.next_best_questions, capability_delta=result.capability_delta,
top_measures=result.top_measures, evidence_requests=result.evidence_requests,
unsupported_domains=result.unsupported_domains, completeness_summary=result.completeness_summary)
unsupported_domains=result.unsupported_domains, completeness_summary=result.completeness_summary,
capability_labels=labels_for(surfaced))
@@ -21,6 +21,14 @@ from .observations import (
empirical_distribution,
reviewed,
)
from .observation_log import (
HypothesisStats,
ObservationRecord,
aggregate_by_hypothesis,
append_observation,
load_observations,
review_queue,
)
from .signals import (
ProducedSignal,
SignalVocabularyEntry,
@@ -69,4 +77,10 @@ __all__ = [
"ProducedSignal",
"SignalVocabularyEntry",
"normalize_signals",
"ObservationRecord",
"HypothesisStats",
"append_observation",
"load_observations",
"aggregate_by_hypothesis",
"review_queue",
]
@@ -143,8 +143,8 @@ def advisor_start(
next_best_questions=next_q, capability_delta=delta, top_measures=measures,
evidence_requests=evidence, unsupported_domains=unsupported,
completeness_summary=rep.completeness_summary,
headline="%d Anforderungen erkannt · %d automatisch erkannt (Intake) · %d wahrscheinlich (Zertifikate) · %d zu klären"
% (len(assess.coverage), len(auto_detected), len(probably), len(next_q)))
headline="%d von %d Anforderungen offen · %d automatisch erkannt (Intake) · %d wahrscheinlich (Zertifikate) · %d zu klären"
% (len(delta), len(assess.coverage), len(auto_detected), len(probably), len(next_q)))
def apply_answer(known_capabilities: Sequence[str], capability_id: str, answer: str) -> List[str]:
@@ -0,0 +1,108 @@
"""Observation Log — append-only JSONL store for empirical calibration events (Task 59b v1).
Observations are NOT business data and NOT product-DB data — they are CALIBRATION events for the
knowledge base ("ISO27001 -> SDL confirmed", "TISAX -> supplier security refuted"). So they live with the
other versioned knowledge artifacts (hypotheses, transition patterns, vocabulary), NOT in the product
database: an append-only JSONL log under `knowledge/observations/`. NO migration, NO DB. The empirical
DISTRIBUTION and CONFIDENCE are COMPUTED from this log on demand (computed-not-stored) — a hypothesis is
NEVER auto-updated; only REVIEWED observations calibrate (the review gate, enforced in observations.py).
Append-only: each line is one ObservationRecord and lines are NEVER modified in place. A later review is
a NEW line with the same observation_id and reviewed=true; load_observations() reconciles to the latest
per id. You can `rm` the log and recompute, `git diff` it over months, or rebuild confidence under a new
policy. Anonymisation is MANDATORY: customer_archetype is a sector/cert archetype, NEVER a real company
name (this file is committed to git). Time is stamped by the CALLER (no hidden clock) for determinism.
I/O only at the append/load boundary; statistics are pure. Python 3.9 compatible.
"""
from __future__ import annotations
import json
import os
from typing import Dict, List, Optional, Sequence
from pydantic import BaseModel, Field
from .observations import Observation, empirical_confidence, empirical_distribution
_DEFAULT_LOG = os.path.join(
os.path.dirname(__file__), "..", "..", "knowledge", "observations", "observations.jsonl")
class ObservationRecord(Observation):
"""A persisted observation line: an Observation (with its review gate + observation_type) plus log
metadata. `observation_id` is stable — a review re-appends the SAME id with reviewed=true."""
observation_id: str # stable id; a review re-appends the same id
timestamp: str = "" # ISO 8601, stamped by the CALLER (no hidden clock)
customer_archetype: str = "" # sector/cert archetype — NEVER a real company name
evidence: str = "" # what backs the answer (reference, not the artifact)
provenance: str = "" # where the answer came from (audit trail)
knowledge_version: str = "" # hypotheses/vocabulary version observed under
class HypothesisStats(BaseModel):
"""Per-hypothesis empirical rollup — all COMPUTED from the log, nothing stored on the hypothesis."""
hypothesis_id: str
distribution: Dict[str, int] = Field(default_factory=dict) # reviewed counts per observation_type
confidence: Optional[float] = None # None until a for/against obs is reviewed
reviewed_count: int = 0
total_count: int = 0
def append_observation(record: ObservationRecord, path: str = _DEFAULT_LOG) -> None:
"""Append ONE record as a JSON line. Append-only — existing lines are never rewritten."""
os.makedirs(os.path.dirname(path), exist_ok=True)
line = json.dumps(record.model_dump(mode="json"), ensure_ascii=False, sort_keys=True)
with open(path, "a", encoding="utf-8") as fh:
fh.write(line + "\n")
def load_observations(path: str = _DEFAULT_LOG, reconcile: bool = True) -> List[ObservationRecord]:
"""Read all records — a single `.jsonl` file or a directory of monthly `.jsonl` files. With
reconcile, the LATEST record per observation_id wins (a later reviewed=true supersedes the original).
Returns deterministic order (by observation_id when reconciled, else append order)."""
files: List[str] = []
if os.path.isdir(path):
files = sorted(os.path.join(path, f) for f in os.listdir(path) if f.endswith(".jsonl"))
elif os.path.exists(path):
files = [path]
records: List[ObservationRecord] = []
for fpath in files:
with open(fpath, encoding="utf-8") as fh:
for raw in fh:
raw = raw.strip()
if raw:
records.append(ObservationRecord(**json.loads(raw)))
if not reconcile:
return records
latest: Dict[str, ObservationRecord] = {}
for r in records: # file/append order -> later lines win
latest[r.observation_id] = r
return [latest[k] for k in sorted(latest)]
def aggregate_by_hypothesis(records: Sequence[ObservationRecord]) -> List[HypothesisStats]:
"""Per-hypothesis distribution + confidence. The review gate applies inside empirical_distribution/
empirical_confidence (reviewed-only), so unreviewed observations are counted in total but never
calibrate. Deterministic order (by hypothesis id)."""
by_hyp: Dict[str, List[ObservationRecord]] = {}
for r in records:
by_hyp.setdefault(r.hypothesis_id, []).append(r)
out: List[HypothesisStats] = []
for hyp in sorted(by_hyp):
obs = by_hyp[hyp]
out.append(HypothesisStats(
hypothesis_id=hyp,
distribution=empirical_distribution(obs), # reviewed-only (the gate)
confidence=empirical_confidence(obs), # None until reviewed for/against
reviewed_count=sum(1 for o in obs if o.reviewed),
total_count=len(obs)))
return out
def review_queue(records: Sequence[ObservationRecord]) -> List[ObservationRecord]:
"""The reviewer's worklist: observations not yet reviewed. Calibration ignores these until a reviewer
accepts them (Observation -> Review -> Accepted -> Knowledge recomputed), never Observation -> conf++."""
return [r for r in records if not r.reviewed]
@@ -9,7 +9,7 @@ It adds NO new reasoning logic — it only exposes what exists. No DB, no persis
from __future__ import annotations
import os
from typing import Any, Dict, List, Sequence, Tuple
from typing import Any, Dict, Iterable, List, Sequence, Tuple
import yaml
@@ -37,6 +37,13 @@ def _load(*parts: str) -> Any:
_HYP_LIB = [CapabilityHypothesis(**h) for h in _load("certification_hypotheses", "hypotheses.yaml")["hypotheses"]]
_VOCAB = [SignalVocabularyEntry(**v) for v in _load("onboarding", "signal_vocabulary.yaml")["signals"]]
_SIGNAL_MAP = [SignalMapping(**m) for m in _load("onboarding", "intake_signal_map.yaml")["mappings"]]
_LABELS: Dict[str, str] = _load("onboarding", "capability_labels.yaml")["labels"]
def labels_for(capability_ids: Iterable[str]) -> Dict[str, str]:
"""Human labels (DE) for the given capability ids — presentation only. Ids without a curated label
are omitted (the frontend falls back to a prettified id). Deduped, deterministic."""
return {c: _LABELS[c] for c in dict.fromkeys(capability_ids) if c in _LABELS}
# target id -> transition pattern that defines its required capabilities (curated registry)
_TARGET_PATTERNS = {
@@ -53,9 +60,10 @@ def supported_targets() -> List[str]:
def _target(target_id: str) -> Tuple[List[TargetRequirement], Dict[str, List[str]]]:
pat = _load("transition_patterns", _TARGET_PATTERNS[target_id])
reqs = [TargetRequirement(capability_id=a["capability"]) for a in pat["likely_covered"]]
reqs = [TargetRequirement(capability_id=a["capability"], rationale=a.get("reviewable_claim", "")) for a in pat["likely_covered"]]
reqs += [TargetRequirement(capability_id=d["capability"], question_intent=d.get("needed_information", "verify_existence"),
expected_evidence=d.get("expected_evidence", [])) for d in pat["delta_requirements"]]
rationale=d.get("why_asked", ""), expected_evidence=d.get("expected_evidence", []))
for d in pat["delta_requirements"]]
covers = {d["capability"]: d.get("covers_targets", []) for d in pat["delta_requirements"]}
return reqs, covers
@@ -104,7 +104,8 @@ def assess_transition(
)
buckets[status].append(req.capability_id)
if status in _REQUESTABLE:
reason, prio = _REQUESTABLE[status]
default_reason, prio = _REQUESTABLE[status]
reason = req.rationale or default_reason # curated human text wins over the generic fallback
requests.append(
TransitionQuestionRequest(
capability_id=req.capability_id,
@@ -70,6 +70,7 @@ class TargetRequirement(BaseModel):
capability_id: str # MCAP-...
question_intent: str = "verify_existence" # passed through to the request, not rendered
rationale: str = "" # curated human text (e.g. why_asked / reviewable_claim) — surfaced as the request reason
expected_evidence: List[str] = Field(default_factory=list)
source_control_id: Optional[str] = None
supports_obligations: List[str] = Field(default_factory=list)
@@ -0,0 +1,2 @@
# Append-only observation log (Task 59b). Real lines (observations.jsonl / YYYY-MM.jsonl) are written at
# runtime via compliance/onboarding/observation_log.py. Anonymised archetypes only — NEVER real company names.
@@ -0,0 +1,45 @@
# Human-readable capability labels (DE) — presentation only, reusable across all targets.
# A capability id is the stable machine identity; this maps it to an expert-facing label for the UI.
# Curated knowledge (draft — to be corrected by the domain expert). Missing ids fall back to a
# prettified id in the frontend. NO real company names. Keep labels short + concrete.
labels:
# ── ISMS / ISO 27001 core ───────────────────────────────────────────────
information_security_management: "Informationssicherheits-Managementsystem (ISMS)"
access_control_and_authentication: "Zugriffskontrolle & Authentifizierung"
asset_and_configuration_management: "Asset- & Konfigurationsverwaltung"
cryptography: "Kryptographie / Verschlüsselung"
incident_management: "Security-Incident-Management"
security_awareness_training: "Security-Awareness-Schulungen"
supplier_security: "Lieferanten-Sicherheit"
security_logging_and_monitoring: "Security-Logging & Monitoring"
technical_vulnerability_management: "Technisches Schwachstellen-Management"
# ── TISAX / VDA-spezifisch ──────────────────────────────────────────────
prototype_protection: "Prototypenschutz (physisch & logisch)"
tisax_label_scope_selection: "TISAX-Label-/Scope-Festlegung"
tisax_assessment_via_enx: "TISAX-Assessment über die ENX-Plattform"
vda_isa_self_assessment: "VDA-ISA-Selbstauskunft"
data_protection_processing_on_behalf: "Auftragsverarbeitung (Art. 28 DSGVO)"
physical_security: "Physische Sicherheit / Zutrittskontrolle"
# ── QM / ISO 9001 ───────────────────────────────────────────────────────
document_and_change_control: "Dokumenten- & Änderungslenkung"
supplier_evaluation: "Lieferantenbewertung"
release_and_approval_process: "Freigabe- & Genehmigungsprozess"
ce_conformity_assessment_and_technical_documentation: "CE-Konformitätsbewertung & technische Dokumentation"
# ── CRA / Produkt-Cybersecurity ─────────────────────────────────────────
sbom_creation: "SBOM-Erstellung (Software-Stückliste)"
coordinated_vulnerability_disclosure: "Coordinated Vulnerability Disclosure (CVD)"
secure_development_lifecycle: "Sicherer Entwicklungslebenszyklus (SDLC)"
secure_signed_update_distribution: "Sichere, signierte Update-Verteilung"
security_update_support_period: "Sicherheits-Update-Supportzeitraum"
product_cyber_risk_assessment: "Produkt-Cyber-Risikobewertung"
exploited_vuln_and_incident_reporting: "Meldung ausgenutzter Schwachstellen & Vorfälle"
public_security_advisories: "Öffentliche Security Advisories"
cybersecurity_management_system: "Cybersecurity-Managementsystem (CSMS)"
# ── MaschinenVO / Safety ────────────────────────────────────────────────
machine_safety_risk_assessment: "Maschinen-Risikobeurteilung"
mechanical_safety_and_guards: "Mechanische Sicherheit & Schutzeinrichtungen"
operating_instructions_and_safety_information: "Betriebsanleitung & Sicherheitshinweise"
protection_against_corruption_of_safety_functions: "Schutz der Sicherheitsfunktionen vor Manipulation"
# ── Umwelt ──────────────────────────────────────────────────────────────
environmental_management_documentation: "Umweltmanagement-Dokumentation"
@@ -0,0 +1,73 @@
"""Observation Log — append-only JSONL store + computed statistics (Task 59b/c v1).
Pins the user's decision (2026-06-28): observations are CALIBRATION data, not product data -> an
append-only JSONL log under knowledge/observations/, NO DB, NO migration. Distribution and confidence are
COMPUTED from the log; only REVIEWED observations calibrate (review gate); a later review is a new line
that supersedes by observation_id. Nothing is ever written back to a hypothesis.
"""
from __future__ import annotations
from compliance.onboarding import (
ObservationRecord,
ObservationType,
aggregate_by_hypothesis,
append_observation,
load_observations,
review_queue,
)
def _rec(oid, hyp, otype, reviewed=False, **kw):
return ObservationRecord(
observation_id=oid, hypothesis_id=hyp, observation_type=otype, reviewed=reviewed,
timestamp="2026-07-01T00:00:00Z", customer_archetype="machine_builder+ISO27001", **kw)
def test_append_only_round_trip(tmp_path):
p = str(tmp_path / "obs.jsonl")
append_observation(_rec("o1", "HYP-secure_dev", ObservationType.CONFIRMED, reviewed=True), p)
append_observation(_rec("o2", "HYP-secure_dev", ObservationType.REFUTED, reviewed=True), p)
recs = load_observations(p)
assert {r.observation_id for r in recs} == {"o1", "o2"}
assert all(r.customer_archetype == "machine_builder+ISO27001" for r in recs) # anonymised archetype, not a name
def test_review_supersedes_by_id_append_only(tmp_path):
p = str(tmp_path / "obs.jsonl")
append_observation(_rec("o1", "HYP-x", ObservationType.CONFIRMED, reviewed=False), p) # raw answer
append_observation(_rec("o1", "HYP-x", ObservationType.CONFIRMED, reviewed=True,
reviewed_by="anna"), p) # later review event
assert len(load_observations(p, reconcile=False)) == 2 # both lines kept (append-only)
recs = load_observations(p) # reconciled
assert len(recs) == 1 and recs[0].reviewed and recs[0].reviewed_by == "anna"
def test_statistics_apply_the_review_gate(tmp_path):
p = str(tmp_path / "obs.jsonl")
append_observation(_rec("a", "HYP-sdl", ObservationType.CONFIRMED, reviewed=True), p)
append_observation(_rec("b", "HYP-sdl", ObservationType.CONFIRMED, reviewed=True), p)
append_observation(_rec("c", "HYP-sdl", ObservationType.REFUTED, reviewed=True), p)
append_observation(_rec("d", "HYP-sdl", ObservationType.CONFIRMED, reviewed=False), p) # unreviewed -> ignored
stats = {s.hypothesis_id: s for s in aggregate_by_hypothesis(load_observations(p))}
s = stats["HYP-sdl"]
assert s.total_count == 4 and s.reviewed_count == 3
assert s.distribution["confirmed"] == 2 and s.distribution["refuted"] == 1 # unreviewed one excluded
assert s.confidence == round(2 / 3, 2) # (2 + 0.5*0) / 3
def test_review_queue_lists_unreviewed(tmp_path):
p = str(tmp_path / "obs.jsonl")
append_observation(_rec("a", "HYP-y", ObservationType.CONFIRMED, reviewed=True), p)
append_observation(_rec("b", "HYP-y", ObservationType.PARTIAL, reviewed=False), p)
q = review_queue(load_observations(p))
assert [r.observation_id for r in q] == ["b"]
def test_load_directory_of_monthly_files(tmp_path):
d = tmp_path / "observations"
d.mkdir()
append_observation(_rec("a", "HYP-z", ObservationType.CONFIRMED, reviewed=True), str(d / "2026-06.jsonl"))
append_observation(_rec("b", "HYP-z", ObservationType.REFUTED, reviewed=True), str(d / "2026-07.jsonl"))
recs = load_observations(str(d))
assert {r.observation_id for r in recs} == {"a", "b"}
@@ -73,6 +73,17 @@ def test_partial_signal_surfaces_as_indication_and_is_still_asked():
assert "secure_development_lifecycle" in asked or "secure_development_lifecycle" in d["capability_delta"]
def test_questions_carry_curated_text_and_human_labels():
# the curated why_asked from the transition pattern must reach the question (not the generic
# fallback "Keine Anhaltspunkte ... klären"), and surfaced capabilities get human labels.
body = dict(_BODY, certifications=["ISO27001"], target="TISAX", scanner_findings=[])
r = _client.post("/onboarding/advisor-start", json=body)
assert r.status_code == 200, r.text
d = r.json()
assert any("Keine Anhaltspunkte" not in q["why"] for q in d["top_5_questions"]) # real expert text surfaced
assert d["capability_labels"].get("vda_isa_self_assessment") == "VDA-ISA-Selbstauskunft"
def test_unknown_target_is_404():
body = dict(_BODY, target="NOPE")
r = _client.post("/onboarding/advisor-start", json=body)