Compare commits

..

9 Commits

Author SHA1 Message Date
Benjamin Admin 31562a31e9 fix(ucca): guidance-intent erkennt direkt benannte Guidance-Dokumente
CI / detect-changes (pull_request) Successful in 13s
CI / branch-name (pull_request) Successful in 1s
CI / guardrail-integrity (pull_request) Successful in 6s
CI / secret-scan (pull_request) Successful in 9s
CI / dep-audit (pull_request) Failing after 57s
CI / sbom-scan (pull_request) Failing after 59s
CI / build-sha-integrity (pull_request) Successful in 9s
CI / validate-canonical-controls (pull_request) Successful in 8s
CI / loc-budget (pull_request) Successful in 23s
CI / go-lint (pull_request) Successful in 50s
CI / python-lint (pull_request) Failing after 14s
CI / nodejs-lint (pull_request) Failing after 1m9s
CI / nodejs-build (pull_request) Successful in 3m3s
CI / test-go (pull_request) Successful in 1m1s
CI / iace-gt-coverage (pull_request) Successful in 18s
CI / test-python-backend (pull_request) Successful in 27s
CI / test-python-document-crawler (pull_request) Successful in 15s
CI / test-python-dsms-gateway (pull_request) Successful in 12s
queryWantsGuidance verfehlte rein dokument-namige Fragen ("Welche Kriterien
nennt WP248 ...", "Was sagt GL 07/2020 ..."): guidanceIntentSignals enthielt
zwar Herausgeber (edpb/dsk/enisa) und Verben (empfiehlt/laut), aber keine
Working-Paper-/Guideline-Identifier. Dadurch loeste der Authority-Lift nicht
aus -> binding_law (bzw. im homogenen Korpus sogar off-domain MaschVO/CRA)
verdraengte die Guidance aus den Top-K.

Fix: WP2xx / GL 0x / "working paper" als Guidance-Signal ergaenzt. Generisch
ueber alle WP-/GL-Dokumente, KEINE doc-spezifische Regel (Query->Intent, nicht
Query->konkretes Dokument).

Validierung (homogener Build-Korpus, bge-m3 + Qdrant Cosine):
- 10 Hard Cases: 8/10 -> 10/10 (WP248/WP260 zurueck in Top-8)
- ComplianceBench-100: 0/100 Norm-Fragen veraendert (Freeze-Regression gruen),
  18/18 Guidance-Intent-Fragen verbessert (binding -> korrekte Guidance-Klasse)
- Hybrid == Dense (Keyword-RRF war NICHT die Ursache, der Lift-Gate war es)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-29 14:44:53 +02:00
pilotadmin f0da86ca19 Merge pull request 'feat(onboarding): advisor responsiveness — moving headline + auto-recompute' (#54) from feat/advisor-ux-responsiveness into main 2026-06-28 19:31:20 +02:00
Benjamin Admin 867f8c3854 feat(onboarding): make the advisor visibly responsive — headline leads with the moving number + auto-recompute
Testing surfaced that toggling certifications appeared to "do nothing": the headline led with the TOTAL
requirement count (constant per target, e.g. 17 for CRA), and the page only recomputed on an explicit
button click. Both fixed:
  - engine.py headline now leads with the number that actually moves: "11 von 17 Anforderungen offen ·
    6 wahrscheinlich (Zertifikate) · 5 zu klären" (was "17 Anforderungen erkannt · …"). Keeps the
    "automatisch erkannt (Intake)" substring.
  - frontend auto-recomputes on certifications / target / scanner-signal change (no button needed).

Now ISO27001 alone -> "13 von 17 offen · 4 wahrscheinlich"; + ISO9001+TISAX+IEC62443 -> "11 von 17 offen ·
6 wahrscheinlich". (Domain truth stays visible: CRA's product-cyber gaps barely move with management-system
certs.) 28 onboarding+transition tests pass, check-loc 0.
2026-06-28 19:31:15 +02:00
pilotadmin 26a8518107 Merge pull request 'feat(onboarding): surface curated expert text + human labels' (#53) from feat/advisor-human-text into main 2026-06-28 18:47:07 +02:00
Benjamin Admin 807a7002b2 feat(onboarding): surface curated expert text + human capability labels (advisor was showing snake_case)
The advisor was structurally correct but unusable: every question showed a snake_case capability id plus a
single generic fallback reason ("Keine Anhaltspunkte im Unternehmensprofil — klären"). The expert text
already EXISTED in the transition patterns (why_asked / reviewable_claim) — the pipeline just dropped it.

  - transition_reasoning: TargetRequirement gains `rationale`; assess_transition uses it as the request
    reason when present, else the generic fallback (additive, backward-compatible for all consumers).
  - onboarding_service._target carries the pattern's why_asked (delta) and reviewable_claim (likely_covered)
    into the requirement rationale -> the question's `why`.
  - knowledge/onboarding/capability_labels.yaml: curated DE labels (id -> human), reusable across targets;
    labels_for() + response.capability_labels expose them; the frontend renders label || prettified id.

Now ISO27001->TISAX reads "Auftragsverarbeitung (Art. 28 DSGVO) — If a TISAX data label is in scope, you
must show Art. 28 GDPR processing-on-behalf controls; ISO 27001 does not establish these." instead of
"data_protection_processing_on_behalf — klären". why_asked text is still EN (existing knowledge; translation
is curation). 34 onboarding+transition tests pass, mypy --strict clean (13 modules), check-loc 0.
2026-06-28 18:46:56 +02:00
pilotadmin 5beb5a319a Merge pull request 'feat(admin): ETO / Onboarding-Advisor test page' (#52) from feat/onboarding-advisor-frontend into main 2026-06-28 17:12:44 +02:00
Benjamin Admin 239702fdca feat(admin): ETO / Onboarding-Advisor test page (thin operator surface over the advisor endpoint)
A focused client page at /sdk/onboarding-advisor that exercises POST /api/compliance/onboarding/
advisor-start through the existing compliance proxy: pick certifications + target + scanner findings
(observation / partial / requirement) and render the result — headline, silent-intake summary,
auto-detected (green), indications (amber), next-best questions with WHY, inferred (Welt-1) vs rejected
assumptions, capability delta, evidence requests, completeness. NOT the regulation gap engine
(/sdk/gap-analysis is a different flow). No new backend; calls only the existing endpoint. 195 lines.
2026-06-28 17:12:40 +02:00
pilotadmin d1a5fc7205 Merge pull request 'feat(onboarding): Observation Log — append-only JSONL calibration store (59b/c)' (#51) from feat/observation-log into main 2026-06-28 16:29:58 +02:00
Benjamin Admin 7df15010ff feat(onboarding): Observation Log — append-only JSONL calibration store (Task 59b/c v1)
Per the user's decision (2026-06-28): observations are CALIBRATION data for the knowledge base, NOT
business data and NOT product-DB data. So they live with the other versioned knowledge artifacts as an
append-only JSONL log under knowledge/observations/ — NO migration, NO DB. (A real persistence layer is
only warranted once thousands of onboardings exist; not before.)

  - ObservationRecord = Observation + log metadata (observation_id, timestamp [caller-stamped, no hidden
    clock], customer_archetype [anonymised — NEVER a real name], evidence, provenance, knowledge_version).
  - append_observation() writes one JSON line; append-only, lines are never rewritten. A later review is a
    NEW line with the same observation_id; load_observations(reconcile=True) keeps the latest per id.
  - load_observations() reads a single .jsonl or a directory of monthly .jsonl files.
  - aggregate_by_hypothesis() (59c) -> per-hypothesis distribution + confidence, COMPUTED from the log
    (computed-not-stored); the review gate (reviewed-only) is enforced in empirical_distribution/confidence.
  - review_queue() -> the unreviewed worklist. Observation -> Review -> Accepted -> recompute, never
    Observation -> confidence++. Nothing is ever written back to a hypothesis.

You can `rm` the log and recompute, `git diff` it over months, or rebuild confidence under a new policy —
fully consistent with computed-not-stored and the product/knowledge data separation.

Non-runtime (module + tests only, no endpoint) -> origin/main, NO dev deploy. 5 new tests (append-only,
review supersession, review-gate statistics, queue, monthly-file load); 27 onboarding tests pass, mypy
--strict clean (9 modules), check-loc 0. 59d (surface computed confidence at runtime) stays a later step.
2026-06-28 16:29:54 +02:00
13 changed files with 482 additions and 17 deletions
@@ -0,0 +1,200 @@
'use client'
// ETO / Onboarding-Advisor — thin operator surface over POST /api/compliance/onboarding/advisor-start.
// Certifications + target + scanner findings -> Silent Pass -> Advisor. NOT the regulation gap engine
// (/sdk/gap-analysis is a different flow: product -> applicable regulations). This tests the cert->delta
// case: "TISAX/ISO27001 -> CRA, what is auto-detected, what stays an open question?". No new backend.
import React, { useEffect, useState } from 'react'
const CERTS = ['ISO27001', 'TISAX', 'ISO9001', 'IEC62443', 'ISO13485', 'ISO14001', 'ASPICE', 'IATF16949']
// label -> {signal_id, source_type} — demonstrates all three signal KINDS (observation / partial / requirement)
const FINDINGS: Array<{ label: string; signal_id: string; source_type: string; kind: string }> = [
{ label: 'SBOM im Repo (CycloneDX/SPDX)', signal_id: 'cyclonedx_found', source_type: 'repository', kind: 'observation' },
{ label: 'security.txt / CVD-Policy veröffentlicht', signal_id: 'security_txt', source_type: 'website', kind: 'observation' },
{ label: 'Signierte Releases', signal_id: 'signed_releases', source_type: 'repository', kind: 'observation' },
{ label: 'Produkt-Risikobewertung (Dokument)', signal_id: 'risk_assessment_pdf', source_type: 'document', kind: 'observation' },
{ label: 'CI-Pipeline vorhanden (nur Indikation)', signal_id: 'github_actions_ci', source_type: 'repository', kind: 'partial' },
{ label: 'Cloud-/vernetztes Produkt', signal_id: 'cloud_hosted', source_type: 'product', kind: 'observation' },
{ label: 'Ausschreibung FORDERT SBOM (Requirement)', signal_id: 'requires_sbom', source_type: 'tender', kind: 'requirement' },
{ label: 'OEM FORDERT PSIRT (Requirement)', signal_id: 'supplier_requires_psirt', source_type: 'oem', kind: 'requirement' },
]
interface Question { capability_id: string; question_intent: string; why: string; information_value: number; priority: string }
interface Inferred { certification: string; capabilities: string[]; statement: string }
interface Rejected { certification?: string; statement: string; reason: string }
interface Measure { capability_id: string; leverage: number; closes: string[] }
interface AdvisorResponse {
silent_intake_summary: string; headline: string; auto_detected: string[]; indications: string[]
inferred_assumptions: Inferred[]; rejected_assumptions: Rejected[]; top_5_questions: Question[]
capability_delta: string[]; top_measures: Measure[]; evidence_requests: string[]
unsupported_domains: string[]; completeness_summary: string; capability_labels: Record<string, string>
}
const PROXY = '/api/sdk/v1/compliance/onboarding'
function Chips({ items, tone }: { items: string[]; tone: string }) {
if (!items.length) return <span className="text-gray-400 text-sm"></span>
return (
<div className="flex flex-wrap gap-2">
{items.map(c => <span key={c} className={`px-2.5 py-1 rounded-full text-xs font-medium ${tone}`}>{c}</span>)}
</div>
)
}
function Section({ title, hint, children }: { title: string; hint?: string; children: React.ReactNode }) {
return (
<div className="bg-white rounded-xl border border-gray-200 p-5">
<h3 className="font-semibold text-gray-900">{title}</h3>
{hint && <p className="text-xs text-gray-500 mt-0.5 mb-2">{hint}</p>}
<div className="mt-2">{children}</div>
</div>
)
}
export default function OnboardingAdvisorPage() {
const [targets, setTargets] = useState<string[]>([])
const [company, setCompany] = useState('Beispiel Maschinenbau')
const [industry, setIndustry] = useState('machine_builder')
const [certs, setCerts] = useState<string[]>(['ISO27001', 'ISO9001'])
const [target, setTarget] = useState('CRA')
const [findings, setFindings] = useState<string[]>(['cyclonedx_found', 'github_actions_ci', 'requires_sbom'])
const [knownEvidence, setKnownEvidence] = useState('CE-Prozess')
const [result, setResult] = useState<AdvisorResponse | null>(null)
const [loading, setLoading] = useState(false)
const [error, setError] = useState('')
useEffect(() => {
fetch(`${PROXY}/targets`).then(r => r.json()).then(d => {
if (Array.isArray(d.targets)) { setTargets(d.targets); if (!d.targets.includes('CRA') && d.targets[0]) setTarget(d.targets[0]) }
}).catch(() => {})
}, [])
const toggle = (list: string[], set: (v: string[]) => void, v: string) =>
set(list.includes(v) ? list.filter(x => x !== v) : [...list, v])
const lbl = (id: string) => result?.capability_labels?.[id] || id.replace(/_/g, ' ')
const run = async () => {
setLoading(true); setError(''); setResult(null)
try {
const scanner_findings = FINDINGS.filter(f => findings.includes(f.signal_id))
.map(f => ({ signal_id: f.signal_id, source_type: f.source_type }))
const res = await fetch(`${PROXY}/advisor-start`, {
method: 'POST', headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
company, industry, products: [], markets: ['EU'], certifications: certs,
known_evidence: knownEvidence ? knownEvidence.split(',').map(s => s.trim()).filter(Boolean) : [],
target, scanner_findings,
}),
})
if (!res.ok) throw new Error(await res.text())
setResult(await res.json())
} catch (e) {
setError(e instanceof Error ? e.message : 'Advisor fehlgeschlagen')
} finally { setLoading(false) }
}
// auto-recompute when certifications / target / scanner signals change (no button click needed)
useEffect(() => { if (certs.length) run() }, [certs, target, findings]) // eslint-disable-line react-hooks/exhaustive-deps
return (
<div className="min-h-screen bg-gray-50 py-8">
<div className="max-w-5xl mx-auto px-4">
<h1 className="text-3xl font-bold text-gray-900">ETO / Onboarding-Advisor</h1>
<p className="text-gray-600 mt-2 mb-6">
Zertifikate + Ziel + Scanner-Signale Silent Pass Capability-Delta + nächste beste Fragen.
Welt-1: ein Zertifikat <em>legt nahe</em>, beweist nichts (Verifikation erforderlich).
</p>
<div className="grid md:grid-cols-2 gap-4 mb-6">
<Section title="Unternehmen & Ziel">
<label className="block text-sm text-gray-600">Unternehmen
<input value={company} onChange={e => setCompany(e.target.value)} className="mt-1 w-full border rounded-lg px-3 py-2" /></label>
<label className="block text-sm text-gray-600 mt-3">Branche
<input value={industry} onChange={e => setIndustry(e.target.value)} className="mt-1 w-full border rounded-lg px-3 py-2" /></label>
<label className="block text-sm text-gray-600 mt-3">Ziel
<select value={target} onChange={e => setTarget(e.target.value)} className="mt-1 w-full border rounded-lg px-3 py-2">
{(targets.length ? targets : ['CRA']).map(t => <option key={t} value={t}>{t}</option>)}
</select></label>
<label className="block text-sm text-gray-600 mt-3">Vorhandene Nachweise (kommagetrennt)
<input value={knownEvidence} onChange={e => setKnownEvidence(e.target.value)} className="mt-1 w-full border rounded-lg px-3 py-2" /></label>
</Section>
<Section title="Zertifizierungen">
<div className="flex flex-wrap gap-2">
{CERTS.map(c => (
<button key={c} onClick={() => toggle(certs, setCerts, c)}
className={`px-3 py-1.5 rounded-lg text-sm border ${certs.includes(c) ? 'bg-blue-600 text-white border-blue-600' : 'bg-white text-gray-700 border-gray-300'}`}>{c}</button>
))}
</div>
</Section>
</div>
<Section title="Scanner-Signale (Silent Pass)" hint="observation = gesehen · partial = Indikation · requirement = gefordert (≠ vorhanden)">
<div className="grid sm:grid-cols-2 gap-2">
{FINDINGS.map(f => (
<label key={f.signal_id} className="flex items-center gap-2 text-sm text-gray-700">
<input type="checkbox" checked={findings.includes(f.signal_id)} onChange={() => toggle(findings, setFindings, f.signal_id)} />
<span>{f.label}</span>
<span className={`ml-auto text-[10px] px-1.5 py-0.5 rounded ${f.kind === 'requirement' ? 'bg-purple-100 text-purple-700' : f.kind === 'partial' ? 'bg-amber-100 text-amber-700' : 'bg-emerald-100 text-emerald-700'}`}>{f.kind}</span>
</label>
))}
</div>
</Section>
<button onClick={run} disabled={loading || !certs.length}
className="mt-6 w-full py-3 bg-blue-600 text-white rounded-xl font-medium hover:bg-blue-700 disabled:opacity-50">
{loading ? 'Analysiere…' : 'Advisor starten'}
</button>
{error && <div className="mt-6 bg-red-50 border border-red-200 rounded-lg p-4 text-red-700 text-sm whitespace-pre-wrap">{error}</div>}
{result && (
<div className="mt-8 space-y-4">
<div className="bg-blue-600 text-white rounded-xl p-5">
<div className="text-lg font-semibold">{result.headline}</div>
<div className="text-blue-100 text-sm mt-1">{result.silent_intake_summary}</div>
</div>
<div className="grid md:grid-cols-2 gap-4">
<Section title="Automatisch erkannt" hint="konkrete Artefakte nicht mehr gefragt"><Chips items={result.auto_detected.map(lbl)} tone="bg-emerald-100 text-emerald-800" /></Section>
<Section title="Indikationen" hint="erhöht Annahmestärke trotzdem gefragt"><Chips items={result.indications.map(lbl)} tone="bg-amber-100 text-amber-800" /></Section>
</div>
<Section title="Nächste beste Fragen" hint="max 5, jede erklärt sich selbst">
{result.top_5_questions.length ? (
<ol className="space-y-3">
{result.top_5_questions.map((q, i) => (
<li key={q.capability_id} className="border-l-2 border-blue-300 pl-3">
<div className="font-medium text-gray-900">{i + 1}. {lbl(q.capability_id)}</div>
<div className="text-sm text-gray-600">{q.why}</div>
</li>
))}
</ol>
) : <span className="text-gray-400 text-sm"></span>}
</Section>
<div className="grid md:grid-cols-2 gap-4">
<Section title="Wahrscheinlich abgedeckt (Welt-1)" hint="Zertifikat legt nahe Verifikation erforderlich">
{result.inferred_assumptions.length ? result.inferred_assumptions.map(a => (
<div key={a.certification} className="mb-2"><span className="font-medium">{a.certification}</span>: {a.capabilities.map(lbl).join(', ')}</div>
)) : <span className="text-gray-400 text-sm"></span>}
</Section>
<Section title="Nicht relevant" hint="relevance(evidence, target) = 0">
{result.rejected_assumptions.length ? result.rejected_assumptions.map((a, i) => (
<div key={i} className="mb-1 text-sm text-gray-700">{a.statement}</div>
)) : <span className="text-gray-400 text-sm"></span>}
</Section>
</div>
<div className="grid md:grid-cols-2 gap-4">
<Section title="Offene Lücken (Delta)"><Chips items={result.capability_delta.map(lbl)} tone="bg-gray-100 text-gray-700" /></Section>
<Section title="Geforderte Nachweise"><Chips items={result.evidence_requests} tone="bg-gray-100 text-gray-700" /></Section>
</div>
<Section title="Vollständigkeit" hint={result.unsupported_domains.length ? `nicht abgedeckt: ${result.unsupported_domains.join(', ')}` : undefined}>
<span className="text-sm text-gray-700">{result.completeness_summary || '—'}</span>
</Section>
</div>
)}
</div>
</div>
)
}
@@ -18,10 +18,7 @@ func TestGuidanceFixE2E(t *testing.T) {
t.Skip("set RUN_E2E=1 + QDRANT_URL/OLLAMA_URL to run") t.Skip("set RUN_E2E=1 + QDRANT_URL/OLLAMA_URL to run")
} }
c := NewLegalRAGClient() c := NewLegalRAGClient()
coll := os.Getenv("E2E_COLLECTION") coll := "bp_compliance_kb_2026_1_build"
if coll == "" {
coll = "bp_compliance_kb_2026_1_build"
}
cases := []struct{ id, q, expect string }{ cases := []struct{ id, q, expect string }{
{"GQ-0012", "Welche neun Kriterien nennt WP248 fuer ein voraussichtlich hohes Risiko?", "WP248"}, {"GQ-0012", "Welche neun Kriterien nennt WP248 fuer ein voraussichtlich hohes Risiko?", "WP248"},
{"GQ-0013", "Ab wie vielen der WP248-Kriterien ist in der Regel eine Datenschutz-Folgenabschaetzung erforderlich?", "WP248"}, {"GQ-0013", "Ab wie vielen der WP248-Kriterien ist in der Regel eine Datenschutz-Folgenabschaetzung erforderlich?", "WP248"},
@@ -86,10 +83,7 @@ func TestBenchE2E(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
c := NewLegalRAGClient() c := NewLegalRAGClient()
coll := os.Getenv("E2E_COLLECTION") coll := "bp_compliance_kb_2026_1_build"
if coll == "" {
coll = "bp_compliance_kb_2026_1_build"
}
fmt.Printf("### BENCH n=%d hybrid=%v\n", len(bench.Questions), os.Getenv("RAG_HYBRID_SEARCH") != "false") fmt.Printf("### BENCH n=%d hybrid=%v\n", len(bench.Questions), os.Getenv("RAG_HYBRID_SEARCH") != "false")
for _, q := range bench.Questions { for _, q := range bench.Questions {
res, err := c.SearchCollection(context.Background(), coll, q.Question, nil, 8) res, err := c.SearchCollection(context.Background(), coll, q.Question, nil, 8)
@@ -8,7 +8,7 @@ This adds NO new reasoning logic. It exposes the already-built, tested orchestra
""" """
import logging import logging
from typing import List, Optional from typing import Dict, List, Optional
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
@@ -20,7 +20,7 @@ from compliance.onboarding import (
ProducedSignal, ProducedSignal,
RejectedAssumption, RejectedAssumption,
) )
from compliance.services.onboarding_service import run_advisor, supported_targets from compliance.services.onboarding_service import labels_for, run_advisor, supported_targets
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
router = APIRouter(prefix="/onboarding", tags=["onboarding"]) router = APIRouter(prefix="/onboarding", tags=["onboarding"])
@@ -50,6 +50,7 @@ class AdvisorResponse(BaseModel):
evidence_requests: List[str] = Field(default_factory=list) evidence_requests: List[str] = Field(default_factory=list)
unsupported_domains: List[str] = Field(default_factory=list) unsupported_domains: List[str] = Field(default_factory=list)
completeness_summary: str = "" completeness_summary: str = ""
capability_labels: Dict[str, str] = Field(default_factory=dict) # capability_id -> human label (DE)
@router.get("/targets") @router.get("/targets")
@@ -65,10 +66,17 @@ def advisor_start_endpoint(req: OnboardingAdvisorRequest) -> AdvisorResponse:
company=req.company, certifications=req.certifications, target=req.target, company=req.company, certifications=req.certifications, target=req.target,
signals=req.scanner_findings, known_evidence=req.known_evidence, signals=req.scanner_findings, known_evidence=req.known_evidence,
products=req.products, markets=req.markets, industry=req.industry or "") products=req.products, markets=req.markets, industry=req.industry or "")
surfaced = [
*result.auto_detected, *result.indications, *result.capability_delta,
*(q.capability_id for q in result.next_best_questions),
*(c for a in result.inferred_assumptions for c in a.capabilities),
*(m.capability_id for m in result.top_measures),
]
return AdvisorResponse( return AdvisorResponse(
silent_intake_summary=si_summary, headline=result.headline, auto_detected=result.auto_detected, silent_intake_summary=si_summary, headline=result.headline, auto_detected=result.auto_detected,
indications=result.indications, indications=result.indications,
inferred_assumptions=result.inferred_assumptions, rejected_assumptions=result.rejected_assumptions, inferred_assumptions=result.inferred_assumptions, rejected_assumptions=result.rejected_assumptions,
top_5_questions=result.next_best_questions, capability_delta=result.capability_delta, top_5_questions=result.next_best_questions, capability_delta=result.capability_delta,
top_measures=result.top_measures, evidence_requests=result.evidence_requests, top_measures=result.top_measures, evidence_requests=result.evidence_requests,
unsupported_domains=result.unsupported_domains, completeness_summary=result.completeness_summary) unsupported_domains=result.unsupported_domains, completeness_summary=result.completeness_summary,
capability_labels=labels_for(surfaced))
@@ -21,6 +21,14 @@ from .observations import (
empirical_distribution, empirical_distribution,
reviewed, reviewed,
) )
from .observation_log import (
HypothesisStats,
ObservationRecord,
aggregate_by_hypothesis,
append_observation,
load_observations,
review_queue,
)
from .signals import ( from .signals import (
ProducedSignal, ProducedSignal,
SignalVocabularyEntry, SignalVocabularyEntry,
@@ -69,4 +77,10 @@ __all__ = [
"ProducedSignal", "ProducedSignal",
"SignalVocabularyEntry", "SignalVocabularyEntry",
"normalize_signals", "normalize_signals",
"ObservationRecord",
"HypothesisStats",
"append_observation",
"load_observations",
"aggregate_by_hypothesis",
"review_queue",
] ]
@@ -143,8 +143,8 @@ def advisor_start(
next_best_questions=next_q, capability_delta=delta, top_measures=measures, next_best_questions=next_q, capability_delta=delta, top_measures=measures,
evidence_requests=evidence, unsupported_domains=unsupported, evidence_requests=evidence, unsupported_domains=unsupported,
completeness_summary=rep.completeness_summary, completeness_summary=rep.completeness_summary,
headline="%d Anforderungen erkannt · %d automatisch erkannt (Intake) · %d wahrscheinlich (Zertifikate) · %d zu klären" headline="%d von %d Anforderungen offen · %d automatisch erkannt (Intake) · %d wahrscheinlich (Zertifikate) · %d zu klären"
% (len(assess.coverage), len(auto_detected), len(probably), len(next_q))) % (len(delta), len(assess.coverage), len(auto_detected), len(probably), len(next_q)))
def apply_answer(known_capabilities: Sequence[str], capability_id: str, answer: str) -> List[str]: def apply_answer(known_capabilities: Sequence[str], capability_id: str, answer: str) -> List[str]:
@@ -0,0 +1,108 @@
"""Observation Log — append-only JSONL store for empirical calibration events (Task 59b v1).
Observations are NOT business data and NOT product-DB data — they are CALIBRATION events for the
knowledge base ("ISO27001 -> SDL confirmed", "TISAX -> supplier security refuted"). So they live with the
other versioned knowledge artifacts (hypotheses, transition patterns, vocabulary), NOT in the product
database: an append-only JSONL log under `knowledge/observations/`. NO migration, NO DB. The empirical
DISTRIBUTION and CONFIDENCE are COMPUTED from this log on demand (computed-not-stored) — a hypothesis is
NEVER auto-updated; only REVIEWED observations calibrate (the review gate, enforced in observations.py).
Append-only: each line is one ObservationRecord and lines are NEVER modified in place. A later review is
a NEW line with the same observation_id and reviewed=true; load_observations() reconciles to the latest
per id. You can `rm` the log and recompute, `git diff` it over months, or rebuild confidence under a new
policy. Anonymisation is MANDATORY: customer_archetype is a sector/cert archetype, NEVER a real company
name (this file is committed to git). Time is stamped by the CALLER (no hidden clock) for determinism.
I/O only at the append/load boundary; statistics are pure. Python 3.9 compatible.
"""
from __future__ import annotations
import json
import os
from typing import Dict, List, Optional, Sequence
from pydantic import BaseModel, Field
from .observations import Observation, empirical_confidence, empirical_distribution
_DEFAULT_LOG = os.path.join(
os.path.dirname(__file__), "..", "..", "knowledge", "observations", "observations.jsonl")
class ObservationRecord(Observation):
"""A persisted observation line: an Observation (with its review gate + observation_type) plus log
metadata. `observation_id` is stable — a review re-appends the SAME id with reviewed=true."""
observation_id: str # stable id; a review re-appends the same id
timestamp: str = "" # ISO 8601, stamped by the CALLER (no hidden clock)
customer_archetype: str = "" # sector/cert archetype — NEVER a real company name
evidence: str = "" # what backs the answer (reference, not the artifact)
provenance: str = "" # where the answer came from (audit trail)
knowledge_version: str = "" # hypotheses/vocabulary version observed under
class HypothesisStats(BaseModel):
"""Per-hypothesis empirical rollup — all COMPUTED from the log, nothing stored on the hypothesis."""
hypothesis_id: str
distribution: Dict[str, int] = Field(default_factory=dict) # reviewed counts per observation_type
confidence: Optional[float] = None # None until a for/against obs is reviewed
reviewed_count: int = 0
total_count: int = 0
def append_observation(record: ObservationRecord, path: str = _DEFAULT_LOG) -> None:
"""Append ONE record as a JSON line. Append-only — existing lines are never rewritten."""
os.makedirs(os.path.dirname(path), exist_ok=True)
line = json.dumps(record.model_dump(mode="json"), ensure_ascii=False, sort_keys=True)
with open(path, "a", encoding="utf-8") as fh:
fh.write(line + "\n")
def load_observations(path: str = _DEFAULT_LOG, reconcile: bool = True) -> List[ObservationRecord]:
"""Read all records — a single `.jsonl` file or a directory of monthly `.jsonl` files. With
reconcile, the LATEST record per observation_id wins (a later reviewed=true supersedes the original).
Returns deterministic order (by observation_id when reconciled, else append order)."""
files: List[str] = []
if os.path.isdir(path):
files = sorted(os.path.join(path, f) for f in os.listdir(path) if f.endswith(".jsonl"))
elif os.path.exists(path):
files = [path]
records: List[ObservationRecord] = []
for fpath in files:
with open(fpath, encoding="utf-8") as fh:
for raw in fh:
raw = raw.strip()
if raw:
records.append(ObservationRecord(**json.loads(raw)))
if not reconcile:
return records
latest: Dict[str, ObservationRecord] = {}
for r in records: # file/append order -> later lines win
latest[r.observation_id] = r
return [latest[k] for k in sorted(latest)]
def aggregate_by_hypothesis(records: Sequence[ObservationRecord]) -> List[HypothesisStats]:
"""Per-hypothesis distribution + confidence. The review gate applies inside empirical_distribution/
empirical_confidence (reviewed-only), so unreviewed observations are counted in total but never
calibrate. Deterministic order (by hypothesis id)."""
by_hyp: Dict[str, List[ObservationRecord]] = {}
for r in records:
by_hyp.setdefault(r.hypothesis_id, []).append(r)
out: List[HypothesisStats] = []
for hyp in sorted(by_hyp):
obs = by_hyp[hyp]
out.append(HypothesisStats(
hypothesis_id=hyp,
distribution=empirical_distribution(obs), # reviewed-only (the gate)
confidence=empirical_confidence(obs), # None until reviewed for/against
reviewed_count=sum(1 for o in obs if o.reviewed),
total_count=len(obs)))
return out
def review_queue(records: Sequence[ObservationRecord]) -> List[ObservationRecord]:
"""The reviewer's worklist: observations not yet reviewed. Calibration ignores these until a reviewer
accepts them (Observation -> Review -> Accepted -> Knowledge recomputed), never Observation -> conf++."""
return [r for r in records if not r.reviewed]
@@ -9,7 +9,7 @@ It adds NO new reasoning logic — it only exposes what exists. No DB, no persis
from __future__ import annotations from __future__ import annotations
import os import os
from typing import Any, Dict, List, Sequence, Tuple from typing import Any, Dict, Iterable, List, Sequence, Tuple
import yaml import yaml
@@ -37,6 +37,13 @@ def _load(*parts: str) -> Any:
_HYP_LIB = [CapabilityHypothesis(**h) for h in _load("certification_hypotheses", "hypotheses.yaml")["hypotheses"]] _HYP_LIB = [CapabilityHypothesis(**h) for h in _load("certification_hypotheses", "hypotheses.yaml")["hypotheses"]]
_VOCAB = [SignalVocabularyEntry(**v) for v in _load("onboarding", "signal_vocabulary.yaml")["signals"]] _VOCAB = [SignalVocabularyEntry(**v) for v in _load("onboarding", "signal_vocabulary.yaml")["signals"]]
_SIGNAL_MAP = [SignalMapping(**m) for m in _load("onboarding", "intake_signal_map.yaml")["mappings"]] _SIGNAL_MAP = [SignalMapping(**m) for m in _load("onboarding", "intake_signal_map.yaml")["mappings"]]
_LABELS: Dict[str, str] = _load("onboarding", "capability_labels.yaml")["labels"]
def labels_for(capability_ids: Iterable[str]) -> Dict[str, str]:
"""Human labels (DE) for the given capability ids — presentation only. Ids without a curated label
are omitted (the frontend falls back to a prettified id). Deduped, deterministic."""
return {c: _LABELS[c] for c in dict.fromkeys(capability_ids) if c in _LABELS}
# target id -> transition pattern that defines its required capabilities (curated registry) # target id -> transition pattern that defines its required capabilities (curated registry)
_TARGET_PATTERNS = { _TARGET_PATTERNS = {
@@ -53,9 +60,10 @@ def supported_targets() -> List[str]:
def _target(target_id: str) -> Tuple[List[TargetRequirement], Dict[str, List[str]]]: def _target(target_id: str) -> Tuple[List[TargetRequirement], Dict[str, List[str]]]:
pat = _load("transition_patterns", _TARGET_PATTERNS[target_id]) pat = _load("transition_patterns", _TARGET_PATTERNS[target_id])
reqs = [TargetRequirement(capability_id=a["capability"]) for a in pat["likely_covered"]] reqs = [TargetRequirement(capability_id=a["capability"], rationale=a.get("reviewable_claim", "")) for a in pat["likely_covered"]]
reqs += [TargetRequirement(capability_id=d["capability"], question_intent=d.get("needed_information", "verify_existence"), reqs += [TargetRequirement(capability_id=d["capability"], question_intent=d.get("needed_information", "verify_existence"),
expected_evidence=d.get("expected_evidence", [])) for d in pat["delta_requirements"]] rationale=d.get("why_asked", ""), expected_evidence=d.get("expected_evidence", []))
for d in pat["delta_requirements"]]
covers = {d["capability"]: d.get("covers_targets", []) for d in pat["delta_requirements"]} covers = {d["capability"]: d.get("covers_targets", []) for d in pat["delta_requirements"]}
return reqs, covers return reqs, covers
@@ -104,7 +104,8 @@ def assess_transition(
) )
buckets[status].append(req.capability_id) buckets[status].append(req.capability_id)
if status in _REQUESTABLE: if status in _REQUESTABLE:
reason, prio = _REQUESTABLE[status] default_reason, prio = _REQUESTABLE[status]
reason = req.rationale or default_reason # curated human text wins over the generic fallback
requests.append( requests.append(
TransitionQuestionRequest( TransitionQuestionRequest(
capability_id=req.capability_id, capability_id=req.capability_id,
@@ -70,6 +70,7 @@ class TargetRequirement(BaseModel):
capability_id: str # MCAP-... capability_id: str # MCAP-...
question_intent: str = "verify_existence" # passed through to the request, not rendered question_intent: str = "verify_existence" # passed through to the request, not rendered
rationale: str = "" # curated human text (e.g. why_asked / reviewable_claim) — surfaced as the request reason
expected_evidence: List[str] = Field(default_factory=list) expected_evidence: List[str] = Field(default_factory=list)
source_control_id: Optional[str] = None source_control_id: Optional[str] = None
supports_obligations: List[str] = Field(default_factory=list) supports_obligations: List[str] = Field(default_factory=list)
@@ -0,0 +1,2 @@
# Append-only observation log (Task 59b). Real lines (observations.jsonl / YYYY-MM.jsonl) are written at
# runtime via compliance/onboarding/observation_log.py. Anonymised archetypes only — NEVER real company names.
@@ -0,0 +1,45 @@
# Human-readable capability labels (DE) — presentation only, reusable across all targets.
# A capability id is the stable machine identity; this maps it to an expert-facing label for the UI.
# Curated knowledge (draft — to be corrected by the domain expert). Missing ids fall back to a
# prettified id in the frontend. NO real company names. Keep labels short + concrete.
labels:
# ── ISMS / ISO 27001 core ───────────────────────────────────────────────
information_security_management: "Informationssicherheits-Managementsystem (ISMS)"
access_control_and_authentication: "Zugriffskontrolle & Authentifizierung"
asset_and_configuration_management: "Asset- & Konfigurationsverwaltung"
cryptography: "Kryptographie / Verschlüsselung"
incident_management: "Security-Incident-Management"
security_awareness_training: "Security-Awareness-Schulungen"
supplier_security: "Lieferanten-Sicherheit"
security_logging_and_monitoring: "Security-Logging & Monitoring"
technical_vulnerability_management: "Technisches Schwachstellen-Management"
# ── TISAX / VDA-spezifisch ──────────────────────────────────────────────
prototype_protection: "Prototypenschutz (physisch & logisch)"
tisax_label_scope_selection: "TISAX-Label-/Scope-Festlegung"
tisax_assessment_via_enx: "TISAX-Assessment über die ENX-Plattform"
vda_isa_self_assessment: "VDA-ISA-Selbstauskunft"
data_protection_processing_on_behalf: "Auftragsverarbeitung (Art. 28 DSGVO)"
physical_security: "Physische Sicherheit / Zutrittskontrolle"
# ── QM / ISO 9001 ───────────────────────────────────────────────────────
document_and_change_control: "Dokumenten- & Änderungslenkung"
supplier_evaluation: "Lieferantenbewertung"
release_and_approval_process: "Freigabe- & Genehmigungsprozess"
ce_conformity_assessment_and_technical_documentation: "CE-Konformitätsbewertung & technische Dokumentation"
# ── CRA / Produkt-Cybersecurity ─────────────────────────────────────────
sbom_creation: "SBOM-Erstellung (Software-Stückliste)"
coordinated_vulnerability_disclosure: "Coordinated Vulnerability Disclosure (CVD)"
secure_development_lifecycle: "Sicherer Entwicklungslebenszyklus (SDLC)"
secure_signed_update_distribution: "Sichere, signierte Update-Verteilung"
security_update_support_period: "Sicherheits-Update-Supportzeitraum"
product_cyber_risk_assessment: "Produkt-Cyber-Risikobewertung"
exploited_vuln_and_incident_reporting: "Meldung ausgenutzter Schwachstellen & Vorfälle"
public_security_advisories: "Öffentliche Security Advisories"
cybersecurity_management_system: "Cybersecurity-Managementsystem (CSMS)"
# ── MaschinenVO / Safety ────────────────────────────────────────────────
machine_safety_risk_assessment: "Maschinen-Risikobeurteilung"
mechanical_safety_and_guards: "Mechanische Sicherheit & Schutzeinrichtungen"
operating_instructions_and_safety_information: "Betriebsanleitung & Sicherheitshinweise"
protection_against_corruption_of_safety_functions: "Schutz der Sicherheitsfunktionen vor Manipulation"
# ── Umwelt ──────────────────────────────────────────────────────────────
environmental_management_documentation: "Umweltmanagement-Dokumentation"
@@ -0,0 +1,73 @@
"""Observation Log — append-only JSONL store + computed statistics (Task 59b/c v1).
Pins the user's decision (2026-06-28): observations are CALIBRATION data, not product data -> an
append-only JSONL log under knowledge/observations/, NO DB, NO migration. Distribution and confidence are
COMPUTED from the log; only REVIEWED observations calibrate (review gate); a later review is a new line
that supersedes by observation_id. Nothing is ever written back to a hypothesis.
"""
from __future__ import annotations
from compliance.onboarding import (
ObservationRecord,
ObservationType,
aggregate_by_hypothesis,
append_observation,
load_observations,
review_queue,
)
def _rec(oid, hyp, otype, reviewed=False, **kw):
return ObservationRecord(
observation_id=oid, hypothesis_id=hyp, observation_type=otype, reviewed=reviewed,
timestamp="2026-07-01T00:00:00Z", customer_archetype="machine_builder+ISO27001", **kw)
def test_append_only_round_trip(tmp_path):
p = str(tmp_path / "obs.jsonl")
append_observation(_rec("o1", "HYP-secure_dev", ObservationType.CONFIRMED, reviewed=True), p)
append_observation(_rec("o2", "HYP-secure_dev", ObservationType.REFUTED, reviewed=True), p)
recs = load_observations(p)
assert {r.observation_id for r in recs} == {"o1", "o2"}
assert all(r.customer_archetype == "machine_builder+ISO27001" for r in recs) # anonymised archetype, not a name
def test_review_supersedes_by_id_append_only(tmp_path):
p = str(tmp_path / "obs.jsonl")
append_observation(_rec("o1", "HYP-x", ObservationType.CONFIRMED, reviewed=False), p) # raw answer
append_observation(_rec("o1", "HYP-x", ObservationType.CONFIRMED, reviewed=True,
reviewed_by="anna"), p) # later review event
assert len(load_observations(p, reconcile=False)) == 2 # both lines kept (append-only)
recs = load_observations(p) # reconciled
assert len(recs) == 1 and recs[0].reviewed and recs[0].reviewed_by == "anna"
def test_statistics_apply_the_review_gate(tmp_path):
p = str(tmp_path / "obs.jsonl")
append_observation(_rec("a", "HYP-sdl", ObservationType.CONFIRMED, reviewed=True), p)
append_observation(_rec("b", "HYP-sdl", ObservationType.CONFIRMED, reviewed=True), p)
append_observation(_rec("c", "HYP-sdl", ObservationType.REFUTED, reviewed=True), p)
append_observation(_rec("d", "HYP-sdl", ObservationType.CONFIRMED, reviewed=False), p) # unreviewed -> ignored
stats = {s.hypothesis_id: s for s in aggregate_by_hypothesis(load_observations(p))}
s = stats["HYP-sdl"]
assert s.total_count == 4 and s.reviewed_count == 3
assert s.distribution["confirmed"] == 2 and s.distribution["refuted"] == 1 # unreviewed one excluded
assert s.confidence == round(2 / 3, 2) # (2 + 0.5*0) / 3
def test_review_queue_lists_unreviewed(tmp_path):
p = str(tmp_path / "obs.jsonl")
append_observation(_rec("a", "HYP-y", ObservationType.CONFIRMED, reviewed=True), p)
append_observation(_rec("b", "HYP-y", ObservationType.PARTIAL, reviewed=False), p)
q = review_queue(load_observations(p))
assert [r.observation_id for r in q] == ["b"]
def test_load_directory_of_monthly_files(tmp_path):
d = tmp_path / "observations"
d.mkdir()
append_observation(_rec("a", "HYP-z", ObservationType.CONFIRMED, reviewed=True), str(d / "2026-06.jsonl"))
append_observation(_rec("b", "HYP-z", ObservationType.REFUTED, reviewed=True), str(d / "2026-07.jsonl"))
recs = load_observations(str(d))
assert {r.observation_id for r in recs} == {"a", "b"}
@@ -73,6 +73,17 @@ def test_partial_signal_surfaces_as_indication_and_is_still_asked():
assert "secure_development_lifecycle" in asked or "secure_development_lifecycle" in d["capability_delta"] assert "secure_development_lifecycle" in asked or "secure_development_lifecycle" in d["capability_delta"]
def test_questions_carry_curated_text_and_human_labels():
# the curated why_asked from the transition pattern must reach the question (not the generic
# fallback "Keine Anhaltspunkte ... klären"), and surfaced capabilities get human labels.
body = dict(_BODY, certifications=["ISO27001"], target="TISAX", scanner_findings=[])
r = _client.post("/onboarding/advisor-start", json=body)
assert r.status_code == 200, r.text
d = r.json()
assert any("Keine Anhaltspunkte" not in q["why"] for q in d["top_5_questions"]) # real expert text surfaced
assert d["capability_labels"].get("vda_isa_self_assessment") == "VDA-ISA-Selbstauskunft"
def test_unknown_target_is_404(): def test_unknown_target_is_404():
body = dict(_BODY, target="NOPE") body = dict(_BODY, target="NOPE")
r = _client.post("/onboarding/advisor-start", json=body) r = _client.post("/onboarding/advisor-start", json=body)