Extend advisor proof with procedure→evidence chain

Vollständige Begründungskette aus der Registry: Rechtsgrundlage → Obligation → Procedure
→ Controls → Evidence → Antwort. Join cra.json × cra_procedures.json, deterministisch, kein LLM.

SBOM-Beweis: 7 Pflichten je mit CRA-Rechtsgrundlage + Procedure (wie umgesetzt) + Controls
(Prüfung) + aggregierte Required Evidence; 4 Best-Practice (Guidance OWASP/NIST/ENISA);
Beziehung sbom_*→supports→vuln_identification; citation 7/7 pending_span_anchor.

Der Unterschied zu RAG sichtbar: RAG beantwortet — BreakPilot begründet UND operationalisiert.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-25 09:44:27 +02:00
parent 67dba5f641
commit e5cce9caff
+46 -33
View File
@@ -1,10 +1,11 @@
"""P3 — Compliance-Advisor-Proof: obligation-basierte Antwort aus der Registry (NICHT RAG-Text, """P3 — Compliance-Advisor-Proof: obligation-basierte Antwort als vollstaendige
KEIN LLM). Demonstriert den besseren Antworttyp: PFLICHT (LEGAL_MINIMUM + Rechtsgrundlage + BEGRUENDUNGSKETTE aus der Registry (NICHT RAG-Text, KEIN LLM):
Applicability) / BEST PRACTICE (guidance_basis) / NACHWEISE (evidence_facets + member controls) Rechtsgrundlage -> Obligation -> Procedure -> Controls -> Evidence -> Antwort.
+ Beziehungen. Deterministisch + zitierfähig — das ist der Produktnutzen der Registry. Deterministisch + zitierfaehig. Der Unterschied zu RAG: RAG beantwortet — BreakPilot
begruendet UND operationalisiert.
python3 scripts/obligation_discovery/advisor_proof.py --registry obligations/cra.json \ python3 scripts/obligation_discovery/advisor_proof.py --registry obligations/cra.json \
--topic sbom --has-digital-elements --procedures obligations/cra_procedures.json --topic sbom --has-digital-elements
""" """
from __future__ import annotations from __future__ import annotations
@@ -17,7 +18,7 @@ def applies(obl: dict, has_digital: bool) -> tuple[bool, str]:
if a == "universal": if a == "universal":
return True, "" return True, ""
if a.startswith("domain:products_with_digital_elements"): if a.startswith("domain:products_with_digital_elements"):
return (has_digital, "nur für Produkte mit digitalen Elementen (CRA Art. 3)") return has_digital, "nur fuer Produkte mit digitalen Elementen (CRA Art. 3)"
if a.startswith("domain:"): if a.startswith("domain:"):
return True, a.split(":", 1)[1] return True, a.split(":", 1)[1]
if a.startswith("conditional:"): if a.startswith("conditional:"):
@@ -28,49 +29,61 @@ def applies(obl: dict, has_digital: bool) -> tuple[bool, str]:
def main() -> None: def main() -> None:
ap = argparse.ArgumentParser() ap = argparse.ArgumentParser()
ap.add_argument("--registry", required=True) ap.add_argument("--registry", required=True)
ap.add_argument("--topic", default="sbom", help="Family/Keyword der Pflicht (z.B. sbom)") ap.add_argument("--procedures", required=True)
ap.add_argument("--topic", default="sbom")
ap.add_argument("--has-digital-elements", action="store_true") ap.add_argument("--has-digital-elements", action="store_true")
ap.add_argument("--question", default="Muss ich als Maschinenbauer eine SBOM bereitstellen?") ap.add_argument("--question", default="Muss ich als Maschinenbauer eine SBOM bereitstellen?")
a = ap.parse_args() a = ap.parse_args()
reg = json.load(open(a.registry, encoding="utf-8")) reg = json.load(open(a.registry, encoding="utf-8"))
procs = json.load(open(a.procedures, encoding="utf-8"))["procedures"]
obls = [o for o in reg["obligations"] obls = [o for o in reg["obligations"]
if a.topic in o.get("family", "") or a.topic in o["id"] or a.topic in o.get("name", "").lower()] if a.topic in o.get("family", "") or a.topic in o["id"]]
ids = {o["id"] for o in obls} ids = {o["id"] for o in obls}
pflicht = [o for o in obls if o["tier"] == "LEGAL_MINIMUM"] by_obl: dict[str, list] = {}
best = [o for o in obls if o["tier"] in ("BEST_PRACTICE", "IMPLEMENTATION_GUIDANCE")] for p in procs:
for oid in p.get("fulfills_obligations", []):
by_obl.setdefault(oid, []).append(p)
print(f"FRAGE: {a.question}") pflicht = [o for o in obls if o["tier"] == "LEGAL_MINIMUM" and applies(o, a.has_digital_elements)[0]]
print(f"KONTEXT: Hersteller; digitale Elemente = {a.has_digital_elements} → CRA-Geltungsbereich") best = [o for o in obls if o["tier"] != "LEGAL_MINIMUM"]
applicable_pflicht = [(o, applies(o, a.has_digital_elements)) for o in pflicht]
any_pflicht = any(ok for _, (ok, _) in applicable_pflicht)
n_pflicht = sum(1 for _, (ok, _) in applicable_pflicht if ok)
verdict = "JA" if any_pflicht and a.has_digital_elements else "NUR WENN CRA-anwendbar"
print(f"\nANTWORT: {verdict} — unter dem CRA bestehen {n_pflicht} gesetzliche "
f"Mindestpflichten zum Thema {a.topic}.\n")
print("── PFLICHT (LEGAL_MINIMUM, Wortlaut im CRA) ──") print(f"FRAGE: {a.question}")
for o, (ok, note) in applicable_pflicht: print(f"\nANTWORT: {'JA' if pflicht and a.has_digital_elements else 'NUR WENN CRA-anwendbar'}"
if not ok: f"sofern das Produkt unter den CRA faellt (product with digital elements, Art. 3).")
continue print("\n══ BEGRUENDUNGSKETTE (Recht → Obligation → Procedure → Controls → Evidence) ══")
req_evidence: list[str] = []
for o in pflicht:
lb = "; ".join(f"{b.get('source','')} {b.get('anchor','')}".strip() for b in o.get("legal_basis", [])) lb = "; ".join(f"{b.get('source','')} {b.get('anchor','')}".strip() for b in o.get("legal_basis", []))
print(f" {o['id']}{o.get('description','')[:84]}") print(f"\n● PFLICHT: {o['id']}{o.get('description','')[:80]}")
print(f" Rechtsgrundlage: {lb or ''}" + (f" | {note}" if note else "")) print(f" Rechtsgrundlage: {lb or ''}")
ps = by_obl.get(o["id"], [])
for p in ps:
print(f" Procedure (wie umgesetzt): {p['procedure_id']} — Schritte: {len(p.get('steps',[]))}")
print(f" Controls (Pruefung): {' · '.join(p.get('controls', []))[:96]}")
print(f" Nachweis: {' · '.join(p.get('evidence', []))}")
req_evidence += p.get("evidence", [])
if not ps:
print(" Procedure: (noch keine modelliert)")
print("\n── REQUIRED EVIDENCE (aggregiert, womit wird es nachgewiesen) ──")
print(" " + " · ".join(dict.fromkeys(req_evidence)) if req_evidence else "")
print("\n── BEST PRACTICE (anerkannte Umsetzung, KEINE CRA-Wortlautpflicht) ──") print("\n── BEST PRACTICE (anerkannte Umsetzung, KEINE CRA-Wortlautpflicht) ──")
for o in best: for o in best:
gb = "; ".join(b.get("source", "") for b in o.get("guidance_basis", [])) gb = "; ".join(b.get("source", "") for b in o.get("guidance_basis", []))
print(f"{o['id']}{o.get('description','')[:74]} | Guidance: {gb or ''}") print(f" {o['id']}{o.get('description','')[:64]} | Guidance: {gb or ''}")
print("\n── NACHWEISE (Evidence) ──") print("\n── BEZIEHUNG (warum es zaehlt) ──")
for o in pflicht + best:
f = o.get("evidence_facets", {})
facets = "/".join(k for k in ("governance", "capability", "evidence") if f.get(k)) or ""
print(f"{o['id']}: {o.get('member_count','?')} Controls · Facetten {facets}")
print("\n── BEZIEHUNGEN (warum es zählt) ──")
for r in reg.get("relationships", []): for r in reg.get("relationships", []):
if r.get("from") in ids and r.get("to") not in ids: if r.get("from") in ids and r.get("to") not in ids:
print(f"{r['from']} --{r['type']}--> {r['to']}: {r.get('note','')[:70]}") print(f" {r['from']} --{r['type']}--> {r['to']}: {r.get('note','')[:64]}")
pend = sum(1 for o in pflicht if o.get("citation_status") == "pending_span_anchor")
print(f"\n── CITATION ──\n {pend}/{len(pflicht)} Pflichten: pending_span_anchor "
f"(Textstellen-Anker folgen mit dem zitierfaehigen Re-Ingest)")
print("\n(RAG beantwortet — BreakPilot begruendet UND operationalisiert.)")
if __name__ == "__main__": if __name__ == "__main__":