Compare commits

...

34 Commits

Author SHA1 Message Date
Benjamin Admin 2301fb2122 feat(ucca): adopt obligation_id + harden join to semantic (step 3 core)
The Obligation Registry filled proposed_obligation_id (7/7) + cut the logging
family (obligations 47->66). Adopted obligation_id onto our 7 accepted CRA->OWASP
mappings; the join now prefers the EXACT obligation_id over the coarse
citation_unit (which stays as fallback for not-yet-adopted rows).

Effect: semantic coverage 2->4 (user_authentication_required,
credential_confidentiality_protection, auth_key_management,
event_logging_security_events). Befund 1 resolved: V11.2.1 crypto now sits under
credential_confidentiality_protection, not user_authentication_required.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 12:18:34 +02:00
Benjamin Admin 4aa6aa9812 Merge remote-tracking branch 'origin/main' into feat/advisor-status 2026-06-25 12:04:47 +02:00
Benjamin Admin a53d67a35a feat(bridge): logging/audit obligation cut (CRA Annex I (2)(k)) + 7/7 control mapping
- obligations/cra_logging.json: 19 obligations (6 LEGAL_MINIMUM auf (2)(k) korrekt
  verankert, 13 BEST_PRACTICE), 13 Beziehungen; out_of_scope M8/M5/M81 (AI-Act/FRT/PIN).
  Two-stage clustering (2601->1361 micro->100 review-units) -> Opus-Synthese -> Kuration.
- controls_for_obligation_mapping.json: V16.1.1/V16.3.3/V16.3.4 -> event_logging_security_events
  (Umbrella-LM; spezifische Alternativen via ASVS-Control-Text). Jetzt 7/7 gefuellt.
- obligation_join_keys.json: 47->66 obligation_ids (logging family).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 11:57:16 +02:00
Benjamin Admin 3259984d1c Fill semantic control->obligation_id (4/7; V16 pending logging cut)
V6.x->user_authentication_required, V11.2.1->credential_confidentiality_protection,
V11.7.1->auth_key_management; semantisch (NICHT CRA-Anker, die sind approximativ).
V16.x pending bis Logging-Cut. anchor_quality_note dokumentiert.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 11:44:56 +02:00
Benjamin Admin 5e3ed4071b Merge remote-tracking branch 'origin/main' into feat/obligation-aggregation 2026-06-25 11:41:00 +02:00
Benjamin Admin c090617afd Add logging scope to precluster (logging cut)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 11:40:59 +02:00
Benjamin Admin c5ecfa8f6c feat(bridge): export 7 accepted CRA->OWASP controls for obligation_id proposal
CI / detect-changes (push) Successful in 5s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 9s
CI / validate-canonical-controls (push) Successful in 5s
CI / loc-budget (push) Successful in 23s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
obligations/controls_for_obligation_mapping.json — the Compliance Execution
Graph's accepted controls (V6 auth / V11 crypto / V16 logging) handed to the
Obligation Registry to propose the SEMANTIC control->obligation_id, replacing
the coarse citation_unit interim join (Befund 1). Registry fills
proposed_obligation_id; we then adopt it into control_mapping.obligation_id.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 11:36:57 +02:00
Benjamin Admin 417bcda68c feat(ucca): Advisor obligation-status Durchstich (step 3 complete)
AssessObligationStatus traverses obligation_id -> (citation_unit) -> accepted
controls -> required evidence -> status (erfuellt|offen|unklar). Evidence
presence is a callback; MVP passes nil (nothing collected yet) -> offen.
citation_spans = "pending" until the Legal-Knowledge-Graph session attaches
them. This is the vertical slice that makes the graph a product feature:
"CRA obligation fulfilled because evidence X/Y/Z is present", not "a doc exists".

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 11:15:57 +02:00
Benjamin Admin 86d1473a6a feat(ucca): obligation-join loader + citation_unit bridge + coverage report
Consumes the cross-session contract obligations/obligation_join_keys.json (47
obligation_ids). Interim bridge = citation_unit (our source_norm <-> registry
citation_units), to be hardened to the stable obligation_id (field now optional
on ControlMapping).

ComputeObligationCoverage joins the 47 registry obligations to our accepted
control mappings: covered=2 (user_authentication_required, firmware_software_
authentication), mapped_rejected=3 ((2)(e) -> our OWASP mappings rejected,
route via NIST/BSI), uncovered=42. This coverage signal is the feedback to the
Obligation session for what to cut/refine next.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 11:10:53 +02:00
Benjamin Admin 9e0a9ccef4 Add obligation_id join-key contract (cross-session bridge)
CI / detect-changes (push) Successful in 5s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 8s
CI / validate-canonical-controls (push) Successful in 7s
CI / loc-budget (push) Successful in 19s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
Macht meine Seite des Cross-Session-Vertrags konkret: obligation_id ist der stabile Join-Key
zwischen Legal Knowledge Graph (citation_spans -> obligation_id) und Compliance Execution Graph
(control_mapping.source_norm -> obligation_id). Export aller 47 obligation_ids (CRA: 11 sbom +
7 vuln + 29 auth) mit citation_units als Interim-Brücke. Disziplin: obligation_id nie neu
vergeben (re-link, Pendant zu span_id/control_uuid).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 10:29:29 +02:00
Benjamin Admin 7e1c3668bf Merge remote-tracking branch 'origin/main' into feat/obligation-aggregation
CI / detect-changes (push) Successful in 8s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 5s
CI / validate-canonical-controls (push) Successful in 4s
CI / loc-budget (push) Successful in 14s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
2026-06-25 10:15:25 +02:00
Benjamin Admin ab3cb86b1c feat(ucca): Evidence-Requirement model (step A)
CI / detect-changes (push) Successful in 5s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 7s
CI / validate-canonical-controls (push) Successful in 5s
CI / loc-budget (push) Successful in 20s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Successful in 1m5s
CI / iace-gt-coverage (push) Successful in 17s
CI / test-python-backend (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
The last edge of the compliance graph: what concrete, fresh evidence proves a
framework control is met (config_export/test_report/sbom/audit_log/pentest/...
from github/ci/scanner/manual_upload, with a freshness requirement).

Seeded for all 7 accepted CRA->OWASP controls (Auth/Crypto/Logging). A graph
test enforces connectivity: every accepted control must carry >=1 required
evidence — no dangling node in Obligation -> Control -> Evidence.

This is what will let the Advisor state "the CRA requirement is fulfilled" from
present evidence, not from the mere existence of a document.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 10:06:09 +02:00
Benjamin Admin 0db0e9a129 feat(ucca): curate CRA Annex I -> OWASP mappings (review B)
7 accepted, 13 rejected (reviewed_by=benjamin, 2026-06-25). The accepted set is
the first audited ground truth of the compliance graph:
  (2c) Zugriff   -> V6.3.1, V6.1.1   (Auth)
  (2d) Crypto    -> V11.2.1, V11.7.1 (corrected from the retriever's wrong V14)
  (2k) Logging   -> V16.3.3, V16.3.4, V16.1.1

Rejected stay as audit trail. (2e) integrity, (2l) updates, (2i) attack surface
rejected with reason "OWASP ASVS not the right target standard, map via NIST/BSI"
— architectural proof for the multi-framework framework_* layer.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 10:01:06 +02:00
Benjamin Admin 53ea388ea0 refactor(ucca): control-mapping model per review feedback
- DROP confidence from the persisted mapping: a curated mapping is a
  professional statement, not an AI guess (retriever score -> rationale only).
- ADD mapping_status (candidate|accepted|rejected|superseded) — the review state.
- ADD audit trail (reviewed_by/review_date/review_reason); accepted/rejected
  fail-closed without it.
- EXTEND mapping_type: + implements, + contradicts.
- Advisor truth = mapping_status=accepted (acceptedOnly filter).
- migrate the 18 CRA->OWASP rows to mapping_status=candidate.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 09:50:37 +02:00
Benjamin Admin e5cce9caff Extend advisor proof with procedure→evidence chain
Vollständige Begründungskette aus der Registry: Rechtsgrundlage → Obligation → Procedure
→ Controls → Evidence → Antwort. Join cra.json × cra_procedures.json, deterministisch, kein LLM.

SBOM-Beweis: 7 Pflichten je mit CRA-Rechtsgrundlage + Procedure (wie umgesetzt) + Controls
(Prüfung) + aggregierte Required Evidence; 4 Best-Practice (Guidance OWASP/NIST/ENISA);
Beziehung sbom_*→supports→vuln_identification; citation 7/7 pending_span_anchor.

Der Unterschied zu RAG sichtbar: RAG beantwortet — BreakPilot begründet UND operationalisiert.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 09:44:27 +02:00
Benjamin Admin 2f3c98fbe0 feat(ucca): first CRA Annex I -> OWASP retriever candidates (step 3)
18 retriever_candidate mappings generated via the sdk-dev control-intent
retriever. All marked retriever_candidate (NOT curated truth) — the review
step turns the good ones into human_curated.

Empirical validation of the A-decision: the retriever proposes, but produces
wrong candidates (e.g. encryption -> V14 Config instead of V11 Crypto;
V14.2.4 over-appears) that only human review catches. Review notes inline.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 09:36:53 +02:00
Benjamin Admin d987e4fde6 feat(ucca): persisted Control-Mapping data model (Obligation -> framework control)
Versioned JSONL store + Go model for Regulation->Control mappings, per the
A-decision: the retriever only PROPOSES candidates; the curated mapping is the
audited truth the Advisor uses at runtime, never re-invented per query.

- ControlMapping struct (source_norm/source_role/target_framework/target_control/
  mapping_type/confidence/provenance/rationale/version)
- enum validation (rule layer), fail-closed loader, forward+reverse index,
  curated-only filter (IsCurated)
- seed: 2 retriever_candidate rows CRA Annex I -> OWASP ASVS (not yet curated)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 09:32:15 +02:00
Benjamin Admin 67dba5f641 Add CRA procedure model (SBOM + Vuln)
Schließt die Lücke Obligation→Procedure→Control→Evidence (Schritt 3, Compliance-OS-Ebene).
Procedure = Umsetzungs-/Nachweisebene EINER Obligation, KEINE neue Pflicht (LEGAL_MINIMUM
bleibt an der Obligation; Procedure beschreibt Umsetzung; Evidence belegt sie).

- 11 Procedures (5 SBOM + 6 Vuln), 2 Worked Examples; source_role=procedural_requirement
  (Konvergenz mit der Legal-Knowledge-Engine der anderen Session)
- fulfills_obligations[] referenziert die cra.json-Obligations (alle gültig, volle Abdeckung)
- steps/controls/evidence je Procedure; KEINE tier/legal_basis-Felder (kein Pflicht-Duplikat)
- citation_spans: [] / pending_span_anchor (Join folgt mit dem zitierfähigen Re-Ingest)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 09:28:40 +02:00
Benjamin Admin a3053c3c86 docs(architecture): RAG retrieval engine architecture set (01-09)
CI / detect-changes (push) Successful in 14s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 9s
CI / validate-canonical-controls (push) Successful in 19s
CI / loc-budget (push) Successful in 23s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
9 docs + index in docs-src/architecture/ documenting the deterministic
retrieval engine: retrieval pipeline, authority rerank, source_class,
source_role, control-intent + diversity, assessment, confidence,
explainability + supersede, framework_* layer. Each doc carries the exact
constants, the rationale behind them, code refs, and the failure class
it addresses. Audit/onboarding reference.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 09:25:22 +02:00
Benjamin Admin db2fd9d8e9 Add obligation advisor proof (P3)
Demonstriert den Produktnutzen der Registry: obligation-basierte Antwort statt RAG-Text.
Frage → Pflicht (LEGAL_MINIMUM + Rechtsgrundlage + Applicability) ⊥ Best Practice
(guidance_basis) ⊥ Nachweise (evidence_facets + member controls) + Beziehungen, deterministisch
aus obligations/cra.json (kein LLM, zitierfähig).

Beleg (SBOM, Maschinenbauer): JA — 7 CRA-Mindestpflichten + 4 Best-Practice (OWASP/NIST/ENISA);
sbom_* supports vuln_identification_inventory.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 09:06:34 +02:00
Benjamin Admin d21e1247c9 Merge remote-tracking branch 'origin/main' into feat/obligation-aggregation
CI / detect-changes (push) Successful in 5s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 4s
CI / validate-canonical-controls (push) Successful in 3s
CI / loc-budget (push) Successful in 17s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 25s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
2026-06-25 07:49:16 +02:00
Benjamin Admin e1b270c36e Add obligation discovery pipeline tooling
Sichert die validierte Obligation Discovery Pipeline aus /tmp als dauerhaftes,
committetes Tooling (scripts/obligation_discovery/) — der eigentliche Vermögenswert.

Stufen: precluster (Embedding-Cache + Mikro-Cluster) → meta_cluster (Review Units,
Skalierungs-Fix) → synthesize_obligations (Opus, Key aus ENV, Streaming, harte Tier-Regel,
Provenance) → validate_registry → merge_review_diff. Reine Helfer in _core.py, 16 Unit-Tests.

Doku docs-src/development/obligation_discovery_pipeline_v1.md mit Meilensteinen
(SBOM/Vuln reproduziert, Auth 4408→170 Review Units→54→kuriert 29) und der Architekturregel:
Runtime deterministisch, Discovery LLM-gestützt.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 07:41:45 +02:00
Benjamin Admin 48e39423e6 Add curated CRA authentication obligations (scaling test)
Erster großer Skalierungstest der Registry-Pipeline mit Zwei-Stufen-Clustering:
4408 Controls → 2134 Mikro → 170 Review Units → Opus-Synthese 54 → Kuration 29.

- Zwei-Stufen-Clustering (Mikro→Meta/Review-Unit) ist der Skalierungs-Fix für große Domänen
- harte Tier-Regel generalisiert: nur 6 LEGAL_MINIMUM (CRA fordert nur High-Level-Auth),
  23 BEST_PRACTICE; MFA/Passwort/Session/Krypto = guidance_basis, kein CRA-Primärrecht
- Kuration (key-frei, regelbasiert): Krypto-Mikro→guidance · Prüf/Nachweis→evidence-Facette ·
  Mechanismus-Familien behalten · eID/PSD2→out_of_scope; 6 LM unangetastet
- Provenance pro Obligation (source_meta_cluster/confidence/model/version)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 07:30:55 +02:00
Benjamin Admin 31222885b3 feat(ai-sdk): control-intent result diversity + standard-name classifier override
CI / detect-changes (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 5s
CI / validate-canonical-controls (push) Successful in 8s
CI / loc-budget (push) Successful in 19s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Successful in 58s
CI / iace-gt-coverage (push) Successful in 17s
CI / test-python-backend (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
On an implementation question impl_guidance (ENISA) keeps its earned semantic
Top-1, but the top-K now surfaces the best operational_requirement and
control_standard from the pool (ensureControlDiversity) — so different source
roles are visible instead of one role flooding the list, without forcing the
binding sources to Top-1.

A recognised standard NAME (NIST/OWASP/ISO 27001/CIS/CSA CCM/Grundschutz) now
overrides a mis-applied supervisory_guidance source_class in classifyAuthority,
so those standards classify and rank as technical_standard (control_standard
role). The corpus tags many standards as guidance (weight 70); the name wins.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 01:54:36 +02:00
Benjamin Admin 188bb787d2 Add proposed CRA obligation relationships
11 human-reasoned Beziehungskanten in cra.json gemerged (dedupliziert gegen die
Pipeline-Kanten), getaggt review_status=proposed / source=human_reasoned_preview /
confidence=high. Nur die kleine Sprache depends_on / supports / produces_evidence_for;
gerichtet. Cross-Family SBOM→Vuln-Kanten erlauben dem Advisor Ursachen-/Wirkungsketten.

Damit ist der CRA-v1-Baustein vollständig: Obligations · legal_basis · guidance_basis ·
out_of_scope · relationships · pending citation anchors.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 00:08:47 +02:00
Benjamin Admin d9d04deb00 feat(ai-sdk): close the 4 GT #3 recall gaps — backflow, cut, restart, spray-arm
CI / detect-changes (push) Successful in 6s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 5s
CI / validate-canonical-controls (push) Successful in 4s
CI / loc-budget (push) Successful in 17s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Successful in 58s
CI / iace-gt-coverage (push) Successful in 14s
CI / test-python-backend (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
Phase 1 complete. GT #3 recall 84% -> 100% (25/25 matched), no regression:
- HP2207 backflow / potable-water contamination (EN 1717) + measure M2209
  (Rueckflussverhinderer / Systemtrenner) — the only genuinely new hazard.
- HP2208 cut on sharp edges/screens (new sharp_edge tag from scharfe-Kante/Sieb).
- HP2209 unexpected restart during maintenance (dedicated dom_warewashing pattern;
  avoids flooding the log via the broad moving_part tag).
- Spray-arm contact now covered by the enclosure-re-scoped contact patterns.

Kistenhub 97.1% and Bremse pinned mappings unchanged; 0/28 hazards without a
measure. Completes the commercial-dishwasher (white-goods Phase 1) coverage.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 00:05:30 +02:00
Benjamin Admin 2645b5b043 Add draft CRA obligation registry
Erstes belastbares Registry-Artefakt (obligation_registry_v1) aus den validierten
SBOM+Vuln-Candidates der Obligation Discovery Pipeline.

- 18 Obligations (11 SBOM + 7 Vuln)
- 14 LEGAL_MINIMUM, alle mit legal_basis (harte Tier-Regel)
- 4 BEST_PRACTICE korrekt herabgestuft (source_role GUIDANCE/IMPLEMENTATION)
- 70 OUT_OF_SCOPE-Cluster getrennt; member_controls vollständig
- legal_basis (CRA-Primärrecht) ⊥ guidance_basis (BSI/ENISA/NIST/...)
- citation_status=pending_span_anchor (span_id folgt mit Asset 2), review_status=draft

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-24 23:52:20 +02:00
Benjamin Admin 6b7950f428 Freeze Obligation Registry v1 spec (citability + two-graph)
Schreibt das Zielmodell fest: Legal Obligation = gemeinsame Sprache zwischen
Legal Knowledge Graph (Chat) und Compliance Execution Graph (Engine).

- Registry-Schema v1 (id/tier/legal_basis/guidance_basis/facets/citation_anchor_ids/
  relationships/decision_method)
- Zitierfähigkeit hängt an der OBLIGATION, nicht an Controls (Regulierungsänderung =
  Anchor tauschen, Controls unverändert)
- legal_basis (Primärrecht) vs guidance_basis (NIST/OWASP/...) + source_role
  (LEGAL_BASIS/GUIDANCE/EVIDENCE/IMPLEMENTATION/OUT_OF_SCOPE)
- HARTE Regel: LEGAL_MINIMUM nur mit Primärrechts-Anker
- Beziehungsgraph: requires/implements/supports/produces_evidence_for/depends_on/derived_from
- Citation-Anchor-Pipeline Document→Obligation (KEIN Re-Ingest zum Control-Neubau)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-24 23:33:29 +02:00
Benjamin Admin ba6f1bd1f6 Document obligation aggregation validation results
Hält den bewiesenen Shadow-Stand fest: vier Schichten (Obligation Aggregation,
Applicability, Recall-limited Segregation, Targeted LLM Fix) + Zahlen.

- 7-Firmen-Shadow: 136 legacy control-findings → 29 obligation findings = 4,7×
  (23 echte Lücken, 6 recall_limited in nur 2/7 Firmen, 46 MET, 2 N/A)
- LLM-Fix validiert: teamviewer 5→0, safetykon 7→4 (echte Portability-Lücke bleibt,
  legitimate_interest→NA); recall_limited 3→0 bei beiden
- Modell: Haiku 4.5 (fest verdrahteter Sufficiency-Judge), NICHT OVH-Kaskade/Opus
  → Deploy-Gate ist ein gültiger Anthropic-Key auf dev, nicht der OVH-Pfad

Kein Deploy, kein Live-Schalten.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-24 21:39:28 +02:00
Benjamin Admin c1ea9458a7 Add met_count and recall_limited_obligations to shadow telemetry
Reichert die Obligation-Shadow-Telemetrie um zwei Felder an für die Cross-Firmen-
Auswertung: met_count (abgedeckte Obligations) + recall_limited_obligations (welche
Obligations recall-limitiert sind) — erlaubt die Konzentrations-Analyse über Firmen.

7-Firmen-Shadow: 136 Control-Findings → 29 Obligation-Findings (4,7×); recall_limited
nur 6/29, konzentriert auf third_country/safeguards in 2/7 Firmen → LLM-Fix bounded.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-24 20:15:45 +02:00
Benjamin Admin 0631a98bdd Mark recall-limited obligations in DSE shadow telemetry
Trennt im Shadow drei Kategorien statt eines pauschalen FAILED:
  - echte Lücke (failed_by_current_checker)
  - redundanter Control-FP (kollabiert per OR zu MET)
  - Prüfer-Reichweitenproblem (recall_limited)

obligation_taxonomy.py: decision_method_required=LLM für recipients_disclosed,
third_country_transfer_disclosed, safeguards_disclosed, safeguards_accessible
(versioniertes Registry-Artefakt bis DB-Tabelle, v1-Spec). Empirisch: TeamViewer
0/22 kw+emb trotz erfüllter Pflicht (cos 0.49-0.57) → CONTENT/LLM-Klasse, kein Schwellen-Fix.

compute_obligation_shadow segregiert FAILED/PARTIAL über requires_llm(): teamviewer
5 Findings → 2 echte + 3 recall_limited. 9 neue Unit-Tests (41 gesamt grün).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-24 13:46:21 +02:00
Benjamin Admin c3542f7dfe feat(dse): obligation shadow telemetry
Verdrahtet die Obligation Aggregation Engine als Layer 4 (SHADOW) in v3_engine:
erzeugt aus den results zusätzlich Obligation-Ergebnisse AUSSCHLIESSLICH für die
Telemetrie. Greift NICHT in results ein — nutzer-sichtbare Findings unverändert.

- _obligation_shadow.py: fetch_obligation_markers (legal_obligations + applicability)
  + compute_obligation_shadow (pure): legacy_control_findings, obligation_shadow_results,
  collapse_factor, na_count, met_failed_delta, top_collapsed_obligations
- met-Signal = Legacy-passed (kein zusätzlicher Prüfer-Call/Key)

E2E (3 Firmen, echte Engine): 57 Control-Findings → 14 Obligation-Findings (4,1×);
Redundanz kollabiert wo Evidenz existiert, echte Lücken bleiben FAILED. 6 Unit-Tests grün.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-24 12:59:52 +02:00
Benjamin Admin 7ec29999a2 feat(obligation): obligation applicability predicates
Minimaler Applicability-Hook für die Obligation Aggregation Engine: entscheidet
aus dem Dokumenttext, ob eine bedingte Obligation anwendbar ist (True/False/None).

- has_third_country_transfer · uses_legitimate_interest · direct_marketing
  (+ Alias legitimate_interest_or_public_task)
- unbekanntes Prädikat → None → Aufrufer behält Default=anwendbar (fail-safe, nie stille NA)
- profiling/employment/telecom/health/data_act folgen als nächste Charge

Re-Benchmark (Opus-GT, 3 Firmen): Prädikate erkennen Transfer/berecht.Interesse/
Direktwerbung korrekt → keine falsche NA; NA-Flip-Probe bestätigt FEHLT→NA ohne Transfer.
14 Unit-Tests grün.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-24 12:43:42 +02:00
Benjamin Admin 402a42d30d feat(obligation): obligation-level aggregation engine
Erste Ausführung des Legal Obligation Layer v1: aggregiert Bewertungen auf
Kriterium-/Control-Ebene zu Findings auf Obligation-Ebene
(Regulation → Legal Obligation → Control → Criterion).

- regulierungs-agnostisch (obligation_id/tier/met/legal_basis/conditional)
- fail-safe: LM applicable=false→NA · keine erfüllt→FAILED · alle→MET · Teil→PARTIAL;
  BP/OPT covered→MET sonst OPEN (nie FAILED); LM unbewertbar→UNDETERMINED (Legacy behalten)
- Redundanz-Kollaps per OR pro legal_basis-Anforderung → kein künstliches PARTIAL
- Applicability als Hook (Prädikat-Engine folgt separat)

Shadow-Benchmark (Opus-GT, 3 Firmen): 38 Control-Findings → 13 Obligation-Findings
(2,9×); ~23 redundante Falsch-Positive strukturell korrigiert, echte Lücken erhalten,
PARTIAL=0. 16/16 Unit-Tests grün.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-24 12:28:03 +02:00
56 changed files with 20864 additions and 1 deletions
@@ -0,0 +1,24 @@
// Control-Mapping: CRA Annex I -> OWASP ASVS 5.0. Eine Zeile = ein Mapping (Schema: ControlMapping).
// Reviewt 2026-06-25 (benjamin): 7 accepted, 13 rejected. accepted = Audit-Wahrheit (Advisor nutzt acceptedOnly).
// rejected bleiben als Audit-Spur ("warum verworfen"). KEIN confidence — kuratiert = fachliche Feststellung.
// Architekturbeweis: CRA -> OWASP fuer AppSec/Auth/Crypto/Logging; Ops/Update/Attack-Surface/Integritaet -> NIST/BSI.
{"source_norm": "CRA Annex I Part I (2)(c) — Schutz vor unbefugtem Zugriff", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V6.3.1", "mapping_type": "supports", "mapping_status": "accepted", "provenance": "human_curated", "rationale": "V6 = Authentication.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "V6 = Authentication, sauberer Treffer fuer Zugriffsschutz/Authentisierung.", "version": "2026-06-25", "obligation_id": "user_authentication_required"}
{"source_norm": "CRA Annex I Part I (2)(c) — Schutz vor unbefugtem Zugriff", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V6.1.1", "mapping_type": "supports", "mapping_status": "accepted", "provenance": "human_curated", "rationale": "V6 = Authentication.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "V6 = Authentication, sauberer Treffer fuer Zugriffsschutz/Authentisierung.", "version": "2026-06-25", "obligation_id": "user_authentication_required"}
{"source_norm": "CRA Annex I Part I (2)(d) — Vertraulichkeit / Verschluesselung", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V11.2.1", "mapping_type": "supports", "mapping_status": "accepted", "provenance": "human_curated", "rationale": "V11 = Cryptography.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "Korrektur von V14: V11 = Cryptography, richtiger Bereich fuer Verschluesselung.", "version": "2026-06-25", "obligation_id": "credential_confidentiality_protection"}
{"source_norm": "CRA Annex I Part I (2)(d) — Vertraulichkeit / Verschluesselung", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V11.7.1", "mapping_type": "supports", "mapping_status": "accepted", "provenance": "human_curated", "rationale": "V11.7 = Key Management.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "Korrektur von V14: V11.7 = Key Management fuer Verschluesselung/Schluesselverwaltung.", "version": "2026-06-25", "obligation_id": "auth_key_management"}
{"source_norm": "CRA Annex I Part I (2)(k) — Sicherheitsrelevante Ereignisse / Logging", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V16.3.3", "mapping_type": "supports", "mapping_status": "accepted", "provenance": "human_curated", "rationale": "V16 = Security Logging.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "V16 = Logging, sauberer Treffer fuer sicherheitsrelevante Ereignisse.", "version": "2026-06-25", "obligation_id": "event_logging_security_events"}
{"source_norm": "CRA Annex I Part I (2)(k) — Sicherheitsrelevante Ereignisse / Logging", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V16.3.4", "mapping_type": "supports", "mapping_status": "accepted", "provenance": "human_curated", "rationale": "V16 = Security Logging.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "V16 = Logging, sauberer Treffer fuer sicherheitsrelevante Ereignisse.", "version": "2026-06-25", "obligation_id": "event_logging_security_events"}
{"source_norm": "CRA Annex I Part I (2)(k) — Sicherheitsrelevante Ereignisse / Logging", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V16.1.1", "mapping_type": "supports", "mapping_status": "accepted", "provenance": "human_curated", "rationale": "V16 = Security Logging.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "V16 = Logging, sauberer Treffer fuer sicherheitsrelevante Ereignisse.", "version": "2026-06-25", "obligation_id": "event_logging_security_events"}
{"source_norm": "CRA Annex I Part I (2)(c) — Schutz vor unbefugtem Zugriff", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V14.2.4", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "V14 = Config, kein Auth — verworfen.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(d) — Vertraulichkeit / Verschluesselung", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V14.2.4", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "V14 = Config, Crypto gehoert zu V11 — verworfen.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(d) — Vertraulichkeit / Verschluesselung", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V14.3.2", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "V14 = Config, Crypto gehoert zu V11 — verworfen.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(d) — Vertraulichkeit / Verschluesselung", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V14.2.3", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "V14 = Config, Crypto gehoert zu V11 — verworfen.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(e) — Integritaet", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V14.2.4", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "OWASP ASVS ist hier nicht der passende Zielstandard. Mapping ueber NIST/BSI erforderlich.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(e) — Integritaet", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V1.2.4", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "OWASP ASVS ist hier nicht der passende Zielstandard. Mapping ueber NIST/BSI erforderlich.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(e) — Integritaet", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V6.1.1", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "OWASP ASVS ist hier nicht der passende Zielstandard. Mapping ueber NIST/BSI erforderlich.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(l) — Sichere Updates", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V14.2.4", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "OWASP ASVS ist hier nicht der passende Zielstandard. Mapping ueber NIST/BSI erforderlich.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(l) — Sichere Updates", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V2.4.1", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "OWASP ASVS ist hier nicht der passende Zielstandard. Mapping ueber NIST/BSI erforderlich.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(l) — Sichere Updates", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V6.1.1", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "OWASP ASVS ist hier nicht der passende Zielstandard. Mapping ueber NIST/BSI erforderlich.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(i) — Angriffsflaeche minimieren", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V6.1.1", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "OWASP ASVS ist hier nicht der passende Zielstandard. Mapping ueber NIST/BSI erforderlich.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(i) — Angriffsflaeche minimieren", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V15.3.3", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "OWASP ASVS ist hier nicht der passende Zielstandard. Mapping ueber NIST/BSI erforderlich.", "version": "2026-06-25"}
{"source_norm": "CRA Annex I Part I (2)(i) — Angriffsflaeche minimieren", "source_role": "operational_requirement", "target_framework": "OWASP ASVS", "target_control": "V8.2.4", "mapping_type": "related", "mapping_status": "rejected", "provenance": "human_curated", "rationale": "Retriever-Kandidat.", "reviewed_by": "benjamin", "review_date": "2026-06-25", "review_reason": "OWASP ASVS ist hier nicht der passende Zielstandard. Mapping ueber NIST/BSI erforderlich.", "version": "2026-06-25"}
@@ -0,0 +1,16 @@
// Evidence-Requirements je OWASP-ASVS-Control (Schema: EvidenceRequirement). Eine Zeile = eine geforderte Evidenz.
// Autoriert/kuratiert (nicht Retriever). Der Advisor kann eine CRA-Anforderung erst dann als erfuellt melden,
// wenn die required Evidenzen der gemappten, accepted Controls vorliegen + frisch genug sind.
// Stand 2026-06-25, Basis: die 7 accepted CRA->OWASP-Mappings (Auth V6, Crypto V11, Logging V16).
{"framework": "OWASP ASVS", "control": "V6.3.1", "evidence_type": "config_export", "evidence_source": "github", "freshness_requirement": "per_release", "required": true, "rationale": "IAM-/Zugriffskonfiguration als Nachweis der Authentisierungs-Anforderung.", "version": "2026-06-25"}
{"framework": "OWASP ASVS", "control": "V6.3.1", "evidence_type": "test_report", "evidence_source": "ci", "freshness_requirement": "per_release", "required": true, "rationale": "Automatisierter Zugriffstest (CI) belegt funktionierende Zugriffskontrolle.", "version": "2026-06-25"}
{"framework": "OWASP ASVS", "control": "V6.3.1", "evidence_type": "pentest", "evidence_source": "manual_upload", "freshness_requirement": "annually", "required": false, "rationale": "Jaehrlicher PenTest der Authentisierung — vertieft, aber nicht Pflicht je Release.", "version": "2026-06-25"}
{"framework": "OWASP ASVS", "control": "V6.1.1", "evidence_type": "config_export", "evidence_source": "github", "freshness_requirement": "per_release", "required": true, "rationale": "Rollenmodell/Auth-Architektur als Nachweis.", "version": "2026-06-25"}
{"framework": "OWASP ASVS", "control": "V11.2.1", "evidence_type": "config_export", "evidence_source": "github", "freshness_requirement": "per_release", "required": true, "rationale": "Krypto-Konfiguration (zugelassene Algorithmen) als Nachweis der Verschluesselung.", "version": "2026-06-25"}
{"framework": "OWASP ASVS", "control": "V11.2.1", "evidence_type": "sbom", "evidence_source": "ci", "freshness_requirement": "per_release", "required": true, "rationale": "SBOM weist die eingesetzten Krypto-Bibliotheken/-Versionen nach.", "version": "2026-06-25"}
{"framework": "OWASP ASVS", "control": "V11.7.1", "evidence_type": "policy", "evidence_source": "manual_upload", "freshness_requirement": "annually", "required": true, "rationale": "Key-Management-Policy (Rotation, Aufbewahrung) als organisatorischer Nachweis.", "version": "2026-06-25"}
{"framework": "OWASP ASVS", "control": "V11.7.1", "evidence_type": "config_export", "evidence_source": "github", "freshness_requirement": "per_release", "required": true, "rationale": "Konfiguration der Schluesselverwaltung als technischer Nachweis.", "version": "2026-06-25"}
{"framework": "OWASP ASVS", "control": "V16.3.3", "evidence_type": "audit_log", "evidence_source": "ci", "freshness_requirement": "continuous", "required": true, "rationale": "Security-Audit-Logs belegen, dass sicherheitsrelevante Ereignisse protokolliert werden.", "version": "2026-06-25"}
{"framework": "OWASP ASVS", "control": "V16.3.3", "evidence_type": "config_export", "evidence_source": "github", "freshness_requirement": "per_release", "required": true, "rationale": "Logging-Konfiguration als Nachweis der erfassten Ereignisarten.", "version": "2026-06-25"}
{"framework": "OWASP ASVS", "control": "V16.3.4", "evidence_type": "audit_log", "evidence_source": "ci", "freshness_requirement": "continuous", "required": true, "rationale": "Security-Audit-Logs.", "version": "2026-06-25"}
{"framework": "OWASP ASVS", "control": "V16.1.1", "evidence_type": "config_export", "evidence_source": "github", "freshness_requirement": "per_release", "required": true, "rationale": "Logging-Architektur-Konfiguration als Nachweis.", "version": "2026-06-25"}
@@ -128,5 +128,51 @@ func GetWarewashingPatterns() []HazardPattern {
ISO12100Section: "6.3.5.6", ISO12100Section: "6.3.5.6",
DefaultSeverity: 2, DefaultExposure: 3, DefaultSeverity: 2, DefaultExposure: 3,
}, },
{
ID: "HP2207", NameDE: "Rueckfluss / Kontamination des Trinkwassers", NameEN: "Backflow / potable-water contamination",
RequiredComponentTags: []string{"dom_warewashing", "backflow_risk"},
GeneratedHazardCats: []string{"material_environmental"},
SuggestedMeasureIDs: []string{"M2209"},
Priority: 84,
ApplicableLifecycles: []string{"normal_operation"},
ScenarioDE: "Verschmutztes Spuel- oder Chemiewasser wird ueber den Frischwasseranschluss in das Trinkwassernetz zurueckgesaugt und kontaminiert es (Ruecksaugen bei Unterdruck im Netz).",
TriggerDE: "Fehlender oder defekter Rueckflussverhinderer/Systemtrenner; Unterdruck im Trinkwassernetz; kein freier Auslauf.",
HarmDE: "Gesundheitsgefaehrdung Dritter durch kontaminiertes Trinkwasser (Chemie, Keime).",
AffectedDE: "Verbraucher am selben Trinkwassernetz, Betreiber",
ZoneDE: "Frischwasseranschluss, Wasserzulauf",
ISO12100Section: "6.2.4",
DefaultSeverity: 3, DefaultExposure: 2,
},
{
ID: "HP2208", NameDE: "Schnittverletzung an scharfen Kanten/Sieben", NameEN: "Cut injury on sharp edges/screens",
RequiredComponentTags: []string{"dom_warewashing", "sharp_edge"},
GeneratedHazardCats: []string{"mechanical_hazard"},
SuggestedMeasureIDs: []string{"M003"},
Priority: 74,
ApplicableLifecycles: []string{"cleaning", "maintenance"},
ScenarioDE: "Schneiden an scharfen Blechkanten, Sieben oder dem Ablaufpumpen-Laufrad beim Reinigen oder Eingreifen in die Spuelkammer.",
TriggerDE: "Entnehmen/Reinigen der Siebe; Eingreifen an scharfen Kanten ohne Schutzhandschuhe.",
HarmDE: "Schnittwunden an Haenden und Fingern.",
AffectedDE: "Reinigungspersonal, Bedienpersonal",
ZoneDE: "Zugaengliche Kanten, Siebe, Spuelkammer, Ablaufpumpe",
ISO12100Section: "6.2.2.1",
DefaultSeverity: 1, DefaultExposure: 3,
},
{
ID: "HP2209", NameDE: "Unerwarteter Wiederanlauf bei Wartung/Reinigung", NameEN: "Unexpected restart during maintenance/cleaning",
RequiredComponentTags: []string{"dom_warewashing", "programmable"},
RequiredLifecycles: []string{"maintenance", "cleaning", "fault_clearing"},
GeneratedHazardCats: []string{"safety_function_failure"},
SuggestedMeasureIDs: []string{"M042"},
Priority: 80,
ApplicableLifecycles: []string{"maintenance", "cleaning"},
ScenarioDE: "Waehrend Wartung oder Reinigung laeuft die Maschine durch fehlende Freischaltung (LOTO) oder automatischen Wiederanlauf unerwartet an (Pumpe, Spuelgang).",
TriggerDE: "Kein Freischalten/Sichern gegen Wiedereinschalten; automatischer Wiederanlauf nach Netzunterbrechung.",
HarmDE: "Verbruehung, Quetschen oder elektrischer Schlag durch unerwartet anlaufende Maschine.",
AffectedDE: "Wartungspersonal, Reinigungspersonal",
ZoneDE: "Gesamte Maschine, Pumpe, Antriebe",
ISO12100Section: "6.2.11.4",
DefaultSeverity: 3, DefaultExposure: 2,
},
} }
} }
@@ -107,6 +107,9 @@ func GetKeywordDictionary() []KeywordEntry {
// honest (generic hygiene; surfaced by the warewashing GT). // honest (generic hygiene; surfaced by the warewashing GT).
{Keywords: []string{"spuelarm", "spuelfeld", "wascharm", "spruehfeld"}, ExtraTags: []string{"rotating_part"}}, {Keywords: []string{"spuelarm", "spuelfeld", "wascharm", "spruehfeld"}, ExtraTags: []string{"rotating_part"}},
{Keywords: []string{"spuelkammer", "spueltuer", "geraetetuer", "haubentuer", "klapptuer"}, ExtraTags: []string{"access_door"}}, {Keywords: []string{"spuelkammer", "spueltuer", "geraetetuer", "haubentuer", "klapptuer"}, ExtraTags: []string{"access_door"}},
// Frischwasseranschluss an das Trinkwassernetz -> Rueckfluss/Ruecksaug-Risiko (EN 1717).
{Keywords: []string{"rueckfluss", "rueckflussverhinderer", "ruecksaug", "trinkwasser", "frischwasseranschluss", "systemtrenner"}, ExtraTags: []string{"backflow_risk"}},
{Keywords: []string{"scharfe kante", "scharfkant", "blechkante", "scharfe blechkante", "sieb", "siebe"}, ExtraTags: []string{"sharp_edge"}},
// Ghost-Closure (Emit-Seite): macht die 34 toten Required-Tags // Ghost-Closure (Emit-Seite): macht die 34 toten Required-Tags
// emittierbar, jeweils NUR via domaenenspezifische Keywords -> die 120 // emittierbar, jeweils NUR via domaenenspezifische Keywords -> die 120
// Ghost-Patterns feuern wieder, aber nur fuer ihre echte Maschine (kein // Ghost-Patterns feuern wieder, aber nur fuer ihre echte Maschine (kein
@@ -65,5 +65,11 @@ func getWarewashingMeasures() []ProtectiveMeasureEntry {
HazardCategory: "general", HazardCategory: "general",
Examples: []string{"Warnpiktogramm 'Heisser Dampf' an der Tuer", "BA-Hinweis 'Tuer nach Programmende langsam oeffnen'"}, Examples: []string{"Warnpiktogramm 'Heisser Dampf' an der Tuer", "BA-Hinweis 'Tuer nach Programmende langsam oeffnen'"},
NormReferences: []string{"ISO 7010", "EN 60335-2-58"}}, NormReferences: []string{"ISO 7010", "EN 60335-2-58"}},
{ID: "M2209", ReductionType: "design", SubType: "containment",
Name: "Rueckflussverhinderer / Systemtrenner nach EN 1717",
Description: "Der Frischwasseranschluss ist durch einen Rueckflussverhinderer bzw. Systemtrenner der passenden Schutzklasse oder durch einen freien Auslauf gegen Ruecksaugen verschmutzten Wassers in das Trinkwassernetz gesichert.",
HazardCategory: "material_environmental",
Examples: []string{"Systemtrenner Typ BA nach EN 1717", "Freier Auslauf Typ AB ueber dem hoechsten Wasserstand"},
NormReferences: []string{"EN 1717", "EN 60335-2-58"}},
} }
} }
+8 -1
View File
@@ -40,6 +40,14 @@ func classifyAuthority(r LegalSearchResult) authorityInfo {
if jur == "" { if jur == "" {
jur = inferJurisdiction(r) jur = inferJurisdiction(r)
} }
hay := r.ArticleLabel + " " + r.RegulationShort + " " + r.RegulationName + " " + r.RegulationCode
// A recognised standard NAME (NIST/OWASP/ISO 27001/CIS/CSA CCM/Grundschutz) is authoritative
// even when the corpus mis-tagged the chunk as supervisory_guidance (weight 70) — many
// standards were ingested with a generic guidance source_class. The name wins, so they
// classify (and rank) as technical_standard / control_standard. binding_law is preserved.
if r.SourceClass != "binding_law" && containsAny(hay, standardMarkers) {
return authorityInfo{weight: 80, sourceClass: "technical_standard", jurisdiction: jur}
}
if r.SourceClass != "" { if r.SourceClass != "" {
w := r.AuthorityWeight w := r.AuthorityWeight
if w == 0 && r.SourceClass == "binding_law" { if w == 0 && r.SourceClass == "binding_law" {
@@ -50,7 +58,6 @@ func classifyAuthority(r LegalSearchResult) authorityInfo {
if r.AuthorityWeight > 0 { if r.AuthorityWeight > 0 {
return authorityInfo{weight: r.AuthorityWeight, sourceClass: sourceClassFromWeight(r.AuthorityWeight), jurisdiction: jur} return authorityInfo{weight: r.AuthorityWeight, sourceClass: sourceClassFromWeight(r.AuthorityWeight), jurisdiction: jur}
} }
hay := r.ArticleLabel + " " + r.RegulationShort + " " + r.RegulationName + " " + r.RegulationCode
switch { switch {
case containsAny(hay, foreignMarkers): case containsAny(hay, foreignMarkers):
return authorityInfo{weight: 0, sourceClass: "foreign_law", jurisdiction: "CH"} return authorityInfo{weight: 0, sourceClass: "foreign_law", jurisdiction: "CH"}
@@ -15,6 +15,7 @@ func TestClassifyAuthority(t *testing.T) {
{"tagged foreign CH", LegalSearchResult{AuthorityWeight: 0, SourceClass: "foreign_law", Jurisdiction: "CH"}, 0, "foreign_law", "CH"}, {"tagged foreign CH", LegalSearchResult{AuthorityWeight: 0, SourceClass: "foreign_law", Jurisdiction: "CH"}, 0, "foreign_law", "CH"},
{"untagged ENISA guidance", LegalSearchResult{RegulationShort: "ENISA", ArticleLabel: "ENISA CRA Standards Mapping"}, 70, "supervisory_guidance", "EU"}, {"untagged ENISA guidance", LegalSearchResult{RegulationShort: "ENISA", ArticleLabel: "ENISA CRA Standards Mapping"}, 70, "supervisory_guidance", "EU"},
{"untagged NIST standard", LegalSearchResult{RegulationShort: "NIST SP 800-82r3", ArticleLabel: "AU-8"}, 80, "technical_standard", "EU"}, {"untagged NIST standard", LegalSearchResult{RegulationShort: "NIST SP 800-82r3", ArticleLabel: "AU-8"}, 80, "technical_standard", "EU"},
{"mis-tagged NIST guidance -> standard by name", LegalSearchResult{SourceClass: "supervisory_guidance", AuthorityWeight: 70, RegulationShort: "NIST SP 800-82r3", ArticleLabel: "NIST SP 800-82r3"}, 80, "technical_standard", "EU"},
{"BSI Grundschutz standard beats BSI guidance", LegalSearchResult{RegulationShort: "BSI Grundschutz", ArticleLabel: "BSI Grundschutz Baustein"}, 80, "technical_standard", "DE"}, {"BSI Grundschutz standard beats BSI guidance", LegalSearchResult{RegulationShort: "BSI Grundschutz", ArticleLabel: "BSI Grundschutz Baustein"}, 80, "technical_standard", "DE"},
{"weight-only 85 TRGS standard", LegalSearchResult{AuthorityWeight: 85, RegulationShort: "TRGS 529"}, 85, "technical_standard", "EU"}, {"weight-only 85 TRGS standard", LegalSearchResult{AuthorityWeight: 85, RegulationShort: "TRGS 529"}, 85, "technical_standard", "EU"},
{"tagged technical_standard", LegalSearchResult{AuthorityWeight: 80, SourceClass: "technical_standard", Jurisdiction: "EU"}, 80, "technical_standard", "EU"}, {"tagged technical_standard", LegalSearchResult{AuthorityWeight: 80, SourceClass: "technical_standard", Jurisdiction: "EU"}, 80, "technical_standard", "EU"},
@@ -0,0 +1,71 @@
package ucca
// ObligationStatus is the Advisor's vertical slice over the compliance graph for ONE legal
// obligation: which accepted controls satisfy it, what evidence they require, what's missing,
// and the resulting status. The point is "the required evidence is (not) present", not "a
// document exists". citation_spans is pending until the Legal-Knowledge-Graph session attaches
// them to the obligation (the upper half of the bridge).
type ObligationStatus struct {
ObligationID string `json:"obligation_id"`
LegalBasis []string `json:"legal_basis"` // the obligation's citation_units
Status string `json:"status"` // erfuellt | offen | unklar
Controls []ObligationControlStatus `json:"controls"`
CitationSpans string `json:"citation_spans"` // "pending" until the registry fills them
}
// ObligationControlStatus is one control under an obligation with its evidence picture.
type ObligationControlStatus struct {
Framework string `json:"framework"`
Control string `json:"control"`
MappingType string `json:"mapping_type"`
RequiredEvidence []EvidenceRequirement `json:"required_evidence"`
MissingEvidence []EvidenceRequirement `json:"missing_evidence"`
}
// AssessObligationStatus traverses obligation_id -> (citation_unit) -> accepted Controls ->
// required Evidence -> Status. hasEvidence reports whether a given (framework, control,
// evidence_type) is already collected; pass nil in the MVP (no collection yet) -> everything
// required is missing and the status is "offen". Unknown or unmapped obligation -> "unklar".
func AssessObligationStatus(joins *ObligationJoinKeys, mappings *ControlMappingSet, evidence *EvidenceRequirementSet, obligationID string, hasEvidence func(framework, control, evidenceType string) bool) ObligationStatus {
ob := joins.FindObligation(obligationID)
if ob == nil {
return ObligationStatus{ObligationID: obligationID, Status: "unklar", CitationSpans: "pending"}
}
st := ObligationStatus{
ObligationID: obligationID,
LegalBasis: ob.CitationUnits,
CitationSpans: "pending",
Controls: []ObligationControlStatus{},
}
ctrls := AcceptedControlsForObligation(*ob, mappings)
if len(ctrls) == 0 {
st.Status = "unklar" // no accepted control reaches it — we cannot assess
return st
}
anyMissing := false
for _, m := range ctrls {
req := evidence.RequiredFor(m.TargetFramework, m.TargetControl)
missing := make([]EvidenceRequirement, 0, len(req))
for _, e := range req {
if hasEvidence == nil || !hasEvidence(e.Framework, e.Control, e.EvidenceType) {
missing = append(missing, e)
}
}
if len(missing) > 0 {
anyMissing = true
}
st.Controls = append(st.Controls, ObligationControlStatus{
Framework: m.TargetFramework,
Control: m.TargetControl,
MappingType: m.MappingType,
RequiredEvidence: req,
MissingEvidence: missing,
})
}
if anyMissing {
st.Status = "offen"
} else {
st.Status = "erfuellt"
}
return st
}
@@ -0,0 +1,59 @@
package ucca
import "testing"
func loadGraph(t *testing.T) (*ObligationJoinKeys, *ControlMappingSet, *EvidenceRequirementSet) {
t.Helper()
joins, err := LoadObligationJoinKeys("../../../obligations/obligation_join_keys.json")
if err != nil {
t.Fatalf("join keys: %v", err)
}
maps, err := LoadControlMappings("../../data/control_mappings")
if err != nil {
t.Fatalf("mappings: %v", err)
}
ev, err := LoadEvidenceRequirements("../../data/evidence_requirements")
if err != nil {
t.Fatalf("evidence: %v", err)
}
return joins, maps, ev
}
func TestAssessObligationStatus(t *testing.T) {
joins, maps, ev := loadGraph(t)
// covered obligation, no evidence collected yet (MVP) -> offen
st := AssessObligationStatus(joins, maps, ev, "user_authentication_required", nil)
if st.Status != "offen" {
t.Errorf("want offen, got %q", st.Status)
}
if len(st.Controls) == 0 {
t.Fatal("expected controls for a covered obligation")
}
for _, c := range st.Controls {
if len(c.MissingEvidence) != len(c.RequiredEvidence) {
t.Error("MVP: all required evidence should be missing")
}
}
t.Logf("DURCHSTICH user_authentication_required: status=%s legal_basis=%v citation_spans=%s",
st.Status, st.LegalBasis, st.CitationSpans)
for _, c := range st.Controls {
t.Logf(" %s %s (%s): %d required evidence, %d missing", c.Framework, c.Control, c.MappingType, len(c.RequiredEvidence), len(c.MissingEvidence))
}
// all evidence present -> erfuellt
st2 := AssessObligationStatus(joins, maps, ev, "user_authentication_required", func(f, c, et string) bool { return true })
if st2.Status != "erfuellt" {
t.Errorf("want erfuellt with all evidence present, got %q", st2.Status)
}
// uncovered obligation (no accepted control reaches it) -> unklar
if st3 := AssessObligationStatus(joins, maps, ev, "sbom_creation", nil); st3.Status != "unklar" {
t.Errorf("uncovered sbom_creation: want unklar, got %q", st3.Status)
}
// unknown obligation_id -> unklar
if st4 := AssessObligationStatus(joins, maps, ev, "does_not_exist", nil); st4.Status != "unklar" {
t.Errorf("unknown obligation: want unklar, got %q", st4.Status)
}
}
@@ -0,0 +1,152 @@
package ucca
import (
"bufio"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
)
// ControlMapping is one persisted, versioned, REVIEWABLE link from a legal
// obligation/requirement to a concrete framework control — a node in the curated
// compliance graph (Regulation -> Obligation -> Control -> Evidence). The retriever only
// PROPOSES candidates (mapping_status=candidate); a human/rule decision turns the good ones
// into mapping_status=accepted, which is the audited truth the Advisor uses at runtime.
//
// There is intentionally NO probabilistic "confidence" field: once curated, a mapping is a
// professional statement, not an AI guess. The retriever's score lives only in the rationale
// of a candidate, never as structured truth.
type ControlMapping struct {
SourceNorm string `json:"source_norm"` // e.g. "CRA Annex I Part I (2)(c)"
SourceRole string `json:"source_role"` // source_role of the norm (operational_requirement, ...)
TargetFramework string `json:"target_framework"` // e.g. "OWASP ASVS"
TargetControl string `json:"target_control"` // e.g. "V6.3.1"
MappingType string `json:"mapping_type"` // supports | partially_supports | implements | related | contradicts
MappingStatus string `json:"mapping_status"` // candidate | accepted | rejected | superseded
Provenance string `json:"provenance"` // retriever_candidate | human_curated | rule_based
ObligationID string `json:"obligation_id,omitempty"` // stable cross-session join key (Obligation Registry); empty until adopted, citation_unit is the interim bridge
Rationale string `json:"rationale"`
ReviewedBy string `json:"reviewed_by,omitempty"` // who decided (human or rule id)
ReviewDate string `json:"review_date,omitempty"` // YYYY-MM-DD
ReviewReason string `json:"review_reason,omitempty"`
Version string `json:"version"`
}
// Allowed enum values — the deterministic "rule" layer that keeps the curated store clean.
var (
mappingTypeValues = map[string]bool{"supports": true, "partially_supports": true, "implements": true, "related": true, "contradicts": true}
mappingStatusValues = map[string]bool{"candidate": true, "accepted": true, "rejected": true, "superseded": true}
provenanceValues = map[string]bool{"retriever_candidate": true, "human_curated": true, "rule_based": true}
)
// Validate checks required fields + enum membership, and enforces the audit trail: any
// human/rule DECISION (accepted/rejected) must carry who/when/why. Fail-closed at load.
func (m ControlMapping) Validate() error {
switch {
case m.SourceNorm == "":
return fmt.Errorf("control mapping: source_norm required")
case m.TargetFramework == "":
return fmt.Errorf("control mapping: target_framework required")
case m.TargetControl == "":
return fmt.Errorf("control mapping: target_control required")
case !mappingTypeValues[m.MappingType]:
return fmt.Errorf("control mapping: invalid mapping_type %q", m.MappingType)
case !mappingStatusValues[m.MappingStatus]:
return fmt.Errorf("control mapping: invalid mapping_status %q", m.MappingStatus)
case !provenanceValues[m.Provenance]:
return fmt.Errorf("control mapping: invalid provenance %q", m.Provenance)
}
if m.MappingStatus == "accepted" || m.MappingStatus == "rejected" {
if m.ReviewedBy == "" || m.ReviewDate == "" || m.ReviewReason == "" {
return fmt.Errorf("control mapping %s->%s: status %q requires reviewed_by + review_date + review_reason (audit trail)",
m.SourceNorm, m.TargetControl, m.MappingStatus)
}
}
return nil
}
// IsAccepted reports whether this mapping is the active audited truth.
func (m ControlMapping) IsAccepted() bool { return m.MappingStatus == "accepted" }
// ControlMappingSet is the loaded, indexed mapping store (forward + reverse lookup).
type ControlMappingSet struct {
All []ControlMapping
bySourceNorm map[string][]ControlMapping
byControl map[string][]ControlMapping
}
func controlKey(framework, control string) string { return framework + ":" + control }
// ControlsFor returns the controls mapped to a source norm. acceptedOnly restricts to the
// audited truth (what the Advisor may treat as fact).
func (s *ControlMappingSet) ControlsFor(sourceNorm string, acceptedOnly bool) []ControlMapping {
return filterAccepted(s.bySourceNorm[sourceNorm], acceptedOnly)
}
// ObligationsFor returns the norms mapped to a framework control (reverse lookup).
func (s *ControlMappingSet) ObligationsFor(framework, control string, acceptedOnly bool) []ControlMapping {
return filterAccepted(s.byControl[controlKey(framework, control)], acceptedOnly)
}
func filterAccepted(in []ControlMapping, acceptedOnly bool) []ControlMapping {
if !acceptedOnly {
return in
}
out := make([]ControlMapping, 0, len(in))
for _, m := range in {
if m.IsAccepted() {
out = append(out, m)
}
}
return out
}
// LoadControlMappings reads every *.jsonl file under dir (one mapping per line; blank and
// //-prefixed lines ignored), validates each row, and builds the index. An invalid row
// aborts the whole load — fail-closed, because this is the audit truth, not best-effort.
func LoadControlMappings(dir string) (*ControlMappingSet, error) {
files, err := filepath.Glob(filepath.Join(dir, "*.jsonl"))
if err != nil {
return nil, err
}
set := &ControlMappingSet{
bySourceNorm: map[string][]ControlMapping{},
byControl: map[string][]ControlMapping{},
}
for _, f := range files {
fh, err := os.Open(f)
if err != nil {
return nil, err
}
sc := bufio.NewScanner(fh)
sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
line := 0
for sc.Scan() {
line++
raw := strings.TrimSpace(sc.Text())
if raw == "" || strings.HasPrefix(raw, "//") {
continue
}
var m ControlMapping
if err := json.Unmarshal([]byte(raw), &m); err != nil {
fh.Close()
return nil, fmt.Errorf("%s:%d: %w", f, line, err)
}
if err := m.Validate(); err != nil {
fh.Close()
return nil, fmt.Errorf("%s:%d: %w", f, line, err)
}
set.All = append(set.All, m)
set.bySourceNorm[m.SourceNorm] = append(set.bySourceNorm[m.SourceNorm], m)
k := controlKey(m.TargetFramework, m.TargetControl)
set.byControl[k] = append(set.byControl[k], m)
}
fh.Close()
if err := sc.Err(); err != nil {
return nil, err
}
}
return set, nil
}
@@ -0,0 +1,85 @@
package ucca
import (
"os"
"path/filepath"
"testing"
)
func TestControlMapping_Validate(t *testing.T) {
candidate := ControlMapping{SourceNorm: "CRA Annex I", TargetFramework: "OWASP ASVS", TargetControl: "V6.3.1", MappingType: "supports", MappingStatus: "candidate", Provenance: "retriever_candidate"}
if err := candidate.Validate(); err != nil {
t.Fatalf("valid candidate rejected: %v", err)
}
accepted := ControlMapping{SourceNorm: "A", TargetFramework: "X", TargetControl: "Y", MappingType: "implements", MappingStatus: "accepted", Provenance: "human_curated", ReviewedBy: "benjamin", ReviewDate: "2026-06-25", ReviewReason: "passt"}
if err := accepted.Validate(); err != nil {
t.Fatalf("valid accepted rejected: %v", err)
}
bad := []struct {
name string
m ControlMapping
}{
{"no source_norm", ControlMapping{TargetFramework: "X", TargetControl: "Y", MappingType: "supports", MappingStatus: "candidate", Provenance: "retriever_candidate"}},
{"bad mapping_type", ControlMapping{SourceNorm: "A", TargetFramework: "X", TargetControl: "Y", MappingType: "nope", MappingStatus: "candidate", Provenance: "retriever_candidate"}},
{"bad mapping_status", ControlMapping{SourceNorm: "A", TargetFramework: "X", TargetControl: "Y", MappingType: "supports", MappingStatus: "maybe", Provenance: "retriever_candidate"}},
{"bad provenance", ControlMapping{SourceNorm: "A", TargetFramework: "X", TargetControl: "Y", MappingType: "supports", MappingStatus: "candidate", Provenance: "guessed"}},
{"accepted without audit trail", ControlMapping{SourceNorm: "A", TargetFramework: "X", TargetControl: "Y", MappingType: "supports", MappingStatus: "accepted", Provenance: "human_curated"}},
{"rejected without reason", ControlMapping{SourceNorm: "A", TargetFramework: "X", TargetControl: "Y", MappingType: "supports", MappingStatus: "rejected", Provenance: "human_curated", ReviewedBy: "b", ReviewDate: "2026-06-25"}},
}
for _, tt := range bad {
if err := tt.m.Validate(); err == nil {
t.Errorf("%s: expected rejection", tt.name)
}
}
}
func TestLoadControlMappings(t *testing.T) {
dir := t.TempDir()
content := `// header comment, ignored
{"source_norm":"CRA Annex I","source_role":"operational_requirement","target_framework":"OWASP ASVS","target_control":"V6.3.1","mapping_type":"supports","mapping_status":"accepted","provenance":"human_curated","reviewed_by":"benjamin","review_date":"2026-06-25","review_reason":"V6=Auth passt","rationale":"r","version":"2026-06-25"}
{"source_norm":"CRA Annex I","source_role":"operational_requirement","target_framework":"OWASP ASVS","target_control":"V14.2.4","mapping_type":"related","mapping_status":"candidate","provenance":"retriever_candidate","rationale":"r","version":"2026-06-25"}
`
if err := os.WriteFile(filepath.Join(dir, "m.jsonl"), []byte(content), 0o644); err != nil {
t.Fatal(err)
}
set, err := LoadControlMappings(dir)
if err != nil {
t.Fatalf("load: %v", err)
}
if len(set.All) != 2 {
t.Fatalf("want 2 mappings, got %d", len(set.All))
}
if got := set.ControlsFor("CRA Annex I", false); len(got) != 2 {
t.Errorf("ControlsFor(all): want 2, got %d", len(got))
}
if got := set.ControlsFor("CRA Annex I", true); len(got) != 1 {
t.Errorf("ControlsFor(acceptedOnly): want 1 (only accepted), got %d", len(got))
}
if got := set.ObligationsFor("OWASP ASVS", "V6.3.1", true); len(got) != 1 {
t.Errorf("ObligationsFor accepted reverse lookup: want 1, got %d", len(got))
}
}
func TestLoadControlMappings_RejectsInvalid(t *testing.T) {
dir := t.TempDir()
// accepted without the who/when/why audit trail must fail-closed.
if err := os.WriteFile(filepath.Join(dir, "bad.jsonl"), []byte(`{"source_norm":"A","target_framework":"X","target_control":"Y","mapping_type":"supports","mapping_status":"accepted","provenance":"human_curated","rationale":"r","version":"v"}`), 0o644); err != nil {
t.Fatal(err)
}
if _, err := LoadControlMappings(dir); err == nil {
t.Error("accepted mapping without audit trail must fail the load (fail-closed)")
}
}
func TestControlMappings_SeedFileValid(t *testing.T) {
// The committed seed store must always load + validate.
set, err := LoadControlMappings("../../data/control_mappings")
if err != nil {
t.Fatalf("seed control_mappings failed to load: %v", err)
}
if len(set.All) == 0 {
t.Fatal("seed control_mappings is empty")
}
}
@@ -121,3 +121,54 @@ func controlRoleOf(payload map[string]interface{}) string {
IsRecital: getBool(payload, "is_recital"), IsRecital: getBool(payload, "is_recital"),
}) })
} }
// ensureControlDiversity guarantees that the returned top-K of a control question surfaces at
// least one operational_requirement and one control_standard WHEN the pool contains them —
// without forcing them to Top-1. implementation_guidance (e.g. ENISA good practices) keeps its
// earned semantic lead; the rule only promotes the best hit of a missing control role into the
// top-K by overwriting the lowest-ranked redundant guidance slot. So an implementation question
// shows the relevant source ROLES (binding requirement + standard + guidance) side by side
// instead of one role flooding the list. The promoted hit's original (now duplicate) position
// stays in the tail and is dropped by the caller's truncation to topK.
func ensureControlDiversity(results []LegalSearchResult, topK int) []LegalSearchResult {
if topK <= 0 || topK >= len(results) {
return results // everything is already returned — nothing to promote
}
roleAt := make([]string, len(results))
for i := range results {
roleAt[i] = classifyRole(results[i])
}
present := make(map[string]bool, topK)
for i := 0; i < topK; i++ {
present[roleAt[i]] = true
}
for _, want := range []string{roleOperationalReq, roleControlStandard} {
if present[want] {
continue
}
src := -1
for i := topK; i < len(results); i++ {
if roleAt[i] == want {
src = i
break
}
}
if src < 0 {
continue // role absent from the whole pool — nothing to promote
}
dst := -1
for j := topK - 1; j >= 0; j-- {
if roleAt[j] == roleImplGuidance {
dst = j
break
}
}
if dst < 0 {
continue // no redundant guidance to sacrifice — leave the head untouched
}
results[dst] = results[src]
roleAt[dst] = want
present[want] = true
}
return results
}
@@ -77,3 +77,58 @@ func TestControlRoleOf_Payload(t *testing.T) {
t.Errorf("DORA abstract article role = %q must be excluded from the control-pool", got) t.Errorf("DORA abstract article role = %q must be excluded from the control-pool", got)
} }
} }
func headHasRole(head []LegalSearchResult, role string) bool {
for _, r := range head {
if classifyRole(r) == role {
return true
}
}
return false
}
func TestEnsureControlDiversity(t *testing.T) {
ig := func(n string) LegalSearchResult {
return LegalSearchResult{RegulationShort: "ENISA " + n + " Good Practices"}
}
opReq := LegalSearchResult{RegulationShort: "CRA", ArticleLabel: "CRA Anhang I", Category: "regulation"}
std := LegalSearchResult{RegulationShort: "NIST SP 800-53"}
t.Run("injects missing op_req + control_standard, guidance keeps Top-1", func(t *testing.T) {
out := ensureControlDiversity([]LegalSearchResult{ig("A"), ig("B"), ig("C"), std, opReq}, 3)
head := out[:3]
if classifyRole(head[0]) != roleImplGuidance {
t.Errorf("Top-1 should stay implementation_guidance, got %q", classifyRole(head[0]))
}
if !headHasRole(head, roleOperationalReq) {
t.Error("top-K must contain an operational_requirement after diversity")
}
if !headHasRole(head, roleControlStandard) {
t.Error("top-K must contain a control_standard after diversity")
}
})
t.Run("no-op when both roles already present", func(t *testing.T) {
out := ensureControlDiversity([]LegalSearchResult{opReq, std, ig("A"), ig("B")}, 3)
if classifyRole(out[0]) != roleOperationalReq || classifyRole(out[1]) != roleControlStandard {
t.Error("already-diverse top-K must be left untouched")
}
})
t.Run("absent role is not forced (no panic)", func(t *testing.T) {
out := ensureControlDiversity([]LegalSearchResult{ig("A"), ig("B"), ig("C"), std}, 3)
if !headHasRole(out[:3], roleControlStandard) {
t.Error("present control_standard should be injected")
}
if headHasRole(out[:3], roleOperationalReq) {
t.Error("operational_requirement absent from the pool must NOT appear")
}
})
t.Run("topK covering the whole pool is unchanged", func(t *testing.T) {
out := ensureControlDiversity([]LegalSearchResult{ig("A"), opReq}, 5)
if len(out) != 2 || classifyRole(out[0]) != roleImplGuidance {
t.Error("topK >= len must return results unchanged")
}
})
}
@@ -0,0 +1,117 @@
package ucca
import (
"bufio"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
)
// EvidenceRequirement is the last edge of the compliance graph: it says WHAT concrete
// evidence proves a framework control is met, and how fresh that evidence must be. This is
// what lets the Advisor eventually state "the CRA requirement is fulfilled" — not because a
// document exists, but because the required, current evidence is present. Authored/curated,
// not retriever-generated.
type EvidenceRequirement struct {
Framework string `json:"framework"` // e.g. "OWASP ASVS"
Control string `json:"control"` // e.g. "V6.3.1"
EvidenceType string `json:"evidence_type"` // sbom|test_report|config_export|repo_scan|policy|ticket|audit_log|pentest
EvidenceSource string `json:"evidence_source"` // github|ci|scanner|manual_upload
FreshnessRequirement string `json:"freshness_requirement"` // per_release|quarterly|annually|continuous
Required bool `json:"required"`
Rationale string `json:"rationale"`
Version string `json:"version"`
}
// Allowed enum values — the rule layer that keeps the evidence catalog clean.
var (
evidenceTypeValues = map[string]bool{"sbom": true, "test_report": true, "config_export": true, "repo_scan": true, "policy": true, "ticket": true, "audit_log": true, "pentest": true}
evidenceSourceValues = map[string]bool{"github": true, "ci": true, "scanner": true, "manual_upload": true}
freshnessValues = map[string]bool{"per_release": true, "quarterly": true, "annually": true, "continuous": true}
)
// Validate checks required fields + enum membership. Fail-closed at load.
func (e EvidenceRequirement) Validate() error {
switch {
case e.Framework == "":
return fmt.Errorf("evidence requirement: framework required")
case e.Control == "":
return fmt.Errorf("evidence requirement: control required")
case !evidenceTypeValues[e.EvidenceType]:
return fmt.Errorf("evidence requirement: invalid evidence_type %q", e.EvidenceType)
case !evidenceSourceValues[e.EvidenceSource]:
return fmt.Errorf("evidence requirement: invalid evidence_source %q", e.EvidenceSource)
case !freshnessValues[e.FreshnessRequirement]:
return fmt.Errorf("evidence requirement: invalid freshness_requirement %q", e.FreshnessRequirement)
}
return nil
}
// EvidenceRequirementSet is the loaded, indexed evidence catalog.
type EvidenceRequirementSet struct {
All []EvidenceRequirement
byControl map[string][]EvidenceRequirement
}
// For returns all evidence requirements declared for a framework control.
func (s *EvidenceRequirementSet) For(framework, control string) []EvidenceRequirement {
return s.byControl[controlKey(framework, control)]
}
// RequiredFor returns only the required evidence for a control — the minimum that must be
// present before the control may be treated as met.
func (s *EvidenceRequirementSet) RequiredFor(framework, control string) []EvidenceRequirement {
out := make([]EvidenceRequirement, 0)
for _, e := range s.byControl[controlKey(framework, control)] {
if e.Required {
out = append(out, e)
}
}
return out
}
// LoadEvidenceRequirements reads every *.jsonl file under dir (one requirement per line;
// blank and //-prefixed lines ignored), validates each, and builds the per-control index.
// An invalid row aborts the load — fail-closed.
func LoadEvidenceRequirements(dir string) (*EvidenceRequirementSet, error) {
files, err := filepath.Glob(filepath.Join(dir, "*.jsonl"))
if err != nil {
return nil, err
}
set := &EvidenceRequirementSet{byControl: map[string][]EvidenceRequirement{}}
for _, f := range files {
fh, err := os.Open(f)
if err != nil {
return nil, err
}
sc := bufio.NewScanner(fh)
sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
line := 0
for sc.Scan() {
line++
raw := strings.TrimSpace(sc.Text())
if raw == "" || strings.HasPrefix(raw, "//") {
continue
}
var e EvidenceRequirement
if err := json.Unmarshal([]byte(raw), &e); err != nil {
fh.Close()
return nil, fmt.Errorf("%s:%d: %w", f, line, err)
}
if err := e.Validate(); err != nil {
fh.Close()
return nil, fmt.Errorf("%s:%d: %w", f, line, err)
}
set.All = append(set.All, e)
k := controlKey(e.Framework, e.Control)
set.byControl[k] = append(set.byControl[k], e)
}
fh.Close()
if err := sc.Err(); err != nil {
return nil, err
}
}
return set, nil
}
@@ -0,0 +1,84 @@
package ucca
import (
"os"
"path/filepath"
"testing"
)
func TestEvidenceRequirement_Validate(t *testing.T) {
valid := EvidenceRequirement{Framework: "OWASP ASVS", Control: "V6.3.1", EvidenceType: "config_export", EvidenceSource: "github", FreshnessRequirement: "per_release", Required: true}
if err := valid.Validate(); err != nil {
t.Fatalf("valid rejected: %v", err)
}
bad := []struct {
name string
e EvidenceRequirement
}{
{"no control", EvidenceRequirement{Framework: "X", EvidenceType: "sbom", EvidenceSource: "ci", FreshnessRequirement: "per_release"}},
{"bad evidence_type", EvidenceRequirement{Framework: "X", Control: "Y", EvidenceType: "screenshot", EvidenceSource: "ci", FreshnessRequirement: "per_release"}},
{"bad evidence_source", EvidenceRequirement{Framework: "X", Control: "Y", EvidenceType: "sbom", EvidenceSource: "email", FreshnessRequirement: "per_release"}},
{"bad freshness", EvidenceRequirement{Framework: "X", Control: "Y", EvidenceType: "sbom", EvidenceSource: "ci", FreshnessRequirement: "weekly"}},
}
for _, tt := range bad {
if err := tt.e.Validate(); err == nil {
t.Errorf("%s: expected rejection", tt.name)
}
}
}
func TestLoadEvidenceRequirements(t *testing.T) {
dir := t.TempDir()
content := `// header
{"framework":"OWASP ASVS","control":"V6.3.1","evidence_type":"config_export","evidence_source":"github","freshness_requirement":"per_release","required":true,"version":"2026-06-25"}
{"framework":"OWASP ASVS","control":"V6.3.1","evidence_type":"pentest","evidence_source":"manual_upload","freshness_requirement":"annually","required":false,"version":"2026-06-25"}
`
if err := os.WriteFile(filepath.Join(dir, "e.jsonl"), []byte(content), 0o644); err != nil {
t.Fatal(err)
}
set, err := LoadEvidenceRequirements(dir)
if err != nil {
t.Fatalf("load: %v", err)
}
if len(set.All) != 2 {
t.Fatalf("want 2, got %d", len(set.All))
}
if got := set.For("OWASP ASVS", "V6.3.1"); len(got) != 2 {
t.Errorf("For: want 2, got %d", len(got))
}
if got := set.RequiredFor("OWASP ASVS", "V6.3.1"); len(got) != 1 {
t.Errorf("RequiredFor: want 1 (pentest is optional), got %d", len(got))
}
}
func TestEvidenceRequirements_SeedFileValid(t *testing.T) {
set, err := LoadEvidenceRequirements("../../data/evidence_requirements")
if err != nil {
t.Fatalf("seed evidence_requirements failed to load: %v", err)
}
if len(set.All) == 0 {
t.Fatal("seed evidence_requirements is empty")
}
}
// TestGraph_AcceptedControlsHaveEvidence wires the two layers: every control an accepted
// CRA->OWASP mapping points to must have >=1 required evidence — the Obligation -> Control ->
// Evidence chain must be connected, no dangling control nodes.
func TestGraph_AcceptedControlsHaveEvidence(t *testing.T) {
maps, err := LoadControlMappings("../../data/control_mappings")
if err != nil {
t.Fatal(err)
}
ev, err := LoadEvidenceRequirements("../../data/evidence_requirements")
if err != nil {
t.Fatal(err)
}
for _, m := range maps.All {
if !m.IsAccepted() {
continue
}
if len(ev.RequiredFor(m.TargetFramework, m.TargetControl)) == 0 {
t.Errorf("accepted control %s %s has no required evidence (dangling graph node)", m.TargetFramework, m.TargetControl)
}
}
}
@@ -166,6 +166,15 @@ func (c *LegalRAGClient) searchInternal(ctx context.Context, collection string,
// Response-Schema unveraendert. Score traegt den Authority-Score, damit nachgelagerte // Response-Schema unveraendert. Score traegt den Authority-Score, damit nachgelagerte
// Multi-Collection-Merges (Advisor) die Ordnung bewahren. // Multi-Collection-Merges (Advisor) die Ordnung bewahren.
results = rerankByAuthority(query, results) results = rerankByAuthority(query, results)
// Control-Diversity: auf einer Umsetzungsfrage darf impl_guidance (ENISA) Top-1 bleiben,
// aber die Top-K soll mindestens eine binding operational_requirement (CRA Anhang I) und
// einen control_standard (NIST/ISO) zeigen, falls im Pool — Quellenarten sichtbar machen
// statt sie kuenstlich auf Top-1 zu heben. Nur Reihenfolge, vor der Truncation.
if queryWantsControls(query) {
results = ensureControlDiversity(results, topK)
}
if topK > 0 && len(results) > topK { if topK > 0 && len(results) > topK {
results = results[:topK] results = results[:topK]
} }
@@ -0,0 +1,172 @@
package ucca
import (
"encoding/json"
"os"
"regexp"
"strings"
)
// ObligationKey is one entry of the Obligation Registry's cross-session contract
// (obligations/obligation_join_keys.json). obligation_id is the STABLE join key — assigned
// only by the Registry, never minted here. citation_units are the interim bridge until our
// ControlMapping adopts obligation_id directly.
type ObligationKey struct {
ObligationID string `json:"obligation_id"`
Regulation string `json:"regulation"`
Family string `json:"family"`
Tier string `json:"tier"`
CitationUnits []string `json:"citation_units"`
SourceRole string `json:"source_role"`
}
// ObligationJoinKeys is the loaded contract + a citation-unit index for the interim join.
type ObligationJoinKeys struct {
SchemaVersion string `json:"schema_version"`
Count int `json:"count"`
ObligationIDs []ObligationKey `json:"obligation_ids"`
byCitationKey map[string][]string
}
var citationRefRe = regexp.MustCompile(`\(([0-9a-zA-Z]+)\)`)
// citationUnitKey normalizes a CRA Annex I reference for the INTERIM citation_unit join, so
// our "CRA Annex I Part I (2)(c)" and the Registry's "Annex I (2)(c)" collapse to the same
// key ("i:2.c"). Interim only — superseded by the stable obligation_id once adopted.
func citationUnitKey(cu string) string {
low := strings.ToLower(cu)
part := ""
switch {
case strings.Contains(low, "part ii"):
part = "ii"
case strings.Contains(low, "part i"), strings.Contains(low, "(2)"):
part = "i" // CRA Annex I Part I = the (2)(x) essential requirements
}
var refs []string
for _, m := range citationRefRe.FindAllStringSubmatch(cu, -1) {
refs = append(refs, strings.ToLower(m[1]))
}
return part + ":" + strings.Join(refs, ".")
}
// LoadObligationJoinKeys reads the Registry contract and indexes it by citation-unit key.
func LoadObligationJoinKeys(path string) (*ObligationJoinKeys, error) {
raw, err := os.ReadFile(path)
if err != nil {
return nil, err
}
var o ObligationJoinKeys
if err := json.Unmarshal(raw, &o); err != nil {
return nil, err
}
o.byCitationKey = map[string][]string{}
for _, ob := range o.ObligationIDs {
for _, cu := range ob.CitationUnits {
k := citationUnitKey(cu)
o.byCitationKey[k] = append(o.byCitationKey[k], ob.ObligationID)
}
}
return &o, nil
}
// ObligationsForCitation returns the obligation_ids that join (interim) to a citation
// reference such as a control_mapping.source_norm.
func (o *ObligationJoinKeys) ObligationsForCitation(citationRef string) []string {
return o.byCitationKey[citationUnitKey(citationRef)]
}
// FindObligation returns the registry entry for an obligation_id (nil if unknown).
func (o *ObligationJoinKeys) FindObligation(obligationID string) *ObligationKey {
for i := range o.ObligationIDs {
if o.ObligationIDs[i].ObligationID == obligationID {
return &o.ObligationIDs[i]
}
}
return nil
}
// mappingReaches reports whether a control mapping reaches an obligation — EXACT via the
// adopted obligation_id (semantic, preferred), else via the interim citation_unit join (for
// not-yet-adopted rows). Once obligation_id is set, the coarse citation_unit match is ignored:
// that is how the semantic join replaces the structural one (e.g. V11.2.1 crypto no longer
// rides (2)(d) into user_authentication_required — it goes to credential_confidentiality_protection).
func mappingReaches(m ControlMapping, ob ObligationKey, citationKeys map[string]bool) bool {
if m.ObligationID != "" {
return m.ObligationID == ob.ObligationID
}
return citationKeys[citationUnitKey(m.SourceNorm)]
}
// AcceptedControlsForObligation returns our accepted control mappings that reach an obligation
// (deduped by target control), obligation_id-exact where adopted, citation_unit otherwise.
func AcceptedControlsForObligation(ob ObligationKey, mappings *ControlMappingSet) []ControlMapping {
keys := make(map[string]bool, len(ob.CitationUnits))
for _, cu := range ob.CitationUnits {
keys[citationUnitKey(cu)] = true
}
out := []ControlMapping{}
seen := map[string]bool{}
for _, m := range mappings.All {
if !m.IsAccepted() || !mappingReaches(m, ob, keys) {
continue
}
ck := m.TargetFramework + ":" + m.TargetControl
if seen[ck] {
continue
}
seen[ck] = true
out = append(out, m)
}
return out
}
// ObligationCoverage is one row of the cross-session coverage report.
type ObligationCoverage struct {
ObligationID string `json:"obligation_id"`
Family string `json:"family"`
Status string `json:"status"` // covered | mapped_rejected | uncovered
AcceptedControls []string `json:"accepted_controls"`
EvidenceCount int `json:"evidence_count"`
}
// ComputeObligationCoverage joins the Registry obligations to our control mappings — exact via
// obligation_id where adopted, else via the interim citation_unit join — and reports per
// obligation: covered (>=1 accepted control reaches it), mapped_rejected (only rejected
// mappings reach it), or uncovered. The signal back to the Obligation session.
func ComputeObligationCoverage(joins *ObligationJoinKeys, mappings *ControlMappingSet, evidence *EvidenceRequirementSet) []ObligationCoverage {
out := make([]ObligationCoverage, 0, len(joins.ObligationIDs))
for _, ob := range joins.ObligationIDs {
keys := make(map[string]bool, len(ob.CitationUnits))
for _, cu := range ob.CitationUnits {
keys[citationUnitKey(cu)] = true
}
cov := ObligationCoverage{ObligationID: ob.ObligationID, Family: ob.Family}
seen := map[string]bool{}
rejected := false
for _, m := range mappings.All {
if !mappingReaches(m, ob, keys) {
continue
}
if m.IsAccepted() {
ck := m.TargetFramework + ":" + m.TargetControl
if !seen[ck] {
seen[ck] = true
cov.AcceptedControls = append(cov.AcceptedControls, ck)
cov.EvidenceCount += len(evidence.RequiredFor(m.TargetFramework, m.TargetControl))
}
} else if m.MappingStatus == "rejected" {
rejected = true
}
}
switch {
case len(cov.AcceptedControls) > 0:
cov.Status = "covered"
case rejected:
cov.Status = "mapped_rejected"
default:
cov.Status = "uncovered"
}
out = append(out, cov)
}
return out
}
@@ -0,0 +1,61 @@
package ucca
import "testing"
func TestCitationUnitKey_Join(t *testing.T) {
// our source_norm and the registry citation_unit must collapse to the SAME key.
if citationUnitKey("CRA Annex I Part I (2)(c) — Schutz vor unbefugtem Zugriff") != citationUnitKey("Annex I (2)(c)") {
t.Errorf("interim join broken: %q vs %q",
citationUnitKey("CRA Annex I Part I (2)(c)"), citationUnitKey("Annex I (2)(c)"))
}
// Part II must NOT collide with Part I.
if citationUnitKey("Annex I Part II (1)") == citationUnitKey("CRA Annex I Part I (2)(c)") {
t.Error("Part II must not join to Part I")
}
}
func TestLoadObligationJoinKeys(t *testing.T) {
o, err := LoadObligationJoinKeys("../../../obligations/obligation_join_keys.json")
if err != nil {
t.Fatalf("load: %v", err)
}
if o.Count != len(o.ObligationIDs) {
t.Errorf("count %d != len %d", o.Count, len(o.ObligationIDs))
}
if len(o.ObligationIDs) == 0 {
t.Fatal("empty contract")
}
if got := o.ObligationsForCitation("CRA Annex I Part I (2)(c)"); len(got) == 0 {
t.Error("expected an obligation joined to (2)(c)")
}
}
func TestObligationCoverage_Report(t *testing.T) {
joins, err := LoadObligationJoinKeys("../../../obligations/obligation_join_keys.json")
if err != nil {
t.Fatalf("join keys: %v", err)
}
maps, err := LoadControlMappings("../../data/control_mappings")
if err != nil {
t.Fatalf("mappings: %v", err)
}
ev, err := LoadEvidenceRequirements("../../data/evidence_requirements")
if err != nil {
t.Fatalf("evidence: %v", err)
}
cov := ComputeObligationCoverage(joins, maps, ev)
if len(cov) == 0 {
t.Fatal("no coverage computed")
}
byStatus := map[string]int{}
for _, c := range cov {
byStatus[c.Status]++
}
t.Logf("COVERAGE: %d Obligations | covered=%d mapped_rejected=%d uncovered=%d",
len(cov), byStatus["covered"], byStatus["mapped_rejected"], byStatus["uncovered"])
for _, c := range cov {
if c.Status != "uncovered" {
t.Logf(" %-15s %-36s controls=%v evidence=%d", c.Status, c.ObligationID, c.AcceptedControls, c.EvidenceCount)
}
}
}
@@ -0,0 +1,179 @@
"""Obligation Aggregation Engine — Ausführung des Legal Obligation Layer v1.
Aggregiert Bewertungen auf KRITERIUM-Ebene (pro Control) zu Ergebnissen auf
OBLIGATION-Ebene. Das ist die erstmalige Ausführung des Modells
Regulation → Legal Obligation → Control → Criterion
— das Finding entsteht auf der OBLIGATION, nicht pro Control. Damit kollabiert
die im Katalog gemessene Redundanz (portability 11×, recipients 14×): N Controls,
die dieselbe Pflicht prüfen, ergeben EIN Obligation-Finding statt N Control-Findings.
Regulierungs-agnostisch: kennt nur obligation_id, tier, met, legal_basis,
conditional. DSGVO/CRA/NIS2/DORA/MaschVO/AI-Act speisen dieselbe Funktion.
Fail-safe (docs-src/development/legal_obligation_layer_v1.md, §Aggregation):
LEGAL_MINIMUM-Obligation:
applicable=false → NA (kein Finding)
keine LM-Anforderung erfüllt → FAILED (Pflicht-Lücke)
alle LM-Anforderungen erfüllt → MET
nur ein Teil erfüllt → PARTIAL
LM nicht bewertbar (Prüfer down) → UNDETERMINED (Aufrufer behält Legacy)
BEST_PRACTICE/OPTIONAL-Obligation (kein LM):
mind. ein Kriterium erfüllt → MET (abgedeckt)
keines → OPEN (nur Empfehlung, NIE FAILED)
Redundanz-Kollaps: LM-Kriterien EINER Obligation werden zu „Anforderungen" nach
`legal_basis` gruppiert; eine Anforderung gilt als erfüllt, sobald IRGENDEIN Control
sie bestätigt (OR). 9× recipients_disclosed (alle Art 13(1)(e)) = eine Anforderung.
PARTIAL entsteht nur bei mehreren DISTINKTEN LM-Anforderungen (verschiedene
legal_basis) innerhalb einer Obligation.
"""
from __future__ import annotations
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from typing import Callable, Optional
LM, BP, OPT = "LEGAL_MINIMUM", "BEST_PRACTICE", "OPTIONAL"
MET, PARTIAL, FAILED = "MET", "PARTIAL", "FAILED"
NA, UNDETERMINED, OPEN = "NA", "UNDETERMINED", "OPEN"
PFLICHT, EMPFEHLUNG, NICHT_ANWENDBAR = "PFLICHT", "EMPFEHLUNG", "NICHT_ANWENDBAR"
# Predikat-Hook: (conditional, doc_text) → True (anwendbar) / False (→ NA) / None (unbekannt → anwendbar)
ApplicableFn = Callable[[str, str], Optional[bool]]
@dataclass(frozen=True)
class CriterionEval:
"""Eine Kriteriums-Bewertung eines Controls, einer Obligation zugeordnet."""
obligation_id: str
tier: str # LEGAL_MINIMUM / BEST_PRACTICE / OPTIONAL
met: Optional[bool] # True erfüllt · False fehlt · None unbestimmt
control_id: str
legal_basis: str = ""
criterion: str = ""
conditional: Optional[str] = None # Applicability-Prädikat der Obligation
@dataclass
class ObligationResult:
obligation_id: str
status: str # MET / PARTIAL / FAILED / NA / UNDETERMINED / OPEN
bucket: str # PFLICHT / EMPFEHLUNG / NICHT_ANWENDBAR
tier: str # bestimmende Tier der Obligation
applicable: bool
evidence: list[str] # beitragende control_ids
lm_met: int # erfüllte LM-Anforderungen
lm_total: int # distinkte LM-Anforderungen (bewertbar)
recommendations: list[dict] = field(default_factory=list)
def _governing_tier(evals: list[CriterionEval]) -> str:
tiers = {e.tier for e in evals}
if LM in tiers:
return LM
return BP if BP in tiers else OPT
def _requirement_state(evals: list[CriterionEval]) -> Optional[bool]:
"""Zustand EINER LM-Anforderung über alle prüfenden Controls (OR/Redundanz):
True (irgendwer bestätigt) · None (alle unbestimmt) · False (bewertet, fehlt)."""
if any(e.met is True for e in evals):
return True
if all(e.met is None for e in evals):
return None
return False
def _recommendations(evals: list[CriterionEval]) -> list[dict]:
"""Nicht erfüllte BEST_PRACTICE/OPTIONAL-Kriterien → Empfehlungen."""
return [{"criterion": e.criterion, "tier": e.tier, "legal_basis": e.legal_basis,
"control_id": e.control_id}
for e in evals if e.tier in (BP, OPT) and e.met is False]
def aggregate_obligation(obligation_id: str, evals: list[CriterionEval], *,
applicable_fn: Optional[ApplicableFn] = None,
doc_text: str = "") -> ObligationResult:
evidence = sorted({e.control_id for e in evals if e.control_id})
conditional = next((e.conditional for e in evals if e.conditional), None)
tier = _governing_tier(evals)
recs = _recommendations(evals)
applicable = True
if applicable_fn is not None and conditional:
verdict = applicable_fn(conditional, doc_text)
applicable = True if verdict is None else bool(verdict)
if not applicable:
return ObligationResult(obligation_id, NA, NICHT_ANWENDBAR, tier, False,
evidence, 0, 0, recs)
lm_evals = [e for e in evals if e.tier == LM]
if lm_evals:
reqs: dict[str, list[CriterionEval]] = defaultdict(list)
for e in lm_evals:
reqs[e.legal_basis or obligation_id].append(e)
states = [_requirement_state(v) for v in reqs.values()]
determinable = [s for s in states if s is not None]
if not determinable:
return ObligationResult(obligation_id, UNDETERMINED, PFLICHT, LM, True,
evidence, 0, len(states), recs)
met = sum(1 for s in determinable if s)
total = len(determinable)
status = MET if met == total else (FAILED if met == 0 else PARTIAL)
return ObligationResult(obligation_id, status, PFLICHT, LM, True,
evidence, met, total, recs)
# Reine BEST_PRACTICE/OPTIONAL-Obligation: nie Pflicht, nie FAILED.
covered = any(e.met is True for e in evals)
return ObligationResult(obligation_id, MET if covered else OPEN, EMPFEHLUNG,
tier, True, evidence, 0, 0, recs)
def aggregate_obligations(evals: list[CriterionEval], *,
applicable_fn: Optional[ApplicableFn] = None,
doc_text: str = "") -> list[ObligationResult]:
"""Flache Kriteriums-Liste → ein ObligationResult je obligation_id."""
groups: dict[str, list[CriterionEval]] = defaultdict(list)
for e in evals:
if e.obligation_id:
groups[e.obligation_id].append(e)
return [aggregate_obligation(oid, g, applicable_fn=applicable_fn, doc_text=doc_text)
for oid, g in groups.items()]
def evals_from_tiered(control_id: str, tiered_criteria: list[dict],
detail: list[dict], conditional: Optional[str] = None
) -> list[CriterionEval]:
"""Adapter: tiered_criteria (obligation_id/tier/legal_basis) + das
evaluate_tiered-`detail` (met pro Index, gleiche Reihenfolge) → CriterionEvals.
`conditional` kommt aus der Control-`applicability` (gilt für die Obligation)."""
out: list[CriterionEval] = []
for i, c in enumerate(tiered_criteria or []):
oid = c.get("obligation_id")
if not oid:
continue
d = detail[i] if i < len(detail) else {}
out.append(CriterionEval(
obligation_id=oid,
tier=(c.get("compliance_tier") or "").upper(),
met=d.get("met"),
control_id=control_id,
legal_basis=c.get("legal_basis") or "",
criterion=c.get("criterion") or "",
conditional=conditional,
))
return out
def summarize(results: list[ObligationResult]) -> dict:
"""Phase-C-Kennzahlen: Obligation-Anzahl + Verteilung nach Bucket/Status."""
return {
"obligations": len(results),
"buckets": dict(Counter(r.bucket for r in results)),
"statuses": dict(Counter(r.status for r in results)),
"pflicht_failed": sum(1 for r in results if r.bucket == PFLICHT and r.status == FAILED),
"pflicht_partial": sum(1 for r in results if r.bucket == PFLICHT and r.status == PARTIAL),
"recommendations": sum(len(r.recommendations) for r in results),
}
@@ -0,0 +1,76 @@
"""Applicability-Prädikate (minimal) für die Obligation Aggregation Engine.
Jedes Prädikat entscheidet aus dem Dokumenttext, ob eine BEDINGTE Obligation
anwendbar ist:
True → anwendbar (normal bewerten)
False → NICHT anwendbar (→ NA statt FEHLT)
None → Prädikat unbekannt → Aufrufer behält Default=anwendbar (fail-safe,
KEINE stille NA)
Bewusst KLEIN gehalten: nur die bereits modellierten Bedingungen
has_third_country_transfer · uses_legitimate_interest · direct_marketing
(+ legitimate_interest_or_public_task, weil objection_general_art21_1 dieselbe
Rechtsgrundlage als Anknüpfung nutzt). profiling/employment/telecom/health/
data_act folgen in der nächsten Charge — bis dahin → None → anwendbar.
"""
from __future__ import annotations
from typing import Optional
_THIRD_COUNTRY = (
"drittland", "drittstaat", "drittländ", "third countr", "außerhalb der eu",
"ausserhalb der eu", "außerhalb des ewr", "ausserhalb des ewr",
"angemessenheitsbeschluss", "standardvertragsklausel", "standarddatenschutzklausel",
"binding corporate rules", "verbindliche interne datenschutzvorschriften",
"data privacy framework", "privacy shield", "in die usa", "in den usa",
"vereinigte staaten", "international transfer", "internationale übermittlung",
"art. 44", "art. 46",
)
_LEGIT = (
"berechtigtes interesse", "berechtigten interesse", "berechtigte interesse",
"legitimate interest", "art. 6 abs. 1 lit. f", "art. 6 abs. 1 f",
"art. 6 (1) (f)", "abs. 1 buchstabe f", "interessenabwägung",
)
_PUBLIC_TASK = (
"öffentliche aufgabe", "öffentlichen aufgabe", "im öffentlichen interesse",
"art. 6 abs. 1 lit. e", "ausübung öffentlicher gewalt", "official authority",
)
_DIRECT_MKT = (
"direktwerbung", "direktmarketing", "direkt-werbung", "werbe-e-mail", "werbe-mail",
"newsletter", "werbliche", "marketingzweck", "marketing-zweck", "zwecke der werbung",
"zu werbezwecken", "e-mail-marketing", "postwerbung", "telefonwerbung",
)
def _has(text: str, kws: tuple[str, ...]) -> bool:
return any(k in text for k in kws)
def has_third_country_transfer(text: str) -> bool:
return _has(text, _THIRD_COUNTRY)
def uses_legitimate_interest(text: str) -> bool:
return _has(text, _LEGIT)
def direct_marketing(text: str) -> bool:
return _has(text, _DIRECT_MKT)
_PREDICATES = {
"has_third_country_transfer": has_third_country_transfer,
"uses_legitimate_interest": uses_legitimate_interest,
"legitimate_interest_or_public_task":
lambda t: _has(t, _LEGIT) or _has(t, _PUBLIC_TASK),
"direct_marketing": direct_marketing,
}
def applicable(conditional: str, doc_text: str) -> Optional[bool]:
"""applicable_fn-Hook für `aggregate_obligations`. Unbekanntes Prädikat → None
(Aufrufer behält Default=anwendbar; NIE stille NA)."""
fn = _PREDICATES.get(conditional)
if fn is None:
return None
return fn((doc_text or "").lower())
@@ -0,0 +1,26 @@
"""Obligation-Taxonomie-Registry — versioniertes Artefakt bis zur DB-Owner-Tabelle
(Legal Obligation Layer v1, docs-src/development/legal_obligation_layer_v1.md).
Hält Metadaten auf OBLIGATION-Ebene, die (noch) keine eigene DB-Tabelle haben.
`decision_method_required`: Obligations, deren Erkennung Keyword/Embedding
NACHWEISLICH nicht zuverlässig leistet (kompakte/synonymreiche Offenlegung) und
die CONTENT/LLM brauchen. Empirisch belegt am TeamViewer-Recall-Defekt: 0/22
recipients+international_transfer Controls trafen, obwohl die Pflicht erfüllt war
(„…außerhalb EU/EWR … Standardvertragsklauseln/Schutzmaßnahmen"); Embedding cos
0.490.57 < 0.62, teils falscher Chunk → kein Schwellen-Fix, sondern LLM-Klasse.
Wirkung: der Shadow zählt ein FAILED solcher Obligations NICHT als „echte Lücke",
sondern als RECALL_LIMITED (Prüfer kann sie mit aktueller Methode nicht verifizieren).
"""
OBLIGATION_META: dict[str, dict] = {
"recipients_disclosed": {"decision_method_required": "LLM"},
"third_country_transfer_disclosed": {"decision_method_required": "LLM"},
"safeguards_disclosed": {"decision_method_required": "LLM"},
"safeguards_accessible": {"decision_method_required": "LLM"},
}
def requires_llm(obligation_id: str) -> bool:
"""True, wenn diese Obligation CONTENT/LLM braucht (Keyword/Embedding-Recall belegt unzureichend)."""
return OBLIGATION_META.get(obligation_id, {}).get("decision_method_required") == "LLM"
@@ -0,0 +1,130 @@
"""DSE Shadow-Verdrahtung der Obligation Aggregation Engine.
Erzeugt aus den v3-`results` zusätzlich Obligation-Ergebnisse — AUSSCHLIESSLICH
für die Telemetrie (Shadow Mode). Ändert KEINE nutzer-sichtbaren Findings.
Mapping control-level über generation_metadata.legal_obligations +
applicability.conditional; das `met`-Signal ist das Legacy-`passed` des Controls
(kein zusätzlicher Prüfer-Call, kein Key). Liefert die Vergleichszahlen, mit denen
sich der Umschalt-Entscheid später absichern lässt:
legacy_control_findings · obligation_shadow_results · collapse_factor ·
na_count · met_failed_delta · top_collapsed_obligations
"""
from __future__ import annotations
import logging
import os
from typing import Any, Optional
logger = logging.getLogger(__name__)
async def fetch_obligation_markers(cids: list[str], db_url: str = "") -> dict[str, dict]:
"""legal_obligations + applicability.conditional der Controls laden.
Leeres Dict bei Fehler/keiner DB (Shadow fällt still aus)."""
cids = [c for c in cids if c]
if not cids:
return {}
import json
dsn = db_url or os.getenv("DATABASE_URL") or os.getenv("COMPLIANCE_DATABASE_URL")
if not dsn:
return {}
try:
import asyncpg
conn = await asyncpg.connect(dsn)
rows = await conn.fetch(
"select control_id, generation_metadata->'legal_obligations' obl, "
"generation_metadata->'applicability'->>'conditional' cond "
"from compliance.canonical_controls "
"where control_id = any($1::text[]) "
"and generation_metadata ? 'legal_obligations'", cids)
await conn.close()
except Exception as e:
logger.warning("fetch_obligation_markers failed: %s", e)
return {}
out: dict[str, dict] = {}
for r in rows:
obl = r["obl"]
obl = json.loads(obl) if isinstance(obl, str) else obl
if obl:
out[r["control_id"]] = {"obl": obl, "cond": r["cond"]}
return out
def compute_obligation_shadow(results: list[dict], text: str,
markers: dict[str, dict]) -> dict[str, Any]:
"""Reiner Shadow-Vergleich (keine DB, keine Seiteneffekte). `markers`:
{control_id: {obl:[...], cond:str|None}}. `met` = Legacy-`passed`."""
from compliance.services.obligation_aggregation import (
FAILED, LM, MET, NA, PARTIAL, CriterionEval, aggregate_obligations,
)
from compliance.services.obligation_applicability import applicable
from compliance.services.obligation_taxonomy import requires_llm
legacy = 0
evals: list[Any] = []
contrib: dict[str, list] = {}
for r in results:
cid = r.get("control_id")
m = markers.get(cid)
if not m:
continue
passed = bool(r.get("passed"))
if not passed:
legacy += 1
for ob in m["obl"]:
evals.append(CriterionEval(ob, LM, passed, cid, "", "", m.get("cond")))
contrib.setdefault(ob, []).append((cid, passed))
if not evals:
return {"status": "no obligation markers on result controls"}
obls = aggregate_obligations(evals, applicable_fn=applicable, doc_text=text)
# FAILED/PARTIAL ehrlich trennen: echte Lücke (failed_by_current_checker) vs
# RECALL_LIMITED (Obligation braucht LLM, aktueller Prüfer kann sie nicht verifizieren).
findings = failed_current = recall_limited = na = 0
for o in obls:
if o.status == NA:
na += 1
elif o.status in (FAILED, PARTIAL):
findings += 1
if requires_llm(o.obligation_id):
recall_limited += 1
else:
failed_current += 1
top = []
for o in obls:
cs = contrib.get(o.obligation_id, [])
fehlt = sum(1 for _, p in cs if not p)
if fehlt >= 2:
top.append({"obligation": o.obligation_id, "fehlt": fehlt,
"total": len(cs), "status": o.status,
"recall_limited": bool(requires_llm(o.obligation_id)
and o.status in (FAILED, PARTIAL))})
top.sort(key=lambda x: -x["fehlt"])
met_count = sum(1 for o in obls if o.status == MET)
recall_limited_obls = sorted({o.obligation_id for o in obls
if o.status in (FAILED, PARTIAL)
and requires_llm(o.obligation_id)})
return {
"legacy_control_findings": legacy,
"obligation_shadow_results": len(obls),
"obligation_findings": findings,
"failed_by_current_checker": failed_current,
"recall_limited": recall_limited,
"met_count": met_count,
"collapse_factor": round(legacy / findings, 2) if findings else None,
"na_count": na,
"met_failed_delta": legacy - findings,
"top_collapsed_obligations": top[:10],
"recall_limited_obligations": recall_limited_obls,
}
async def build_obligation_shadow(results: list[dict], text: str,
db_url: str = "") -> dict[str, Any]:
"""Async-Wrapper: Marker laden, dann Shadow rechnen. NIE in `results` schreiben."""
cids = [r.get("control_id") for r in results if r.get("control_id")]
markers = await fetch_obligation_markers(cids, db_url)
if not markers:
return {"status": "no markers"}
return compute_obligation_shadow(results, text, markers)
@@ -158,6 +158,17 @@ async def run_v3_pipeline(
except Exception as e: except Exception as e:
logger.warning("dse tiered eval skipped: %s", e) logger.warning("dse tiered eval skipped: %s", e)
# Layer 4 (SHADOW): Obligation-Aggregation NUR in die Telemetrie. Greift NICHT
# in `results` ein — nutzer-sichtbare Findings bleiben unverändert. Liefert die
# Vergleichszahlen für den späteren Umschalt-Entscheid (collapse_factor etc.).
obligation_shadow: dict[str, Any] = {}
try:
from ._obligation_shadow import build_obligation_shadow
obligation_shadow = await build_obligation_shadow(results, text, db_url)
except Exception as e:
logger.warning("dse obligation shadow skipped: %s", e)
obligation_shadow = {"error": str(e)}
telemetry = { telemetry = {
"layer_0_field_hits": len(boost_field_ids), "layer_0_field_hits": len(boost_field_ids),
"layer_0_field_ids": boost_field_ids, "layer_0_field_ids": boost_field_ids,
@@ -169,6 +180,7 @@ async def run_v3_pipeline(
"offtopic_dropped": drop_stats.get("offtopic_dropped", 0), "offtopic_dropped": drop_stats.get("offtopic_dropped", 0),
"gate_excluded": len(organizational), "gate_excluded": len(organizational),
"organizational_checklist": organizational, "organizational_checklist": organizational,
"obligation_shadow": obligation_shadow,
} }
logger.info("dse v3 telemetry: %s", telemetry) logger.info("dse v3 telemetry: %s", telemetry)
return results, telemetry return results, telemetry
@@ -0,0 +1,153 @@
"""Unit-Tests Obligation Aggregation Engine (Legal Obligation Layer v1).
Deckt die fail-safe Regeln + den Redundanz-Kollaps ab (echte DSE-Szenarien:
recipients 9×, objection LM+BP, portability OPTIONAL-Format)."""
from compliance.services.obligation_aggregation import (
BP, LM, OPT, CriterionEval, aggregate_obligation, aggregate_obligations,
evals_from_tiered, summarize,
)
def _ce(oid, tier, met, cid, basis="", crit="", cond=None):
return CriterionEval(oid, tier, met, cid, basis, crit, cond)
class TestRedundancyCollapse:
def test_nine_controls_one_confirms_collapses_to_one_met(self):
# recipients_disclosed: 9 Controls, gleiche Anforderung (Art 13(1)(e))
evals = [_ce("recipients_disclosed", LM, i == 4, f"DATA-{i}", "Art. 13(1)(e)")
for i in range(9)]
res = aggregate_obligation("recipients_disclosed", evals)
assert res.status == "MET"
assert res.lm_met == 1 and res.lm_total == 1 # 9 → 1 Anforderung
assert len(res.evidence) == 9
def test_all_nine_absent_fails_once(self):
evals = [_ce("recipients_disclosed", LM, False, f"DATA-{i}", "Art. 13(1)(e)")
for i in range(9)]
res = aggregate_obligation("recipients_disclosed", evals)
assert res.status == "FAILED"
assert res.bucket == "PFLICHT"
class TestPartialMultiFacet:
def test_two_distinct_lm_requirements_one_met_is_partial(self):
evals = [
_ce("transfer", LM, True, "C1", "Art. 13(1)(f)"), # erfüllt
_ce("transfer", LM, False, "C2", "Art. 46"), # fehlt → distinkt
]
res = aggregate_obligation("transfer", evals)
assert res.status == "PARTIAL"
assert res.lm_met == 1 and res.lm_total == 2
def test_both_distinct_requirements_met(self):
evals = [
_ce("transfer", LM, True, "C1", "Art. 13(1)(f)"),
_ce("transfer", LM, True, "C2", "Art. 46"),
]
assert aggregate_obligation("transfer", evals).status == "MET"
class TestApplicability:
def test_conditional_false_is_na(self):
evals = [_ce("transfer", LM, False, "C1", "Art. 44", cond="has_third_country_transfer")]
res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: False)
assert res.status == "NA"
assert res.bucket == "NICHT_ANWENDBAR"
assert res.applicable is False
def test_conditional_true_evaluates_normally(self):
evals = [_ce("transfer", LM, False, "C1", "Art. 44", cond="has_third_country_transfer")]
res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: True)
assert res.status == "FAILED"
def test_conditional_unknown_defaults_applicable(self):
evals = [_ce("transfer", LM, True, "C1", "Art. 44", cond="x")]
res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: None)
assert res.applicable is True and res.status == "MET"
def test_no_predicate_means_applicable(self):
evals = [_ce("transfer", LM, True, "C1", cond="x")]
assert aggregate_obligation("transfer", evals).applicable is True
class TestUndetermined:
def test_all_lm_none_is_undetermined(self):
evals = [_ce("ob", LM, None, "C1", "b"), _ce("ob", LM, None, "C2", "b")]
res = aggregate_obligation("ob", evals)
assert res.status == "UNDETERMINED"
assert res.bucket == "PFLICHT"
def test_one_determinable_requirement_decides(self):
# eine Anforderung unbestimmt, die andere klar erfüllt → MET über die bewertbare
evals = [_ce("ob", LM, None, "C1", "b1"), _ce("ob", LM, True, "C2", "b2")]
res = aggregate_obligation("ob", evals)
assert res.status == "MET"
assert res.lm_total == 1 # nur die bewertbare Anforderung zählt
class TestBestPracticeOnly:
def test_pure_bp_covered_is_met_recommendation_bucket(self):
evals = [_ce("art20_format", OPT, True, "C1")]
res = aggregate_obligation("art20_format", evals)
assert res.status == "MET"
assert res.bucket == "EMPFEHLUNG"
def test_pure_bp_not_covered_is_open_never_failed(self):
evals = [_ce("art20_format", OPT, False, "C1", crit="JSON/CSV")]
res = aggregate_obligation("art20_format", evals)
assert res.status == "OPEN"
assert res.bucket == "EMPFEHLUNG"
assert len(res.recommendations) == 1
class TestRecommendationsWithinLm:
def test_unmet_bp_in_lm_obligation_becomes_recommendation(self):
# objection_direct_marketing: LM erfüllt + 3 BP teils offen
evals = [
_ce("obj_dm", LM, True, "SEC-8410", "Art. 21(2)", "Recht"),
_ce("obj_dm", BP, False, "SEC-8410", "", "Kontaktweg"),
_ce("obj_dm", BP, True, "SEC-8410", "", "kostenlos"),
]
res = aggregate_obligation("obj_dm", evals)
assert res.status == "MET" and res.bucket == "PFLICHT"
assert len(res.recommendations) == 1
assert res.recommendations[0]["criterion"] == "Kontaktweg"
class TestAdapterAndSummary:
def test_evals_from_tiered_zips_and_skips_no_obligation(self):
tc = [
{"criterion": "Recht", "compliance_tier": "LEGAL_MINIMUM",
"legal_basis": "Art. 21(1)", "obligation_id": "obj_gen"},
{"criterion": "Weg", "compliance_tier": "BEST_PRACTICE",
"legal_basis": "", "obligation_id": "obj_gen"},
{"criterion": "ohne", "compliance_tier": "OPTIONAL"}, # kein obligation_id → skip
]
detail = [{"met": True}, {"met": False}, {"met": True}]
evals = evals_from_tiered("AUTH-2051", tc, detail, conditional="x")
assert len(evals) == 2
assert evals[0].met is True and evals[0].conditional == "x"
assert evals[1].tier == BP and evals[1].met is False
def test_aggregate_obligations_groups_by_id(self):
evals = [
_ce("a", LM, True, "C1", "b"),
_ce("a", LM, True, "C2", "b"),
_ce("b", LM, False, "C3", "b"),
]
results = {r.obligation_id: r for r in aggregate_obligations(evals)}
assert set(results) == {"a", "b"}
assert results["a"].status == "MET"
assert results["b"].status == "FAILED"
def test_summarize_counts_buckets_and_failures(self):
evals = [
_ce("a", LM, False, "C1", "b"), # FAILED Pflicht
_ce("c", OPT, False, "C3", crit="x"), # OPEN Empfehlung
]
s = summarize(aggregate_obligations(evals))
assert s["obligations"] == 2
assert s["pflicht_failed"] == 1
assert s["buckets"]["PFLICHT"] == 1
assert s["buckets"]["EMPFEHLUNG"] == 1
@@ -0,0 +1,57 @@
"""Unit-Tests für die minimalen Applicability-Prädikate."""
from compliance.services.obligation_applicability import (
applicable, direct_marketing, has_third_country_transfer,
uses_legitimate_interest,
)
class TestThirdCountry:
def test_drittland_present(self):
assert has_third_country_transfer("übermittlung in ein drittland erfolgt") is True
def test_scc_present(self):
assert has_third_country_transfer("auf basis der standardvertragsklauseln") is True
def test_absent(self):
assert has_third_country_transfer("verarbeitung nur innerhalb deutschlands") is False
class TestLegitimateInterest:
def test_present(self):
assert uses_legitimate_interest("auf grundlage unseres berechtigten interesses") is True
def test_absent(self):
assert uses_legitimate_interest("nur auf grundlage ihrer einwilligung") is False
class TestDirectMarketing:
def test_newsletter(self):
assert direct_marketing("anmeldung zum newsletter möglich") is True
def test_direktwerbung(self):
assert direct_marketing("daten für direktwerbung genutzt") is True
def test_absent(self):
assert direct_marketing("wir versenden keine werblichen inhalte ohne basis") is True # 'werbliche' trifft
def test_truly_absent(self):
assert direct_marketing("reine vertragsabwicklung") is False
class TestApplicableHook:
def test_known_predicate_true(self):
assert applicable("has_third_country_transfer", "Transfer in die USA") is True
def test_known_predicate_false_triggers_na(self):
assert applicable("has_third_country_transfer", "nur in der EU") is False
def test_public_task_alias(self):
assert applicable("legitimate_interest_or_public_task",
"zur ausübung öffentlicher gewalt") is True
def test_unknown_predicate_returns_none(self):
# profiling noch nicht modelliert → None → Aufrufer behält anwendbar
assert applicable("profiling", "irgendein text") is None
def test_case_insensitive(self):
assert applicable("uses_legitimate_interest", "BERECHTIGTES INTERESSE") is True
@@ -0,0 +1,92 @@
"""Unit-Tests für die reinen Helfer der Obligation Discovery Pipeline (scripts/obligation_discovery/_core.py)."""
import pathlib
import sys
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[2] / "scripts" / "obligation_discovery"))
from _core import ( # noqa: E402
centroid, cosine, greedy_cluster, merge_edges, parse_req, validate_registry,
)
class TestParseReq:
def test_list_passthrough(self):
assert parse_req(["a", "b"]) == ["a", "b"]
def test_python_repr_string(self):
assert parse_req("['x', 'y']") == ["x", "y"]
def test_json_string(self):
assert parse_req('["x", "y"]') == ["x", "y"]
def test_plain_string(self):
assert parse_req("just text") == ["just text"]
class TestCosine:
def test_identical(self):
assert cosine([1.0, 2.0, 3.0], [1.0, 2.0, 3.0]) > 0.999
def test_orthogonal(self):
assert abs(cosine([1.0, 0.0], [0.0, 1.0])) < 1e-9
def test_empty(self):
assert cosine([], [1.0]) == 0.0
class TestGreedyCluster:
def test_near_vectors_cluster_far_separate(self):
vecs = [[1.0, 0.0], [0.99, 0.01], [0.0, 1.0]]
clusters = greedy_cluster(vecs, 0.9)
assert len(clusters) == 2
assert clusters[0]["members"] == [0, 1]
assert clusters[1]["members"] == [2]
def test_deterministic(self):
vecs = [[1.0, 0.0], [0.5, 0.5], [0.99, 0.0]]
assert greedy_cluster(vecs, 0.8) == greedy_cluster(vecs, 0.8)
def test_none_vector_isolated(self):
clusters = greedy_cluster([[1.0, 0.0], None], 0.5)
assert clusters[1]["members"] == [1] and clusters[1]["seed"] is None
class TestCentroid:
def test_mean(self):
assert centroid([0, 1], [[0.0, 2.0], [2.0, 4.0]]) == [1.0, 3.0]
class TestValidateRegistry:
def _reg(self, obls, rels=None):
return {"obligations": obls, "relationships": rels or []}
def test_lm_without_legal_basis_fails(self):
r = self._reg([{"id": "x", "tier": "LEGAL_MINIMUM", "legal_basis": [], "member_controls": ["C1"]}])
v = validate_registry(r)
assert v["lm_without_legal_basis"] == ["x"] and v["passed"] is False
def test_clean_passes(self):
r = self._reg([{"id": "x", "tier": "LEGAL_MINIMUM", "legal_basis": [{"source": "CRA"}],
"member_controls": ["C1"], "provenance": {"source_meta_cluster": "M0"}}])
assert validate_registry(r)["passed"] is True
def test_over8_per_review_unit_flagged(self):
obls = [{"id": f"o{i}", "tier": "BEST_PRACTICE", "member_controls": ["C"],
"provenance": {"source_meta_cluster": "M0"}} for i in range(9)]
v = validate_registry(self._reg(obls))
assert v["over8_per_review_unit"] == {"M0": 9} and v["passed"] is False
def test_empty_member_controls_flagged(self):
v = validate_registry(self._reg([{"id": "x", "tier": "BEST_PRACTICE", "member_controls": []}]))
assert v["empty_member_controls"] == ["x"] and v["passed"] is False
class TestMergeEdges:
def test_dedup_and_semantic_only(self):
existing = [{"type": "supports", "from": "a", "to": "b"}]
proposed = [{"type": "supports", "from": "a", "to": "b"}, # dup
{"type": "depends_on", "from": "c", "to": "d"}, # new
{"type": "out_of_scope", "clusters": [1]}] # not semantic
merged, added = merge_edges(existing, proposed)
assert added == 1
assert {"type": "depends_on", "from": "c", "to": "d"} in merged
@@ -0,0 +1,74 @@
"""Unit-Tests für die DSE Shadow-Verdrahtung (compute_obligation_shadow, pure)."""
from compliance.services.specialist_agents.dse._obligation_shadow import (
compute_obligation_shadow,
)
NON_LLM = "art20_right_exists_core" # nicht in der LLM_REQUIRED-Registry
LLM_REQ = "third_country_transfer_disclosed" # in der LLM_REQUIRED-Registry
def _markers(n, ob, cond=None):
return {f"C{i}": {"obl": [ob], "cond": cond} for i in range(n)}
class TestComputeShadow:
def test_collapse_and_delta(self):
results = [{"control_id": f"C{i}", "passed": False} for i in range(5)]
s = compute_obligation_shadow(results, "x", _markers(5, NON_LLM))
assert s["legacy_control_findings"] == 5
assert s["obligation_findings"] == 1 # 5 → 1
assert s["failed_by_current_checker"] == 1
assert s["recall_limited"] == 0
assert s["collapse_factor"] == 5.0
assert s["met_failed_delta"] == 4
assert s["met_count"] == 0
top = s["top_collapsed_obligations"][0]
assert top["obligation"] == NON_LLM and top["fehlt"] == 5
assert top["recall_limited"] is False
def test_fp_correction_one_passed_collapses_to_met(self):
results = [{"control_id": f"C{i}", "passed": i == 0} for i in range(5)]
s = compute_obligation_shadow(results, "x", _markers(5, NON_LLM))
assert s["legacy_control_findings"] == 4
assert s["obligation_findings"] == 0 # MET (anderswo erfüllt)
assert s["met_failed_delta"] == 4
def test_na_when_predicate_false(self):
results = [{"control_id": "C0", "passed": False}]
m = {"C0": {"obl": [LLM_REQ], "cond": "has_third_country_transfer"}}
s = compute_obligation_shadow(results, "nur innerhalb der eu", m)
assert s["na_count"] == 1
assert s["obligation_findings"] == 0 # NA statt FEHLT
def test_no_markers_returns_status(self):
s = compute_obligation_shadow([{"control_id": "C0", "passed": False}], "x", {})
assert "no obligation" in s["status"]
def test_does_not_mutate_results(self):
results = [{"control_id": "C0", "passed": False}]
compute_obligation_shadow(results, "x", _markers(1, NON_LLM))
assert set(results[0].keys()) == {"control_id", "passed"}
class TestRecallSegregation:
def test_llm_required_failed_is_recall_limited_not_real_gap(self):
# 5 verfehlte third_country-Controls, Transfer-Text vorhanden → FAILED,
# aber LLM_REQUIRED → RECALL_LIMITED, NICHT failed_by_current_checker.
results = [{"control_id": f"C{i}", "passed": False} for i in range(5)]
m = {f"C{i}": {"obl": [LLM_REQ], "cond": "has_third_country_transfer"}
for i in range(5)}
s = compute_obligation_shadow(results, "übermittlung in ein drittland", m)
assert s["obligation_findings"] == 1
assert s["recall_limited"] == 1
assert s["failed_by_current_checker"] == 0
assert s["recall_limited_obligations"] == [LLM_REQ]
assert s["top_collapsed_obligations"][0]["recall_limited"] is True
def test_mixed_real_gap_and_recall_limited(self):
results = [{"control_id": "A", "passed": False}, {"control_id": "B", "passed": False}]
m = {"A": {"obl": [NON_LLM], "cond": None},
"B": {"obl": [LLM_REQ], "cond": "has_third_country_transfer"}}
s = compute_obligation_shadow(results, "übermittlung in ein drittland", m)
assert s["obligation_findings"] == 2
assert s["failed_by_current_checker"] == 1
assert s["recall_limited"] == 1
@@ -0,0 +1,20 @@
"""Unit-Tests für die Obligation-Taxonomie-Registry (decision_method_required)."""
from compliance.services.obligation_taxonomy import OBLIGATION_META, requires_llm
class TestRequiresLlm:
def test_marked_obligations_require_llm(self):
for ob in ("recipients_disclosed", "third_country_transfer_disclosed",
"safeguards_disclosed", "safeguards_accessible"):
assert requires_llm(ob) is True
def test_unmarked_obligation_does_not(self):
assert requires_llm("art20_right_exists_core") is False
assert requires_llm("objection_general_art21_1") is False
def test_unknown_obligation_is_false(self):
assert requires_llm("does_not_exist") is False
def test_registry_values_are_llm(self):
assert all(v.get("decision_method_required") == "LLM"
for v in OBLIGATION_META.values())
+41
View File
@@ -0,0 +1,41 @@
# 01 — Retrieval-Pipeline
**Zweck:** Einen Kandidaten-Pool bauen, der die *richtigen* Quellen enthält (Pflichtquelle **und** Controls) — auch dann, wenn reine Semantik sie verfehlen würde. Re-Ranking (02) kann nur ordnen, was im Pool liegt; deshalb ist der Pool-Aufbau die erste Verteidigungslinie gegen Recall-Lücken.
## Mechanik
`searchInternal()` (`legal_rag_client.go`) orchestriert den Pool in fester Reihenfolge — jede Stufe **augmentiert** (ersetzt nie), Fehler degradieren still:
1. **Embedding**`bge-m3` (1024-dim) über Ollama, Query auf 2000 Zeichen gekappt.
2. **Hybrid (RRF)**`searchHybrid()`: dense + Volltext via Qdrant Query-API, RRF-Fusion. Fällt bei Fehler auf `searchDense()` (reine Vektorsuche) zurück.
3. **Binding-Augmentation**`searchBinding()`: zieht die Top-`source_class=binding_law`-Treffer dazu, **damit die Pflichtquelle immer Kandidat ist**, auch wenn Guidance semantisch dominiert.
4. **Control-Augmentation**`searchControls()`: nur bei Control-Intent (siehe [05](05-control-intent.md)); tiefer dense-Pull, gefiltert auf Control-Pool-Rollen.
5. **Graph-Augmentation**`expandViaGraph()`: **opt-in**; zieht verbundene Normen über Zitations-Kanten.
6. **Merge**`mergeDedupHits()`: konkateniert, behält die erste Vorkommnis je Punkt-ID, Reihenfolge erhalten.
Danach: Map auf `LegalSearchResult` → Authority-Rerank (02) → Control-Diversity (05) → Truncate auf `topK`.
## Konstanten + Warum
| Konstante | Wert | Warum |
|-----------|------|-------|
| `prefetchLimit` (hybrid) | `20`, bzw. `topK*4` bei topK>20 | Fusion-Fenster: genug dense-Kontext für RRF, ohne den Volltext-Anteil zu verwässern |
| `controlPoolDepth` | `60` | **Gemessen:** für EU-Cyber-Control-Queries liegen die relevanten Control-Quellen (NIST, CRA-Anhang) bei dense-Rang ~89 — weit unter dem kleinen top-K. Auf dem größeren (95k) synced Korpus reicht ein fixer Tiefen-Pull von 60, um sie zum Kandidaten zu machen |
| `graphSeedCount` | `5` | nur die Top-Hits als Graph-Saat (Begrenzung der Expansion) |
| `graphMaxExpand` | `15` | Obergrenze der über Kanten gezogenen Normen |
| `graphHopPenalty` | `0.05` | leichte Distanz-Strafe pro Kante (Pool-Expansion, kein Ranking-Hebel) |
| `RAG_GRAPH_EXPANSION` | env, default **aus** | **Opt-in:** kein gemessener Rang-Nutzen ggü. der Binding-Augmentation, +1 Qdrant-Call/Suche, Flutungsrisiko über Reverse-Kanten. Bleibt als Recall-Sicherheitsnetz |
> Forward-Kanten (`references_out`) treiben die Graph-Expansion; Reverse-Kanten (`references_in`) werden **nur als Metadaten** geführt (sonst flutet ein populärer Anhang den Pool).
## Code
- `legal_rag_client.go``searchInternal()`, `mergeDedupHits()`
- `legal_rag_http.go``searchHybrid()`, `searchDense()`, `searchBinding()`, `searchControls()`
- `legal_rag_graph.go``expandViaGraph()`
## Adressierte Fehlerklassen
- **„Pflichtquelle nicht im Pool"** → Binding-Augmentation (Stufe 3) garantiert die `binding_law`-Quelle als Kandidat.
- **„Control-Quelle unter top-K"** → Control-Augmentation + `controlPoolDepth` (Stufe 4) holt tiefliegende NIST/CRA-Anhang-Treffer.
- **„Recall-Lücke bei Synonymen"** → Hybrid (RRF) deckt lexikalische Treffer ab, die rein semantisch fehlen.
+51
View File
@@ -0,0 +1,51 @@
# 02 — Authority-Re-Ranking
**Zweck:** Bindendes Recht der passenden Jurisdiktion/Domäne nach oben, Guidance/Fremdrecht/Off-Domain nach unten — **Reihenfolge only, nichts wird gelöscht**. Der `Score` trägt nach dem Rerank den Authority-Score, damit nachgelagerte Multi-Collection-Merges (Advisor) die Ordnung bewahren.
## Mechanik
`authorityScore()` (`authority_rerank.go`) berechnet pro Treffer einen normativen Relevanz-Score aus dem rohen Semantik-Score + gewichteter Autorität + Kontext-Bonus/Penalty:
```
score = rawSemantic
+ authorityCoef · weight/100 (Autorität, siehe 03)
+ jurisdictionGain (DE/EU-Match)
foreignPenalty (CH bei DE/EU-Frage)
unknownPenalty (unbekannte Klasse)
+ domainMatchGain (Chunk-Domäne == Query-Domäne)
offDomainPenalty (bindend, aber off-domain)
scopePenalty (BDSG Teil 3 bei allgemeiner DS-Frage)
+ topicGain (bevorzugte kanonische Norm)
supersededPenalty (status="superseded")
```
`rerankByAuthority()` sortiert stabil nach diesem Score und schreibt ihn zurück. `liftAboveBinding()` hebt bei **Auslegungs-Intent** eine semantisch konkurrenzfähige Guidance knapp über das bindende Recht — mit Margin-Guard, damit off-topic-Guidance das Gesetz nicht überholt.
## Konstanten + Warum
| Konstante | Wert | Warum |
|-----------|------|-------|
| `authorityCoef` | `0.40` | Gewicht→Score-Multiplikator. Konservativ kalibriert gegen die Offline-Golden-Harness (Phase A): hoch genug, dass bindendes Recht gewinnt, niedrig genug, dass starke Semantik nicht erschlagen wird |
| `jurisdictionGain` | `0.05` | leichter Vorzug für DE/EU-Quellen bei DE/EU-Frage |
| `foreignPenalty` | `0.60` | Fremdrecht (CH) bei DE/EU-Frage klar demoten — aber **nicht** entfernen (Vergleichsfälle bleiben auffindbar) |
| `unknownPenalty` | `0.08` | unklassifizierte Quellen leicht zurückstufen |
| `domainMatchGain` | `0.15` | Domänen-Treffer (data_protection / cyber / ai / product_safety) belohnen |
| `offDomainPenalty` | `0.10` | bindende, aber fachfremde Norm demoten (z.B. DSGVO bei reiner Cyber-Frage) |
| `scopePenalty` | `0.25` | BDSG §4584 (Justiz/Strafverfolgung) bei allgemeiner DS-Frage zurückstufen — häufige Scope-Verwechslung |
| `topicGain` | `0.18` | Verstärker für bevorzugte kanonische Normen (z.B. Art. 37 DSGVO bei DSB-Fragen) |
| `supersededPenalty` | `0.50` | abgelöste Alt-Quelle demoten, „damit Default-Fragen die eu-v1-Norm sehen, History aber auffindbar bleibt" |
| `intentLiftGain` | `0.10` | Epsilon-Lift einer Guidance über das beste bindende Recht bei Auslegungs-Intent |
| `intentLiftMargin` | `0.05` | Guard: Lift nur, wenn die Semantik innerhalb von 0.05 zum besten bindenden Treffer liegt |
**Auslegungs-Intent-Signale** (`guidanceIntentSignals`): `edpb`, `dsk`, `enisa`, `bsi`, `leitlinie`, `guideline`, `orientierungshilfe`, `auslegung`, `empfiehlt`, `empfehlung`, `sagt`, `laut`, …
## Code
- `authority_rerank.go``authorityScore()`, `rerankByAuthority()`, `bestBindingSemantic()`, `liftAboveBinding()`
## Adressierte Fehlerklassen
- **„Guidance verdrängt Gesetz"** → `authorityCoef`·weight hebt bindendes Recht; `liftAboveBinding` nur mit Margin-Guard.
- **„Fremdrecht Top-1"** → `foreignPenalty`.
- **„Off-Domain-Gesetz dominiert"** → `domainMatchGain` / `offDomainPenalty` / `scopePenalty`.
- **„Veraltete Norm gewinnt"** → `supersededPenalty` (siehe [08](08-explainability.md)).
+49
View File
@@ -0,0 +1,49 @@
# 03 — `source_class` (Rechtsnatur / Autorität)
**Zweck:** Die Autoritäts-Achse, die den **Rang** bestimmt (siehe [02](02-authority.md)). Deterministisch abgeleitet — der noch nicht re-ingestierte (ungetaggte) Korpus wird trotzdem klassifiziert, ohne Re-Tagging des Bestands.
## Mechanik
`classifyAuthority()` (`authority.go`) entscheidet in dieser Reihenfolge:
1. **Standard-NAME-Override** — erkannter Standard-Name (NIST/OWASP/ISO 27001/CIS/CSA CCM/Grundschutz) erzwingt `technical_standard` (Gewicht 80), **auch wenn die Payload `supervisory_guidance` sagt**. Grund: der Korpus taggt viele Standards mit generischem guidance-`source_class`; der Name ist autoritativer. `binding_law` bleibt unangetastet.
2. **Explizite Payload-Werte** — gesetztes `source_class` / `authority_weight` gewinnen.
3. **Marker-Fallback** — foreign → standard → guidance → regulation → unknown.
`inferJurisdiction()`: Fremd-Marker → `CH`; enthält `§` oder DE-Marker → `DE`; sonst → `EU`.
## Konstanten + Warum
**Gewichte je Klasse** (`sourceClassFromWeight()`):
| `source_class` | Gewicht | Schwelle | Bedeutung |
|----------------|---------|----------|-----------|
| `binding_law` | `100` | w ≥ 100 | bindendes Recht (Gesetz/VO) |
| `technical_standard` | `80` | 80 ≤ w < 100 | Best-Practice-Control-Katalog (NIST/OWASP/ISO) |
| `supervisory_guidance` | `70` | 70 ≤ w < 80 | Aufsichts-/Auslegungs-Guidance (ENISA/BSI/EDPB) |
| `unknown` | `50` | default | unklassifiziert |
| `foreign_law` | `0` | w ≤ 0 | Fremdrecht (CH) |
**Marker-Listen** (Substring-Match):
| Liste | Einträge (Auszug) | Wirkung |
|-------|-------------------|---------|
| `standardMarkers` *(vor guidance geprüft)* | NIST, OWASP, Grundschutz, ISO 27001, ISO/IEC 27001, CSA CCM, Cloud Controls Matrix, CIS Benchmark, CIS Control | → `technical_standard` (80) |
| `guidanceMarkers` | DSK, EDPB, BfDI, ENISA, BSI, EUCC, Standards Mapping, Orientierungshilfe, Handreichung, Leitlinie, Empfehlung, OECD, CISA, Blue Guide, … | → `supervisory_guidance` (70) |
| `foreignMarkers` | RevDSG, fedlex, (CH) | → `foreign_law` (0) |
| `deMarkers` | BDSG, DSK, BfDI, BayLfD, BSI | Signal **DE**-Jurisdiktion |
## Der Standard-Name-Override (Fix 2026-06-25)
**Problem:** Der CE-Korpus taggt z.B. `NIST SP 800-82r3` als `source_class=supervisory_guidance` (Gewicht 70), **nicht** technical_standard. `classifyAuthority` vertraute dem Payload-Tag → NIST landete als guidance, **kein `control_standard`** im Pool → die Diversity-Regel ([05](05-control-intent.md)) konnte nichts injizieren.
**Fix:** Erkannter Standard-Name überschreibt ein fehl-getaggtes guidance/unknown-`source_class``technical_standard`. Code-Fix, **kein Re-Ingest** nötig. Bindendes Recht bleibt unangetastet (Sanity geprüft: Rechtsfrage liefert weiterhin binding Top-1).
## Code
- `authority.go``classifyAuthority()`, `sourceClassFromWeight()`, `inferJurisdiction()`
## Adressierte Fehlerklassen
- **„Standard als guidance mistagged → kein control_standard"** → Standard-Name-Override.
- **„Fremdrecht falsch eingeordnet"** → `foreignMarkers` + `foreign_law`-Gewicht 0.
+60
View File
@@ -0,0 +1,60 @@
# 04 — `source_role` (Funktionale Rolle)
**Zweck:** Die zu `source_class` **orthogonale** Achse: *Was tut die Quelle im Dokument?* Sie bestimmt die **Control-Pool-Zugehörigkeit** bei Umsetzungsfragen — unabhängig von der Rechtsnatur. Deterministisch aus Markern abgeleitet, kein Re-Tagging des Bestands.
## Die 7 Rollen
| Konstante | Wert | Definition |
|-----------|------|-----------|
| `roleObligation` | `obligation` | die abstrakte Pflicht (das WAS) |
| `roleOperationalReq` | `operational_requirement` | konkrete bindende Anforderung (z.B. CRA Anhang I) |
| `roleProceduralReq` | `procedural_requirement` | Prozess: Meldung/Registrierung/DSFA/Incident |
| `roleControlStandard` | `control_standard` | Best-Practice-Katalog (NIST/OWASP/ISO/CIS) |
| `roleImplGuidance` | `implementation_guidance` | Umsetzungs-How-to (ENISA Good Practices, BSI) |
| `roleInterpretation` | `interpretation` | interpretiert die *Bedeutung* der Norm (EDPB-Leitlinie) |
| `roleDefinition` | `definition` | Definitionen / Scope / Recitals |
**Control-Pool** = `{operational_requirement, procedural_requirement, control_standard, implementation_guidance}` (die vier „wie umsetzen"-Rollen, `isControlPoolRole()`).
## Mechanik
`classifyRole()` (`control_role.go`) — Entscheidungsreihenfolge:
1. `IsRecital``definition`
2. `source_class == technical_standard``control_standard`
3. `source_class == supervisory_guidance`:
- enthält `implMarker``implementation_guidance`
- sonst → `interpretation`
4. `source_class == binding_law`:
- `definitionMarker``definition`
- `proceduralMarker``procedural_requirement`
- `annexMarker` **oder** `operationalMarker``operational_requirement`
- sonst → `obligation`
5. default → `obligation`
`controlRoleOf(payload)` klassifiziert die rohe Qdrant-Payload **vor** dem Mapping — so kann `searchControls` ([01](01-retrieval.md)) seinen tiefen dense-Pull filtern, ohne jeden Treffer voll zu materialisieren.
## Marker-Listen
| Liste | Einträge (Auszug) | → Rolle |
|-------|-------------------|---------|
| `proceduralMarkers` | Meldung, Meldepflicht, Notification, Registrierung, Konformitätserklärung, Incident, Reporting, Folgenabschätzung, DSFA, DPIA, Anzeigepflicht | `procedural_requirement` |
| `annexMarkers` | Anhang, Annex, Appendix, Anlage | `operational_requirement` |
| `operationalMarkers` | Anforderung, Requirement, essential, wesentliche | `operational_requirement` |
| `implMarkers` | Good Practice, Best Practice, Standards Mapping, Umsetzung, Implementation, Handreichung, Maßnahmenkatalog, ICS, SCADA, Technical Guideline, TIG | `implementation_guidance` |
| `definitionMarkers` | Begriffsbestimmung, Definition | `definition` |
## Warum orthogonal zu `source_class`
`source_class` (Rechtsnatur) und `source_role` (Funktion) sind **zwei Achsen**, nicht eine. ENISA bleibt `supervisory_guidance` (Rechtsnatur) **und** `implementation_guidance` (Funktion) — sie wird **nicht** umgetaggt (fachlich falsch), darf aber bei Umsetzungsfragen in den Control-Pool. So muss der Bestand nicht angefasst werden: `source_role` ist wie `source_class` aus Markern ableitbar.
`source_role` ist die **Wirbelsäule der Langzeit-Architektur** Regulation → Obligation → Operational Requirement → Control → Evidence ([09](09-framework-layer.md), Prio 4).
## Code
- `control_role.go``classifyRole()`, `controlRoleOf()`, `isControlPoolRole()`
## Adressierte Fehlerklassen
- **„Controls = nur technical_standard"** → vier Control-Pool-Rollen statt einer.
- **„abstrakte Pflicht dominiert Umsetzungsfrage"** → `obligation` ist *nicht* im Control-Pool (siehe [05](05-control-intent.md)).
@@ -0,0 +1,51 @@
# 05 — Control-Intent + Diversity
**Zweck:** Bei einer **Umsetzungsfrage** („Welche Controls/Maßnahmen passen?") den Control-Pool ([04](04-source-role.md)) über die abstrakte Pflicht heben — und sicherstellen, dass die Ergebnisliste **verschiedene Quellenarten** zeigt, statt dass eine Rolle sie flutet. Bei einer **Rechtsfrage** bleibt alles beim Authority-Rerank ([02](02-authority.md)).
## Intent-Erkennung
`queryWantsControls()` (`authority_rerank.go`) — Keyword-Match (`controlIntentSignals`):
> control, controls, maßnahme, schutzmaßnahme, best practice, umsetzen, implementier, absicher, härt, hardening, nist, owasp, grundschutz, ccm, iso 27001, isms
Nur wenn dieser Gate `true` ist, feuern `applyControlRoles()` und `ensureControlDiversity()`.
## Rollen-Boost (`applyControlRoles`)
Jeder Control-Pool-Treffer bekommt `controlPoolGain + controlRoleBonus[role]` auf den Score:
| Größe | Wert | Warum |
|-------|------|-------|
| `controlPoolGain` | `0.15` | hebt **jede** Control-Pool-Rolle über die Nicht-Control-Rollen (obligation/interpretation/definition) — sonst gewinnt die bindende abstrakte `obligation` per Autorität allein |
| `controlRoleBonus[operational_requirement]` | `0.100` | weicher Intra-Pool-Vorrang (User 2026-06-24): op_req zuerst |
| `controlRoleBonus[procedural_requirement]` | `0.075` | … dann Prozess-Pflichten |
| `controlRoleBonus[control_standard]` | `0.050` | … dann Standard-Kataloge |
| `controlRoleBonus[implementation_guidance]` | `0.000` | guidance als Basis, kein Bonus |
> **Bewusst weich, keine harte Hierarchie:** Eine semantisch dominante `implementation_guidance` (z.B. ENISA bei einer EU-Cyber-Umsetzungsfrage) **darf Top-1 bleiben** — das ist fachlich korrekt. Der Boost demoted nur die abstrakte Pflicht, er erzwingt keine Reihenfolge.
## Control-Diversity-Regel (`ensureControlDiversity`)
**Problem:** Selbst mit Boost kann eine dichte Wolke gleicher Rolle (viele ENISA-Chunks) `operational_requirement` und `control_standard` aus der Top-K verdrängen — die Quellenarten werden unsichtbar.
**Lösung (statt harter `+0.30`-Rollenkeule):** Wenn die Top-K nur `implementation_guidance` enthält, **injiziere** den besten `operational_requirement` + besten `control_standard` aus dem Pool, indem der niedrigst-platzierte redundante guidance-Slot verdrängt wird. Algorithmus:
1. Rolle jedes Treffers bestimmen (`roleAt`).
2. Prüfen, welche Rollen in der Top-K vertreten sind.
3. Für jede fehlende Wunsch-Rolle (`operational_requirement`, `control_standard`): besten Treffer dieser Rolle unterhalb der Top-K finden, niedrigste `implementation_guidance` in der Top-K überschreiben.
4. Truncate auf `topK` (das ursprüngliche Duplikat fällt im Tail weg).
**Ergebnis live:** Umsetzungsfrage → `1.4. ENISA · 5. NIST SP 800-82r3 (control_standard) · 6. MaschinenVO Anhang-III (op_req)`. ENISA behält Top-1, die anderen Quellenarten sind sichtbar.
> **Prinzip:** Nicht raten, nicht erzwingen, sondern relevante Quellenarten sichtbar machen.
## Code
- `authority_rerank.go``queryWantsControls()`
- `control_role.go``applyControlRoles()`, `ensureControlDiversity()`
## Adressierte Fehlerklassen
- **„abstrakte Pflicht dominiert Umsetzungsfrage"** → `controlPoolGain`.
- **„eine Rolle flutet die Top-K, Quellenarten unsichtbar"** → `ensureControlDiversity`.
- **„harte Tier-Ordnung overfittet auf eine Frage"** → weicher Boost statt Keule.
+45
View File
@@ -0,0 +1,45 @@
# 06 — Assessment
**Zweck:** Eine **auditierbare Begründungsschicht** über die gerankten Ergebnisse. Sie macht aus einer Trefferliste eine prüfbare Aussage: *Welche Norm ist primär, welche hängen daran, wie eindeutig ist das, braucht es einen Menschen?*
## Mechanik
`Assess()` (`legal_rag_assess.go`) nimmt die bereits gerankten `results []LegalSearchResult` und baut ein `LegalAssessment`:
| Feld | Inhalt |
|------|--------|
| `PrimaryNorm` | `CitationUnit` bzw. `ArticleLabel` des Top-Treffers |
| `PrimaryRegulation` | `RegulationShort` des Top-Treffers |
| `ConnectedNorms` | verbundene Normen (`references_out` + `references_in`), gekappt + dedupliziert |
| `CrossRegime` | ob mehrere Regulierungen in den Top-N liegen |
| `WinnerMargin` | Score-Abstand Top-1 ↔ Top-2 (Proxy für Eindeutigkeit) |
| `HumanReviewFlag` | true bei niedriger Eindeutigkeit |
| `ScoreReasoning` | kurze deutsche Begründung |
## Konstanten + Warum
| Konstante | Wert | Warum |
|-----------|------|-------|
| `assessConnectedCap` | `12` | Obergrenze der in der Assessment gezeigten verbundenen Normen — verhindert, dass ein stark vernetzter Artikel die Begründung flutet |
| `assessCrossRegimeTopN` | `5` | Fenster, über das „Cross-Regime" (mehrere Regulierungen) beurteilt wird |
| `assessReviewMargin` | `0.05` | enger Winner-Abstand → Human-Review-Flag (siehe [07](07-confidence.md)) |
## Human-Review-Logik
`HumanReviewFlag` wird `true`, wenn **eine** der Bedingungen gilt:
- `WinnerMargin < 0.05` — Top-1 und Top-2 liegen zu dicht beieinander (uneindeutig),
- `CrossRegime == true` — mehrere Regimes betroffen (z.B. DSGVO + CRA),
- der Primär-Treffer ist **nicht** `binding_law` — eine Rechtsaussage ohne bindende Primärquelle.
> Das ist die deterministische Eskalations-Schwelle: das System sagt von sich aus „hier sollte ein Mensch drauf schauen", statt scheinbare Sicherheit vorzutäuschen.
## Code
- `legal_rag_assess.go``Assess()`, `primaryLabel()`
## Adressierte Fehlerklassen
- **„uneindeutige Antwort wird als sicher verkauft"** → `WinnerMargin` + `HumanReviewFlag`.
- **„Cross-Regime übersehen"** → `CrossRegime` über `assessCrossRegimeTopN`.
- **„Rechtsaussage ohne bindende Quelle"** → Flag bei nicht-bindendem Primär-Treffer.
+38
View File
@@ -0,0 +1,38 @@
# 07 — Confidence
**Zweck:** Eine ehrliche Aussage über die Verlässlichkeit eines Ergebnisses — ohne einen erfundenen „Confidence: 87 %"-Wert, der Scheinsicherheit suggeriert.
## Bewusste Entscheidung: kein eigenes Confidence-Feld
Es gibt **kein** explizites `confidence`-Feld in der Engine. Stattdessen wird Verlässlichkeit aus zwei real berechneten, prüfbaren Größen abgeleitet:
| Größe | Quelle | Bedeutung |
|-------|--------|-----------|
| `WinnerMargin` | `LegalAssessment` ([06](06-assessment.md)) | Score-Abstand Top-1 ↔ Top-2 — wie klar „gewinnt" die Primärnorm? |
| `HumanReviewFlag` | `LegalAssessment` | deterministische Eskalation: ist die Antwort uneindeutig/grenzwertig? |
**Warum so?** Ein kalibrierter Wahrscheinlichkeitswert würde eine Genauigkeit vortäuschen, die ein regelbasierter Retriever nicht hat. Der **Abstand** zwischen Top-1 und Top-2 ist dagegen eine *gemessene*, erklärbare Größe: ein großer Margin = eindeutige Norm, ein kleiner Margin = mehrere plausible Quellen → Mensch entscheiden lassen.
## Schwelle
| Konstante | Wert | Wirkung |
|-----------|------|---------|
| `assessReviewMargin` | `0.05` | `WinnerMargin < 0.05``HumanReviewFlag = true` |
`HumanReviewFlag` feuert zusätzlich bei Cross-Regime und bei nicht-bindender Primärquelle ([06](06-assessment.md)).
## Verhältnis zur Authority-Schicht
Der `Score`, auf dem der Margin beruht, ist **nicht** der rohe Semantik-Score, sondern der Authority-Score nach dem Rerank ([02](02-authority.md)). Damit misst der Margin die *normative* Eindeutigkeit (Rechtsnatur + Domäne berücksichtigt), nicht nur die semantische Ähnlichkeit.
## Code
- `legal_rag_types.go``LegalSearchResult.Score`, `LegalAssessment.WinnerMargin`, `LegalAssessment.HumanReviewFlag`
- `legal_rag_assess.go` → Berechnung in `Assess()`
## Adressierte Fehlerklassen
- **„Scheinsicherheit"** → kein erfundener Prozentwert; Margin + Flag statt Pseudo-Confidence.
- **„knappe Entscheidung wird automatisch durchgewinkt"** → `assessReviewMargin`-Eskalation.
> **Ausbaustufe:** Echte Citation-Gating-Confidence (Finding nur bei Quelle ∧ Scope ∧ Stichtag) gehört in die Authority-/Freshness-Schicht und an Control → Evidence ([09](09-framework-layer.md)), nicht in einen Modell-Score.
@@ -0,0 +1,42 @@
# 08 — Explainability, Zitate + Supersede
**Zweck:** Jedes Ergebnis muss sich **belegen** lassen — woher es kommt, womit es verbunden ist, und ob es noch gilt. Das ist die Grundlage für Zitierfähigkeit und für die spätere Citation-Gating-Logik.
## Zitate + Graph-Kanten
Aus der Qdrant-Payload geladen (Phase-2-Graph-Metadaten):
| Feld | Inhalt | Verwendung |
|------|--------|-----------|
| `CitationUnit` | kanonischer Artikel-/Anhang-Identifier | Dedup, Primärnorm-Label |
| `article_label` | menschenlesbare Fundstelle (z.B. „Art. 13 CRA") | Anzeige, Begründung |
| `citation_style` | Zitierformat-Marker | Anzeige |
| `references_out` | Normen, die dieser Chunk **zitiert** (Forward-Kanten) | Graph-Expansion ([01](01-retrieval.md)) + `ConnectedNorms` |
| `references_in` | Normen, die **diesen** Chunk zitieren (Reverse-Kanten) | **nur** Metadaten — nicht expandiert (Flutungsschutz) |
`Assess()` ([06](06-assessment.md)) verdichtet die Kanten zu `ConnectedNorms` — so wird sichtbar, dass z.B. Art. 13 CRA auf Anhang I verweist (die eigentliche Pflichtquelle).
## Supersede-Handling
Recht ändert sich; ein veralteter Stand darf den aktuellen nicht schlagen — aber Übergangs-/History-Fragen müssen ihn noch finden.
| Mechanik | Wert / Feld | Verhalten |
|----------|-------------|-----------|
| **Erkennung** | Payload `status == "superseded"``Superseded`-Flag | markiert die abgelöste Alt-Quelle |
| **Demotion** | `supersededPenalty = 0.50` (`authorityScore`, [02](02-authority.md)) | konsequente Zurückstufung |
| **Philosophie** | — | „Alt-Quelle demoted (nicht versteckt) — Default-Fragen sehen die eu-v1-Norm, History bleibt auffindbar" |
> **Nicht entfernt, nur bestraft:** Eine abgelöste Norm kann bei einer expliziten History-Frage trotzdem hoch ranken — sie wird nur konsistent demoted, nicht ausgeblendet. Das ist dieselbe „Reihenfolge, nichts löschen"-Linie wie beim Authority-Rerank.
## Code
- `legal_rag_client.go` → Payload-Mapping (`references_out/in`, `status`)
- `legal_rag_graph.go` → Forward-Kanten-Expansion, Reverse-Kanten als Metadaten
- `legal_rag_assess.go``ConnectedNorms`
- `authority_rerank.go``supersededPenalty`
## Adressierte Fehlerklassen
- **„Aussage ohne Fundstelle"** → `CitationUnit` / `article_label`.
- **„Pflichtquelle hinter Verweis versteckt"** → Forward-Kanten-Expansion (Art. 13 → Anhang I).
- **„veralteter Rechtsstand gewinnt"** → `supersededPenalty`, aber auffindbar.
@@ -0,0 +1,51 @@
# 09 — `framework_*`-Layer (Control-Mapping-Brücke)
**Zweck:** Einen **konkreten Control adressierbar** machen (z.B. `V14.2.4`), damit das System vom „welches Dokument passt?" zum „welcher konkrete Control erfüllt CRA Annex I?" übergeht. Das ist die Brücke zur nächsten Stufe — **Control → Evidence** — und der eigentliche Burggraben.
> **Ehrlicher Status:** Dieser Layer lebt **heute in der Qdrant-Payload**, nicht im Retrieval-Code. Die `ucca`-Engine liest/routet `framework_*` (noch) nicht — sie ist die **Datengrundlage**, auf der Prio 4 aufsetzt. `framework_control` reist aktuell im Feld `article` mit und ist daher bereits in den Antworten sichtbar.
## Schema (pro Chunk)
| Feld | Beispiel (OWASP) | Bedeutung |
|------|------------------|-----------|
| `framework` | `OWASP ASVS` | Rahmenwerk |
| `framework_version` | `5.0` | Version (mit `superseded`-Mechanik historisierbar, [08](08-explainability.md)) |
| `framework_section` | `V6` | Kapitel/Sektion |
| `framework_control` | `V6.2.4` | konkrete Requirement-ID — der adressierbare Control |
| `framework_section_name` | `Password Security` | menschenlesbarer Kontext |
| `asvs_level` | `L1`/`L2`/`L3` | (OWASP-spezifisch) Stufe |
Analog für NIST geplant: `framework="NIST SP 800-53"`, `framework_family="SI"`, `framework_control="SI-2"`, `framework_revision="5"`.
## OWASP ASVS 5.0 — die erste Referenz (Parser-4-Muster)
- **Quelle:** `OWASP/ASVS` GitHub, `5.0/docs_en/...flat.json` (345 Requirements). Lizenz **CC-BY-SA-4.0** (zulässig; nur CC-BY-NC ist geblockt), Attribution `OWASP`.
- **Ingestion = per-Requirement Direct-Upsert** (nicht der RAG-Chunker, der `framework_control` zerschneiden würde): 1 Qdrant-Punkt pro Requirement, `id = uuid5("owasp_asvs_5.0_"+req_id)` (idempotent), `source_class=technical_standard` / `authority_weight=80`, bge-m3-Vektor.
- **Stand:** 345 Punkte auf macmini-qdrant **und** qdrant-dev, live verifiziert (`„OWASP … Authentifizierung"` → Top-OWASP mit `V`-Codes).
- **Lehre:** Künftige Standards (NIST-Re-Tag, BSI Grundschutz) **immer** mit `source_class=technical_standard` + `framework_*` direkt setzen — das NIST-Altskript ließ `source_class` leer, daher der guidance-Mistag ([03](03-source-class.md)).
## Brücke zu Prio 4 — Control → Evidence
```
Regulation
↓ (legal obligation layer)
Obligation
↓ (source_role: operational_requirement)
Operational Requirement ── CRA Annex I
↓ (Control-Mapping über framework_control)
Control ── OWASP V6.x · NIST SI-2 · BSI OPS.1.1
Evidence ── der Nachweis, den ein Auditor sehen will
```
Der nächste Schritt verdrahtet `framework_control` in eine **Control-Mapping-Tabelle** (welcher konkrete Control erfüllt welche Obligation) und darunter die **Evidence-Schicht**. NIST + BSI ziehen im selben `framework_*`-Muster nach.
## Code / Daten
- Daten: Qdrant `bp_compliance_ce` (Payload-Felder oben), Ingestion-Skripte (`ingest_owasp.py` u.a.)
- Retrieval-Verdrahtung: **offen** (Prio 4)
## Adressierte Fehlerklassen
- **„nur Dokument-Treffer, kein adressierbarer Control"** → `framework_control` pro Chunk.
- **„Control-Katalog ohne Stand"** → `framework_version` + Supersede.
+57
View File
@@ -0,0 +1,57 @@
# RAG-Retrieval-Engine — Architektur
Diese Sektion dokumentiert die **deterministische, regelbasierte Retrieval-Engine** des Compliance-SDK (`ai-compliance-sdk/internal/ucca/`). Sie beantwortet für jede Nutzerfrage: *Welche Norm/Quelle ist relevant — und warum?*
> **Warum diese Doku existiert:** Die Engine trifft viele bewusste `+0.05 / +0.10`-Entscheidungen. Jede Konstante kodiert eine **gemessene** Entscheidung (Golden-Harness, Fehlerklasse) — nicht eine willkürliche Stellschraube. Ohne das *Warum* sind sie in sechs Monaten nicht mehr nachvollziehbar; diese Doku ist die Referenz für Wartung, Onboarding und Audit-/Investoren-Nachweis.
## Leitprinzip
> **Nicht raten, nicht erzwingen, sondern relevante Quellenarten sichtbar machen.**
Der LLM entscheidet **nicht**, was Recht ist — nur, wie eine bereits versionierte, zitierte Norm auf einen Sachverhalt gemappt wird. Wo möglich ist die Engine deterministisch (Marker, Gewichte, Schwellen), nicht modellbasiert. Nichts wird *gelöscht* — Re-Ranking ist reine Reihenfolge, alles bleibt auffindbar.
## Zwei orthogonale Achsen
Der Kern des Modells: zwei unabhängige Achsen, die in der Literatur meist vermischt werden.
| Achse | Frage | Wirkung | Doku |
|------|-------|---------|------|
| **`source_class`** (Rechtsnatur) | Wie bindend ist die Quelle? | bestimmt den **Rang** | [03](03-source-class.md) |
| **`source_role`** (Funktion) | Was tut die Quelle im Dokument? | bestimmt die **Control-Pool-Zugehörigkeit** | [04](04-source-role.md) |
Beispiel: NIST ist `technical_standard` (source_class) **und** `control_standard` (source_role). ENISA-Good-Practices sind `supervisory_guidance` **und** `implementation_guidance` — sie bleiben guidance, dürfen aber bei Umsetzungsfragen in den Control-Pool.
## Pipeline (Überblick)
```
Query
│ bge-m3 Embedding
Retrieval-Pool ── hybrid (RRF) + binding-Augmentation + control-Augmentation + (graph) → 01
Authority-Rerank ── source_class → Rang (bindendes Recht der passenden Jurisdiktion oben) → 02, 03
Control-Intent ── source_role → Control-Pool + Diversity (Quellenarten sichtbar machen) → 04, 05
Assessment ── PrimaryNorm · ConnectedNorms · WinnerMargin · CrossRegime → 06
Confidence/Explainability ── HumanReviewFlag · Zitate · Graph-Kanten · Supersede → 07, 08
```
`framework_*` ([09](09-framework-layer.md)) ist die **Daten-Brücke** zur nächsten Stufe (Control → Evidence) — heute in der Qdrant-Payload, noch nicht im Retrieval-Code verdrahtet.
## Dokumente
| # | Dokument | Inhalt |
|---|----------|--------|
| 01 | [Retrieval-Pipeline](01-retrieval.md) | Pool-Aufbau: hybrid + binding + control + graph |
| 02 | [Authority-Re-Ranking](02-authority.md) | source_class → Rang, Bonus/Penalty-System |
| 03 | [source_class](03-source-class.md) | Rechtsnatur, Gewichte, Marker, Standard-Name-Override |
| 04 | [source_role](04-source-role.md) | 7 Rollen, Control-Pool, Klassifikation |
| 05 | [Control-Intent + Diversity](05-control-intent.md) | Intent-Erkennung, Rollen-Bonus, Diversity-Regel |
| 06 | [Assessment](06-assessment.md) | Auditierbare Begründungsschicht |
| 07 | [Confidence](07-confidence.md) | WinnerMargin, HumanReviewFlag |
| 08 | [Explainability + Supersede](08-explainability.md) | Zitate, Graph-Kanten, Supersede |
| 09 | [framework_*-Layer](09-framework-layer.md) | Control-Mapping-Brücke (CRA Annex → OWASP V6.x) |
> **Fehlerklassen-These:** Modell und Korpus sind austauschbar; die *Fehlerklassen + Hebel* sind das IP. Jede Konstante unten adressiert eine benannte Fehlerklasse (z.B. „Guidance verdrängt Gesetz", „Standard als guidance mistagged"). Die Kalibrierung ist sublinear: wenige Klassen, viele Module.
@@ -0,0 +1,89 @@
# Obligation Aggregation — Validated Shadow Results (2026-06-24)
Status: **bewiesen im Shadow auf macmini**, NICHT deployt, NICHT live geschaltet.
Code auf Branch `feat/obligation-aggregation`; das LLM-Tiering der recipients/transfer-
Controls liegt als DB-Marker nur auf macmini.
Dieser Stand validiert die Ausführung des [Legal Obligation Layer v1](legal_obligation_layer_v1.md)
über vier ineinandergreifende Schichten.
## Die vier Schichten
1. **Obligation Aggregation**`compliance/services/obligation_aggregation.py`.
Aggregiert Kriterium-/Control-Bewertungen zu Findings auf OBLIGATION-Ebene
(Regulation → Obligation → Control → Criterion). Redundanz kollabiert per OR pro
`legal_basis`-Anforderung; fail-safe Status (MET/PARTIAL/FAILED/NA/UNDETERMINED/OPEN).
2. **Applicability**`compliance/services/obligation_applicability.py`.
Prädikate (`has_third_country_transfer`, `uses_legitimate_interest`, `direct_marketing`,
`legitimate_interest_or_public_task`) entscheiden bedingte Obligations → True/False/None
(unbekannt → anwendbar, nie stille NA).
3. **Recall-limited Segregation**`compliance/services/obligation_taxonomy.py` +
`specialist_agents/dse/_obligation_shadow.py`.
`decision_method_required=LLM` trennt FAILED ehrlich in `failed_by_current_checker`
(echte Lücke) vs `recall_limited` (Prüfer kann mit aktueller Methode nicht verifizieren).
4. **Targeted LLM Fix** — recipients/transfer-Controls mit `tiered_criteria`
(decision_method=LLM) → Layer 3 nutzt den **Haiku-Sufficiency-Judge** statt Keyword/Embedding.
## Shadow-Zahlen (7 Firmen, Live-Engine, Keyword/Embedding)
| | Wert |
|---|---|
| legacy control-findings | 136 |
| obligation findings | 29 |
| **Kollaps** | **4,7×** |
| davon echte Lücken | 23 |
| davon recall_limited | 6 (nur 2/7 Firmen, nur Drittland/Garantien) |
| MET (FP-Korrektur) | 46 |
| N/A (Applicability) | 2 |
`recall_limited` ist klein + konzentriert: ausschließlich `third_country_transfer_disclosed` /
`safeguards_disclosed` / `safeguards_accessible`, je 2/7 Firmen. `recipients_disclosed`
manifestierte nie als recall_limited (Keyword/Embedding trägt dort).
## Targeted LLM Fix — Validierung (teamviewer + safetykon)
Recall-Defekt-Diagnose (teamviewer): die Drittland-/Garantien-Offenlegung steht dicht in
einem Absatz („…außerhalb EU/EWR … Standardvertragsklauseln/Schutzmaßnahmen"), aber
**0/22 Controls** trafen — Keyword (Vokabular-Mismatch) und Embedding (cos 0.490.57, teils
falscher Chunk) versagen. Kein Schwellen-Fix → CONTENT/LLM-Klasse.
Nach LLM-Tiering (Haiku-Judge):
| | vorher (kw+emb) | nachher (LLM) |
|---|---|---|
| teamviewer findings | 5 | **0** |
| teamviewer recall_limited | 3 | **0** |
| safetykon findings | 7 | **4** |
| safetykon recall_limited | 3 | **0** |
- **teamviewer → 0 Findings:** DSE auf diesen Pflichten real konform; die 5 alten Findings
waren Falsch-Positive des Keyword/Embedding-Prüfers.
- **safetykon → 4 (keine Über-Korrektur):** Drittland/Garantien → MET, aber
`art20_right_exists_core` + `art20_machine_readable_format` bleiben **FAILED** (echte
Portability-Lücke), `legitimate_interest_disclosed`**NA** (Applicability).
## Eingesetztes Modell
Der Tiered-/Sufficiency-Pfad ist **fest auf Claude Haiku 4.5 verdrahtet**
(`checkers/router.py:build_spec` setzt für CONTENT/LLM `extra.judge="haiku"`
`llm_checker._haiku``_call_anthropic`; validierter Judge P0.89/R0.91, Entscheidung
2026-06-22). **Nicht** die OVH-Kaskade (35b/120b), **nicht** Opus. Konsequenz: der Fix
reproduziert sich überall identisch, braucht aber einen gültigen Anthropic-Key für den
Haiku-Judge — auch auf dev.
## Nächster operativer Block (gegated, NICHT ausgeführt)
```
Deploy-Fenster frei (andere Session fertig)
dev-DB-Tiering replizieren (die 22 recipients/transfer-Controls)
Haiku-Judge auf dev bestätigen (gültiger Anthropic-Key — NICHT der OVH-Pfad)
Shadow aktiv lassen (Telemetrie), Produktverhalten unverändert
erst dann Umschalten planen
```
Folge-Cleanup: sobald LLM-Tiering Standard ist, wird die `recall_limited`-Segregation für
diese 4 Obligations obsolet (dann ist FAILED = echte Lücke, nicht Reichweitenproblem).
@@ -0,0 +1,77 @@
# Obligation Discovery Pipeline v1
Ein **generisches Verfahren zur Ableitung einer regulatorischen Ontologie** (Legal Obligation
Registry) aus großen Compliance-Korpora. Validiert über drei Domänen (SBOM, Vulnerability
Handling, Authentication). Erzeugt die zitierfähige Mitte aus
[obligation_registry_v1.md](obligation_registry_v1.md).
## Architekturregel (nicht verhandelbar)
```
RUNTIME bleibt deterministisch (Document → Embedding → LLM-Judge → Finding)
DISCOVERY darf LLM-gestützt sein (Controls → … → LLM-Synthese → Obligation Registry)
```
Discovery läuft **einmalig/offline** mit dem stärksten Modell; die Runtime-Prüf-Engine wird
davon nicht berührt. Zwei getrennte Probleme, eine gemeinsame Sprache (die Obligation).
## Stufen (`scripts/obligation_discovery/`)
| Stufe | Skript | Aufgabe | Key |
|---|---|---|---|
| 1 | `precluster.py` | Controls (scope) → Embedding (gecacht) → **Mikro-Cluster** | |
| 2 | `meta_cluster.py` | Mikro → **Review Units** (Skalierungs-Fix für große Domänen) | |
| 3 | `synthesize_obligations.py` | Review Units → Opus → **Obligation Candidates** | ENV |
| 4 | `validate_registry.py` | Belastbarkeits-Checks | |
| 5 | `merge_review_diff.py` | vorgeschlagene Beziehungskanten dedupliziert mergen | |
Reine, unit-getestete Helfer in `_core.py`. Ausführung im `bp-compliance-backend`-Container
(`PYTHONPATH=/app`); der Key kommt aus `ANTHROPIC_API_KEY` (nie hartcodiert).
## Zwei-Stufen-Clustering = der Skalierungs-Fix
Ein flacher Single-Threshold-Pre-Cluster + EIN LLM-Synthese-Call skaliert NICHT auf große
Domänen. Lösung: eine Hierarchiestufe. **Review Unit ≠ Meta-Cluster** — die Review Unit ist
das, was der LLM sieht (entkoppelt vom Clustering, später merge/split-bar).
## Belegte Meilensteine
| Domäne | Controls | → Cluster/Review Units | → Obligations | vs Ground Truth |
|---|---|---|---|---|
| **SBOM** | 258 | 86 Mikro | 12 (→ 11 final) | manuell ~10 — **reproduziert + verfeinert** |
| **Vulnerability** | 531 | 200 Mikro | 8 | manuell ~7 — **reproduziert** |
| **Authentication** | 4408 | 2134 Mikro → **170 Review Units** | 54 → Kuration **29** | Skalierung — **generalisiert** |
## Harte Tier-Regel generalisiert
`LEGAL_MINIMUM` nur mit Primärrechts-Anker (`legal_basis`), sonst `BEST_PRACTICE` /
`IMPLEMENTATION_GUIDANCE` / `EVIDENCE`. Authentication zeigt den Wert: nur **6** harte
Pflichten (CRA fordert „angemessene Authentisierung"), MFA/Passwort/Session/Krypto sind
`guidance_basis`. So kann der Advisor sagen: *„Gesetzlich gefordert ist Schutz vor unbefugtem
Zugriff; MFA ist anerkannte Umsetzung, aber keine CRA-Wortlautpflicht."*
## Kuration (große Domänen)
Die Synthese darf über-splitten; ein **key-freier, regelbasierter Kurations-Pass** verdichtet:
Krypto-Mikro-Mechanismen → `guidance_basis`; Prüf-/Nachweis-Themen → `evidence`-Facette;
Mechanismus-Familien bleiben; domänenfremdes (eID/PSD2) → `out_of_scope`; LEGAL_MINIMUM
unangetastet.
## Lessons
- Große Opus-Calls brauchen **Streaming** (`messages.stream`); der SDK blockt non-streaming
bei `max_tokens` > ~8k mit „Streaming is required for operations that may take longer than 10 minutes".
- Provenance pro Obligation (`source_meta_cluster`, `discovery_confidence`, `llm_model`,
`synthesis_version`) — für spätere Evolution (CRA-Update, Modellwechsel).
- `>8 Obligations / Review Unit` → automatische Review-Warnung (Over-Split-Indikator).
- Embedding-Cache (pickle) → THR2-Sweeps ohne Re-Embed.
## End-to-End-Beispiel
```bash
# im bp-compliance-backend-Container, PYTHONPATH=/app, cwd = scripts/obligation_discovery
python3 precluster.py --scope auth
python3 meta_cluster.py --scope auth --meta-thr 0.62 # → /tmp/auth_review_units.json (inspizieren!)
ANTHROPIC_API_KEY=… python3 synthesize_obligations.py \
--units /tmp/auth_review_units.json --regulation CRA --theme "Authentisierung" --out /tmp/auth_registry.json
python3 validate_registry.py /tmp/auth_registry.json
```
@@ -0,0 +1,130 @@
# Obligation Registry v1 — Schema, Zitierfähigkeit, Zwei-Graphen-Architektur
Status: **Spec festgeschrieben (2026-06-24)**. Baut auf
[legal_obligation_layer_v1.md](legal_obligation_layer_v1.md) +
[obligation_aggregation_validation.md](obligation_aggregation_validation.md).
Die Obligation Discovery Pipeline v1 ist gegen Ground Truth validiert
(SBOM 12 vs 10, Vuln 8 vs 7, out_of_scope + conditional Applicability korrekt).
## Leitsatz
**Die Legal Obligation ist das fachliche Wissensobjekt der Plattform** — nicht der Master
Control. Controls sind Prüfstrategien / Erkennungsmuster / Evidenzsammler FÜR eine Obligation.
Ohne Zitierfähigkeit ist die Registry fachlich nicht belastbar: die erste Kundenfrage ist
immer „**Wo steht das?**".
## Zwei Assets, zwei Graphen, EIN Join (nicht verschmelzen, verbinden)
- **Asset 1 — Compliance Knowledge** (bereits gebaut): 313k atomare Controls, 33k Master
Controls, ~14k use-case-gemappt, Dedup, Obligation Layer, Applicability, Tiering, G/C/E.
- **Asset 2 — Zitierfähige Wissensbasis** (entsteht in anderer Session): Dokument → Chunk →
Paragraph → Span → Zitat.
Die beiden werden **NICHT verschmolzen** (das wäre wie eine normalisierte DB nach CSV zu
exportieren und neu zu importieren). Sie werden über die **Obligation gekoppelt**:
```
GRAPH 1 — Legal Knowledge Graph (Chat/Advisor) GRAPH 2 — Compliance Execution Graph (Engine)
Regulation → Annex/Artikel → Paragraph → Span Obligation → Control → Criterion → Evidence → Finding
\ /
\____ LEGAL OBLIGATION ______/ ← gemeinsame Sprache (der Join)
```
Chat: „diese Aussage stammt aus Absatz X." · Engine: „diese Obligation ist nicht erfüllt." →
beide meinen DIESELBE `obligation_id`.
## Registry-Schema v1
```yaml
id: # snake_case, regulierungs-agnostisch (z.B. sbom_complete)
name: # kurz
description: # 1 Satz
tier: # LEGAL_MINIMUM | BEST_PRACTICE | IMPLEMENTATION_GUIDANCE | EVIDENCE
family: # Organisationshilfe (z.B. sbom, vulnerability_handling)
applicability: # universal | conditional:<pred> | domain:<x>
facets: # welche Evidenz-Facetten die Pflicht belegt
governance: bool
capability: bool
evidence: bool
legal_basis: # PRIMÄRRECHT — Pflicht zwingend (mind. 1 Anker für LEGAL_MINIMUM)
- source: CRA
regulation_code: eu_2024_2847
article: "" # falls zutreffend
annex: "Annex I, Part II"
section: ""
paragraph: ""
span_id: "" # harter Anker in die zitierfähige Wissensbasis (Asset 2)
document_version: ""
citation: "" # menschenlesbar
guidance_basis: # SEKUNDÄR — Umsetzung/Best Practice, NICHT Pflicht
- source: NIST SSDF
anchor: ""
role: best_practice # implementation_guidance | best_practice
member_controls: # control_uuids (Prüflogik aus Asset 1)
citation_anchor_ids: # span/paragraph-Anker (Asset 2) — auf der OBLIGATION, NICHT auf Controls
relationships: # siehe Beziehungsgraph
decision_method: # CONTENT/LLM | CONTENT/EMBEDDING | FIELD/REGEX | BEHAVIOR/PLAYWRIGHT ...
out_of_scope: [] # ausgeschlossene Cluster + Begründung
```
## Zitierfähigkeit hängt an der OBLIGATION (nicht an Controls)
258 SBOM-Controls → 11 Obligations: nur die **Obligation** speichert
`CRA / Annex I / Paragraph X / chunk_id / span_id / document_version`. Die 258 Controls zeigen
nur auf die `obligation_id`. Folge: **Regulierungsänderung (CRA v1→v2) = `citation_anchor`
tauschen, Controls bleiben identisch.** Massive Pflegeersparnis + Versionsstabilität.
## `legal_basis` vs `guidance_basis` + `source_role`
Damit beim Verschmelzen von CRA + NIST + OWASP zu einer Obligation NICHT verloren geht, was
Pflicht / Best Practice / Evidenz / Umsetzung ist, klassifiziert die Discovery-Pipeline jeden
Member/Cluster mit einer **`source_role`**:
```
LEGAL_BASIS → Primärrecht (begründet die Pflicht)
GUIDANCE → NIST/OWASP/ENISA/BSI/ISO (Umsetzung/Best Practice)
EVIDENCE → Nachweis/Bericht/Audit
IMPLEMENTATION → technische Umsetzungsanweisung
OUT_OF_SCOPE → gehört nicht zur Obligation (andere Regulierung/Domäne)
```
## HARTE Tier-Regel
Eine Obligation wird **`LEGAL_MINIMUM` nur mit mindestens einem Primärrechts-Anker**
(`legal_basis` nicht leer). Ohne Primärrechts-Anker:
`BEST_PRACTICE | IMPLEMENTATION_GUIDANCE | EVIDENCE`**aber niemals Pflicht.**
## Beziehungsgraph (Ontologie)
**Strukturell** (bereits in der Pipeline): `same_obligation`, `sub_obligation`,
`applicability_variant`, `evidence_for`, `governance_for`, `out_of_scope`.
**Semantisch (NEU, P2-Ergänzung):** `requires`, `implements`, `supports`,
`produces_evidence_for`, `depends_on`, `derived_from`. Beispiele:
```
sbom_established --supports--> vulnerability_handling --supports--> incident_reporting
authentication --requires--> credential_management
```
→ für den Compliance Advisor extrem wertvoll (er kann Pflicht-Ketten erklären).
## Citation-Anchor-Pipeline (Document → Obligation, NICHT Document → Control)
Der neue Ingest erzeugt zusätzlich zu Chunk/Embedding: `paragraph_uuid`, `span_uuid`,
`document_version`, `legal_citation`, `referenced_articles`, `referenced_regulations`.
**Erst danach** läuft Obligation Discovery, sodass jede neu entdeckte Obligation sofort ihre
Primärquelle bekommt:
```
Neue Dokumente → Chunking → Span IDs → LLM („welche Obligation(en)?") → Confidence
→ Review → obligation.citation_anchor_ids[]
```
Die alten Controls werden wiederverwendet; die Pipeline erzeugt zusätzlich Obligation→Evidence
und Obligation→Citation-Anchors. **Kein Re-Ingest zum Neubau von Controls.**
## Sequenz (geändert — Registry vor weiteren Cuts)
```
SBOM ✓ → Vuln ✓ → Registry v1 (DIESE Spec) → Ontologie/Beziehungsgraph ergänzen
→ Authentication → Remote Access → Logging → Updates
```
Begründung: Schema jetzt billig änderbar; bei 3001000 Obligations wird jede Schemaänderung
teuer. Fortschritt wird daran gemessen, ob jede neue Obligation die Registry besser macht —
nicht an neuen Controls.
+11
View File
@@ -56,6 +56,17 @@ markdown_extensions:
nav: nav:
- Start: index.md - Start: index.md
- Architektur RAG:
- Übersicht: architecture/index.md
- 01 Retrieval-Pipeline: architecture/01-retrieval.md
- 02 Authority-Re-Ranking: architecture/02-authority.md
- 03 source_class: architecture/03-source-class.md
- 04 source_role: architecture/04-source-role.md
- 05 Control-Intent + Diversity: architecture/05-control-intent.md
- 06 Assessment: architecture/06-assessment.md
- 07 Confidence: architecture/07-confidence.md
- 08 Explainability + Supersede: architecture/08-explainability.md
- 09 framework_*-Layer: architecture/09-framework-layer.md
- Services: - Services:
- AI Compliance SDK: - AI Compliance SDK:
- Uebersicht: services/ai-compliance-sdk/index.md - Uebersicht: services/ai-compliance-sdk/index.md
@@ -0,0 +1,67 @@
{
"schema_version": "controls_for_obligation_mapping_v1",
"purpose": "Accepted CRA->OWASP controls (Compliance Execution Graph) for the Obligation Registry to propose the SEMANTIC control->obligation_id, replacing the coarse citation_unit interim join. Fill proposed_obligation_id per control, then we adopt it into control_mapping.obligation_id.",
"source": "ai-compliance-sdk control_mappings, mapping_status=accepted, reviewed_by=benjamin 2026-06-25",
"filled_by": "obligation-registry-session 2026-06-25 (alle 7/7: 4 auth/crypto + 3 logging via cra_logging.json)",
"join_principle": "SEMANTISCH via obligation_id, NICHT via citation_unit/legal_basis-Anker. Die CRA-Anker sind im Registry teils approximativ (siehe anchor_quality_note) — daher ist obligation_id der stabile Primaerschluessel, nicht der Anker.",
"anchor_quality_note": "Registry-legal_basis-Anker sind teils CRA-Part-I-fehlzugeordnet (Opus-Synthese): user_authentication_required steht auf (2)(d) statt (2)(c); Crypto-Obligations auf (2)(e) statt (2)(d). CRA Annex I Part I: (2)(c)=Zugriffsschutz, (2)(d)=Vertraulichkeit, (2)(e)=Integritaet. Korrektur kommt mit dem zitierfaehigen Re-Ingest (span-genau). Deshalb: NICHT auf Anker joinen. ABER: der Logging-Cut (V16.*) ist korrekt auf (2)(k) verankert (echte Logging-Subsektion, kein Fehl-Anker).",
"count": 7,
"controls": [
{
"framework": "OWASP ASVS", "control": "V6.3.1",
"source_norm": "CRA Annex I Part I (2)(c) — Schutz vor unbefugtem Zugriff",
"citation_unit": "Annex I (2)(c)", "family": "auth", "mapping_type": "supports",
"proposed_obligation_id": "user_authentication_required",
"mapping_method": "semantic",
"mapping_note": "Zugriffsschutz/Authentisierung-vor-Zugriff = Nutzer-Auth (NICHT firmware, trotz strukturellem (2)(c)-Join)"
},
{
"framework": "OWASP ASVS", "control": "V6.1.1",
"source_norm": "CRA Annex I Part I (2)(c) — Schutz vor unbefugtem Zugriff",
"citation_unit": "Annex I (2)(c)", "family": "auth", "mapping_type": "supports",
"proposed_obligation_id": "user_authentication_required",
"mapping_method": "semantic",
"mapping_note": "wie V6.3.1"
},
{
"framework": "OWASP ASVS", "control": "V11.2.1",
"source_norm": "CRA Annex I Part I (2)(d) — Vertraulichkeit / Verschluesselung",
"citation_unit": "Annex I (2)(d)", "family": "crypto", "mapping_type": "supports",
"proposed_obligation_id": "credential_confidentiality_protection",
"mapping_method": "semantic",
"mapping_note": "Vertraulichkeit von Auth-Daten. ALT: encrypted_auth_channel, falls V11.2.1 transit-/kanal-spezifisch ist — bitte aus eurem Control-Text bestaetigen."
},
{
"framework": "OWASP ASVS", "control": "V11.7.1",
"source_norm": "CRA Annex I Part I (2)(d) — Vertraulichkeit / Verschluesselung",
"citation_unit": "Annex I (2)(d)", "family": "crypto", "mapping_type": "supports",
"proposed_obligation_id": "auth_key_management",
"mapping_method": "semantic",
"mapping_note": "Key Management = Schluessel erzeugen/speichern/HSM"
},
{
"framework": "OWASP ASVS", "control": "V16.3.3",
"source_norm": "CRA Annex I Part I (2)(k) — Sicherheitsrelevante Ereignisse / Logging",
"citation_unit": "Annex I (2)(k)", "family": "logging", "mapping_type": "supports",
"proposed_obligation_id": "event_logging_security_events",
"mapping_method": "semantic",
"mapping_note": "Umbrella-LM 'Produkt protokolliert sicherheitsrelevante Ereignisse' (CRA (2)(k)). ALT bei access-decision-spezifischem Control-Text: access_control_event_logging — bitte aus eurem ASVS-V16.3-Text bestaetigen."
},
{
"framework": "OWASP ASVS", "control": "V16.3.4",
"source_norm": "CRA Annex I Part I (2)(k) — Sicherheitsrelevante Ereignisse / Logging",
"citation_unit": "Annex I (2)(k)", "family": "logging", "mapping_type": "supports",
"proposed_obligation_id": "event_logging_security_events",
"mapping_method": "semantic",
"mapping_note": "Umbrella-LM (CRA (2)(k)). ALT bei admin-/privileg-spezifischem Control-Text: audit_trail_admin_actions — bitte aus eurem ASVS-V16.3-Text bestaetigen."
},
{
"framework": "OWASP ASVS", "control": "V16.1.1",
"source_norm": "CRA Annex I Part I (2)(k) — Sicherheitsrelevante Ereignisse / Logging",
"citation_unit": "Annex I (2)(k)", "family": "logging", "mapping_type": "supports",
"proposed_obligation_id": "event_logging_security_events",
"mapping_method": "semantic",
"mapping_note": "V16.1 = allgemeine Logging-Anforderung -> Umbrella-LM event_logging_security_events. Hohe Konfidenz."
}
]
}
+1582
View File
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+227
View File
@@ -0,0 +1,227 @@
{
"schema_version": "obligation_procedures_v1",
"regulation": "CRA",
"layer": "Regulation -> Legal Obligation -> Procedure -> Control -> Evidence",
"note": "Procedure ist KEINE neue Compliance-Pflicht. LEGAL_MINIMUM liegt an der Obligation; die Procedure beschreibt, WIE sie umgesetzt wird; Evidence belegt die Umsetzung. source_role=procedural_requirement (Konvergenz mit der Legal-Knowledge-Engine der anderen Session).",
"citation_status": "pending_span_anchor",
"scope": "worked examples: SBOM + Vulnerability Handling",
"procedures": [
{
"procedure_id": "sbom_generation_process",
"name": "SBOM-Erstellungsprozess",
"description": "Erzeugen einer vollstaendigen, maschinenlesbaren Software Bill of Materials fuer ein Produkt mit digitalen Elementen.",
"source_role": "procedural_requirement",
"fulfills_obligations": ["sbom_creation", "sbom_dependency_coverage", "sbom_format_standard", "sbom_tooling_automation"],
"steps": [
"Komponenten und (direkte + transitive) Abhaengigkeiten inventarisieren",
"SBOM automatisiert in der Build-/Toolchain generieren",
"Komponenten, Versionen, Lizenzen und Lieferanten erfassen",
"in anerkanntem maschinenlesbarem Format (CycloneDX/SPDX) ausgeben",
"Format- und Schemavalidierung durchfuehren"
],
"controls": [
"SBOM-Datei vorhanden",
"Format ist maschinenlesbar und standardkonform (CycloneDX/SPDX)",
"direkte und transitive Abhaengigkeiten enthalten"
],
"evidence": ["sbom.cyclonedx.json", "Format-Validierungs-Log", "Build-/Toolchain-Konfiguration"],
"citation_spans": [], "citation_status": "pending_span_anchor"
},
{
"procedure_id": "sbom_update_process",
"name": "SBOM-Aktualisierungsprozess",
"description": "Halten der SBOM aktuell ueber den Produktlebenszyklus bei Komponenten-, Versions- und Patch-Aenderungen.",
"source_role": "procedural_requirement",
"fulfills_obligations": ["sbom_maintenance_update"],
"steps": [
"Komponentenaenderung erkennen (Dependency-/Patch-/Versionsaenderung)",
"SBOM neu generieren",
"Lieferanten-SBOMs aktualisieren",
"neue SBOM-Version speichern",
"SBOM in Release-Artefakte uebernehmen"
],
"controls": [
"CI prueft SBOM vorhanden",
"SBOM-Version passt zum Release",
"Supplier-Komponenten enthalten"
],
"evidence": ["sbom.json", "CI-Log", "Release-Artefakt", "Supplier-SBOM"],
"citation_spans": [], "citation_status": "pending_span_anchor"
},
{
"procedure_id": "sbom_supplier_integration_process",
"name": "Lieferanten-SBOM-Integration",
"description": "Beschaffen und Einarbeiten von Lieferanten-/Drittkomponenten-SBOMs in die Produkt-SBOM.",
"source_role": "procedural_requirement",
"fulfills_obligations": ["sbom_supply_chain_contracts", "sbom_dependency_coverage"],
"steps": [
"SBOM-Anforderung in Lieferantenvertraege aufnehmen",
"Lieferanten-SBOMs einsammeln",
"in die Produkt-SBOM mergen",
"Drittkomponenten und deren Abhaengigkeiten nachverfolgen"
],
"controls": [
"vertragliche SBOM-Klausel vorhanden",
"Lieferanten-SBOMs eingegangen",
"Drittkomponenten in der SBOM gelistet"
],
"evidence": ["Lieferantenvertrag-Klausel", "eingegangene Supplier-SBOMs", "gemergte SBOM"],
"citation_spans": [], "citation_status": "pending_span_anchor"
},
{
"procedure_id": "sbom_provision_process",
"name": "SBOM-Bereitstellungsprozess",
"description": "Zugaenglichmachen der SBOM fuer berechtigte Parteien (Nutzer, Behoerde) unter Wahrung der Vertraulichkeit.",
"source_role": "procedural_requirement",
"fulfills_obligations": ["sbom_access_provision", "sbom_authority_provision", "sbom_confidentiality"],
"steps": [
"Zugangskanal definieren (Portal/API/dokumentierter Pfad)",
"Nutzer ueber den Zugangsweg informieren",
"auf begruendetes Verlangen der Marktueberwachungsbehoerde vertraulich bereitstellen",
"Zugriffskontrolle und Vertraulichkeitsmassnahmen anwenden"
],
"controls": [
"Zugangspfad dokumentiert",
"Zugriffskontrolle/Vertraulichkeit umgesetzt",
"Behoerden-Bereitstellungsprozess definiert"
],
"evidence": ["Zugangskanal-Dokumentation", "Behoerden-Anfrage-Log", "Zugriffskontroll-Konfiguration"],
"citation_spans": [], "citation_status": "pending_span_anchor"
},
{
"procedure_id": "sbom_conformity_documentation_process",
"name": "SBOM in technischer Dokumentation/Konformitaet",
"description": "Aufnehmen der SBOM in die technische Dokumentation und Verifizieren der Vollstaendigkeit fuer die Konformitaetsbewertung.",
"source_role": "procedural_requirement",
"fulfills_obligations": ["sbom_technical_documentation", "sbom_completeness_verification"],
"steps": [
"SBOM in die technische Dokumentation aufnehmen",
"Vollstaendigkeit gegen die real eingesetzte Softwarekomposition pruefen",
"der Konformitaetsbewertung beilegen (ggf. EUCC)"
],
"controls": [
"SBOM Teil der technischen Dokumentation",
"Vollstaendigkeit verifiziert",
"Konformitaetsnachweis vorhanden"
],
"evidence": ["technische Dokumentation", "Vollstaendigkeits-Pruefbericht", "Konformitaetsnachweis"],
"citation_spans": [], "citation_status": "pending_span_anchor"
},
{
"procedure_id": "vuln_handling_process_setup",
"name": "Schwachstellenbehandlungsprozess einrichten",
"description": "Dokumentierten Prozess und Meldekanal (CVD) fuer die Schwachstellenbehandlung etablieren.",
"source_role": "procedural_requirement",
"fulfills_obligations": ["vuln_handling_process"],
"steps": [
"dokumentierten Schwachstellenbehandlungsprozess definieren",
"Coordinated-Vulnerability-Disclosure-Richtlinie und Meldekanal veroeffentlichen",
"eingehende Meldungen triagieren"
],
"controls": [
"Behandlungsprozess dokumentiert",
"Meldekanal/Kontaktstelle auffindbar (z.B. security.txt)",
"Triage-Verfahren vorhanden"
],
"evidence": ["Prozessdokument", "security.txt / Kontaktstelle", "Triage-Log"],
"citation_spans": [], "citation_status": "pending_span_anchor"
},
{
"procedure_id": "vuln_identification_process",
"name": "Schwachstellen-Identifikation",
"description": "Bekannte Schwachstellen in eingesetzten Komponenten erkennen und inventarisieren.",
"source_role": "procedural_requirement",
"fulfills_obligations": ["vuln_identification_inventory"],
"steps": [
"Advisories/CVE-Feeds beobachten",
"gegen die SBOM-Komponenten abgleichen",
"Schwachstellen-Inventar pflegen"
],
"controls": [
"Advisory-/CVE-Monitoring aktiv",
"SBOM-zu-CVE-Abgleich durchgefuehrt",
"Schwachstellen-Inventar gepflegt"
],
"evidence": ["CVE-Abgleich-Report", "Schwachstellen-Register"],
"citation_spans": [], "citation_status": "pending_span_anchor"
},
{
"procedure_id": "vuln_assessment_process",
"name": "Schwachstellen-Bewertung/Priorisierung",
"description": "Identifizierte Schwachstellen nach Schweregrad, Ausnutzbarkeit und Exposition bewerten und priorisieren.",
"source_role": "procedural_requirement",
"fulfills_obligations": ["vuln_assessment_prioritization"],
"steps": [
"Schweregrad bewerten (z.B. CVSS)",
"Ausnutzbarkeit/Exposition einschaetzen",
"risikobasiert priorisieren"
],
"controls": [
"Schweregrad standardisiert bewertet",
"risikobasierte Priorisierung vorhanden"
],
"evidence": ["Bewertungsdatensatz (CVSS)", "Prioritaetenliste"],
"citation_spans": [], "citation_status": "pending_span_anchor"
},
{
"procedure_id": "vuln_remediation_process",
"name": "Schwachstellen-Behebung",
"description": "Bekannte Schwachstellen fristgerecht durch Patches/Gegenmassnahmen beheben und Sicherheitsupdates bereitstellen.",
"source_role": "procedural_requirement",
"fulfills_obligations": ["vuln_remediation_patching"],
"steps": [
"Fix/Gegenmassnahme entwickeln",
"testen",
"Sicherheitsupdate kostenfrei und zeitnah bereitstellen",
"bis zum Abschluss nachverfolgen"
],
"controls": [
"zeitnahe Behebung",
"Sicherheitsupdate bereitgestellt",
"Follow-up bis Closure"
],
"evidence": ["Patch/Release", "Behebungs-Zeitleiste", "Follow-up-Log"],
"citation_spans": [], "citation_status": "pending_span_anchor"
},
{
"procedure_id": "vuln_disclosure_process",
"name": "Offenlegung + Nutzerinformation",
"description": "Koordinierte Offenlegung behobener Schwachstellen und Information der Nutzer ueber Schutzmassnahmen.",
"source_role": "procedural_requirement",
"fulfills_obligations": ["coordinated_vulnerability_disclosure", "vuln_info_dissemination_users"],
"steps": [
"Offenlegungszeitpunkt koordinieren",
"Security Advisory / CVE-Eintrag veroeffentlichen",
"Nutzer ueber behobene Schwachstelle und Schutzmassnahmen informieren"
],
"controls": [
"Advisory veroeffentlicht",
"Nutzer informiert"
],
"evidence": ["Security Advisory", "CVE-Eintrag", "Nutzer-Benachrichtigung"],
"citation_spans": [], "citation_status": "pending_span_anchor"
},
{
"procedure_id": "vuln_authority_reporting_process",
"name": "Behoerdenmeldung aktiv ausgenutzter Schwachstellen",
"description": "Aktiv ausgenutzte Schwachstellen fristgerecht an CSIRT/ENISA melden (CRA Art. 14-Kaskade).",
"source_role": "procedural_requirement",
"fulfills_obligations": ["exploited_vuln_reporting_authorities"],
"applicability_note": "bedingt: nur bei aktiv ausgenutzter Schwachstelle",
"steps": [
"aktive Ausnutzung erkennen",
"Fruehwarnung an CSIRT/ENISA (24h)",
"vollstaendige Meldung (72h)",
"Abschlussbericht (14 Tage)"
],
"controls": [
"24h-Fruehwarnung erfolgt",
"72h-Meldung erfolgt",
"14d-Abschlussbericht erfolgt"
],
"evidence": ["CSIRT/ENISA-Meldungsbelege", "Zeitstempel der Kaskade"],
"citation_spans": [], "citation_status": "pending_span_anchor"
}
]
}
+587
View File
@@ -0,0 +1,587 @@
{
"schema_version": "obligation_join_keys_v1",
"contract": "obligation_id ist der stabile Join-Key. Legal Knowledge Graph haengt citation_spans an obligation_id; Compliance Execution Graph mappt control_mapping.source_norm -> obligation_id. Interim-Bruecke = citation_units. obligation_id NIE neu vergeben (re-link).",
"count": 66,
"obligation_ids": [
{
"obligation_id": "sbom_creation",
"regulation": "CRA",
"family": "sbom",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part II (1)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "sbom_dependency_coverage",
"regulation": "CRA",
"family": "sbom",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Art. 3(36) i.V.m. Annex I Part II (1)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "sbom_format_standard",
"regulation": "CRA",
"family": "sbom",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part II (1)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "sbom_maintenance_update",
"regulation": "CRA",
"family": "sbom",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part II (1)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "sbom_completeness_verification",
"regulation": "CRA",
"family": "sbom",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "sbom_tooling_automation",
"regulation": "CRA",
"family": "sbom",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "IMPLEMENTATION"
},
{
"obligation_id": "sbom_access_provision",
"regulation": "CRA",
"family": "sbom",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "sbom_authority_provision",
"regulation": "CRA",
"family": "sbom",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Art. 31 / Annex I Part II (1)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "sbom_confidentiality",
"regulation": "CRA",
"family": "sbom",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Art. 31(4)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "sbom_supply_chain_contracts",
"regulation": "CRA",
"family": "sbom",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "sbom_technical_documentation",
"regulation": "CRA",
"family": "sbom",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Art. 31 i.V.m. Annex VII"
],
"source_role": "EVIDENCE"
},
{
"obligation_id": "vuln_identification_inventory",
"regulation": "CRA",
"family": "vuln",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part II (1)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "vuln_assessment_prioritization",
"regulation": "CRA",
"family": "vuln",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part II (1)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "vuln_remediation_patching",
"regulation": "CRA",
"family": "vuln",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part II (2) & (8)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "vuln_handling_process",
"regulation": "CRA",
"family": "vuln",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Article 13(8) & Annex VII"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "coordinated_vulnerability_disclosure",
"regulation": "CRA",
"family": "vuln",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part II (5)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "exploited_vuln_reporting_authorities",
"regulation": "CRA",
"family": "vuln",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Article 14 & Article 16"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "vuln_info_dissemination_users",
"regulation": "CRA",
"family": "vuln",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part II (4) & (6)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "user_authentication_required",
"regulation": "CRA",
"family": "authentication",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I (2)(d)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "authentication_policy_documented",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "auth_exceptions_documented",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "mfa_required",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "step_up_authentication",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "privileged_op_reauth",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "strong_crypto_authentication",
"regulation": "CRA",
"family": "authentication",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I (2)(e)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "credential_lifecycle_management",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "credential_confidentiality_protection",
"regulation": "CRA",
"family": "authentication",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I (2)(e)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "password_policy",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "no_default_credentials",
"regulation": "CRA",
"family": "authentication",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I (2)(a)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "account_lockout_failed_attempts",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "server_side_validation",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "session_binding_management",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "reauth_after_inactivity",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "token_validation_lifecycle",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "mutual_authentication",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "revocation_check",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "encrypted_auth_channel",
"regulation": "CRA",
"family": "authentication",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I (2)(e)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "tls_certificate_auth",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "service_to_service_auth",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "auth_key_management",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "biometric_authentication",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "federated_auth_assertions",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "separate_authn_authz",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "remote_access_authentication",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "supplier_access_auth",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "personal_admin_accounts",
"regulation": "CRA",
"family": "authentication",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "firmware_software_authentication",
"regulation": "CRA",
"family": "authentication",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I (2)(c)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "event_logging_security_events",
"regulation": "CRA",
"family": "logging",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part I (2)(k)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "access_control_event_logging",
"regulation": "CRA",
"family": "logging",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part I (2)(k)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "audit_trail_admin_actions",
"regulation": "CRA",
"family": "logging",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part I (2)(k)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "log_integrity_immutability",
"regulation": "CRA",
"family": "logging",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part I (2)(k)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "log_access_control_protection",
"regulation": "CRA",
"family": "logging",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part I (2)(k)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "log_retention_archival",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "centralized_log_management",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "log_monitoring_alerting",
"regulation": "CRA",
"family": "logging",
"tier": "LEGAL_MINIMUM",
"citation_units": [
"Annex I Part I (2)(k)"
],
"source_role": "LEGAL_BASIS"
},
{
"obligation_id": "log_data_minimization_privacy",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "log_format_standardization",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "log_timestamp_synchronization",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "logging_availability_resilience",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "logging_thread_safety_correctness",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "IMPLEMENTATION"
},
{
"obligation_id": "logging_library_supply_chain",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "logging_config_management",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "logging_governance_roles",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "incident_response_logging",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "log_transmission_security",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
},
{
"obligation_id": "network_traffic_logging",
"regulation": "CRA",
"family": "logging",
"tier": "BEST_PRACTICE",
"citation_units": [],
"source_role": "GUIDANCE"
}
]
}
+114
View File
@@ -0,0 +1,114 @@
"""Reine Helfer der Obligation Discovery Pipeline (keine schweren Imports → unit-testbar).
Die Pipeline leitet aus großen Compliance-Korpora eine regulatorische Ontologie ab:
Controls → Mikro-Cluster → Meta-Cluster/Review-Units → LLM-Synthese → Obligation Registry.
Architekturregel: RUNTIME bleibt deterministisch; DISCOVERY (dieses Tooling) darf LLM-gestützt
sein und läuft EINMALIG/offline. Siehe docs-src/development/obligation_discovery_pipeline_v1.md.
"""
from __future__ import annotations
import ast
import json
import math
from typing import Optional
SEMANTIC_EDGE_TYPES = ("depends_on", "supports", "produces_evidence_for",
"implements", "derived_from")
def parse_req(req) -> list:
"""requirements-Spalte (JSON ODER Python-Repr ODER String) robust zu Liste."""
if isinstance(req, list):
return req
if isinstance(req, str):
for fn in (json.loads, ast.literal_eval):
try:
v = fn(req)
return v if isinstance(v, list) else [str(v)]
except Exception:
pass
return [req]
return []
def cosine(a, b) -> float:
if not a or not b:
return 0.0
dot = sum(x * y for x, y in zip(a, b))
na = math.sqrt(sum(x * x for x in a))
nb = math.sqrt(sum(y * y for y in b))
return dot / (na * nb) if na and nb else 0.0
def greedy_cluster(vecs: list, thr: float) -> list[dict]:
"""Single-Pass-Greedy-Clustering: jeder Vektor joint den ersten Cluster, dessen Seed
cosine ≥ thr ist, sonst neuer Cluster. Deterministisch (stabile Reihenfolge)."""
clusters: list[dict] = []
for i, v in enumerate(vecs):
if not v:
clusters.append({"seed": None, "members": [i]})
continue
best, best_sim = None, thr
for c in clusters:
if c["seed"] is None:
continue
s = cosine(v, c["seed"])
if s >= best_sim:
best_sim, best = s, c
if best:
best["members"].append(i)
else:
clusters.append({"seed": v, "members": [i]})
return clusters
def centroid(idxs: list[int], vecs: list) -> Optional[list]:
vs = [vecs[i] for i in idxs if vecs[i]]
if not vs:
return None
n = len(vs)
return [sum(col) / n for col in zip(*vs)]
def validate_registry(reg: dict) -> dict:
"""Belastbarkeits-Checks (User-Regeln): LEGAL_MINIMUM braucht legal_basis,
member_controls vollständig, out_of_scope separat, >8-Obligations/Review-Unit-Warnung."""
obls = reg.get("obligations", [])
lm = [o for o in obls if o.get("tier") == "LEGAL_MINIMUM"]
lm_without_basis = [o["id"] for o in lm if not o.get("legal_basis")]
empty_members = [o["id"] for o in obls if not o.get("member_controls")]
per_unit: dict[str, int] = {}
for o in obls:
ru = (o.get("provenance") or {}).get("source_meta_cluster")
if ru:
per_unit[ru] = per_unit.get(ru, 0) + 1
over8 = {ru: n for ru, n in per_unit.items() if n > 8}
rels = reg.get("relationships", [])
return {
"obligations": len(obls),
"legal_minimum": len(lm),
"lm_without_legal_basis": lm_without_basis,
"empty_member_controls": empty_members,
"over8_per_review_unit": over8,
"out_of_scope": sum(1 for r in rels if r.get("type") == "out_of_scope"),
"semantic_edges": sum(1 for r in rels if r.get("type") in SEMANTIC_EDGE_TYPES),
"passed": not lm_without_basis and not empty_members and not over8,
}
def merge_edges(relationships: list[dict], proposed: list[dict]) -> tuple[list[dict], int]:
"""Proposed semantische Kanten dedupliziert in relationships mergen. Gibt (merged, added)."""
existing = {(r.get("type"), r.get("from"), r.get("to"))
for r in relationships if r.get("from")}
added = 0
out = list(relationships)
for e in proposed:
if e.get("type") not in SEMANTIC_EDGE_TYPES:
continue
key = (e["type"], e.get("from"), e.get("to"))
if key in existing or not e.get("from") or not e.get("to"):
continue
out.append(e)
existing.add(key)
added += 1
return out, added
@@ -0,0 +1,90 @@
"""P3 — Compliance-Advisor-Proof: obligation-basierte Antwort als vollstaendige
BEGRUENDUNGSKETTE aus der Registry (NICHT RAG-Text, KEIN LLM):
Rechtsgrundlage -> Obligation -> Procedure -> Controls -> Evidence -> Antwort.
Deterministisch + zitierfaehig. Der Unterschied zu RAG: RAG beantwortet — BreakPilot
begruendet UND operationalisiert.
python3 scripts/obligation_discovery/advisor_proof.py --registry obligations/cra.json \
--procedures obligations/cra_procedures.json --topic sbom --has-digital-elements
"""
from __future__ import annotations
import argparse
import json
def applies(obl: dict, has_digital: bool) -> tuple[bool, str]:
a = obl.get("applicability", "universal")
if a == "universal":
return True, ""
if a.startswith("domain:products_with_digital_elements"):
return has_digital, "nur fuer Produkte mit digitalen Elementen (CRA Art. 3)"
if a.startswith("domain:"):
return True, a.split(":", 1)[1]
if a.startswith("conditional:"):
return True, f"bedingt: {a.split(':',1)[1]}"
return True, ""
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("--registry", required=True)
ap.add_argument("--procedures", required=True)
ap.add_argument("--topic", default="sbom")
ap.add_argument("--has-digital-elements", action="store_true")
ap.add_argument("--question", default="Muss ich als Maschinenbauer eine SBOM bereitstellen?")
a = ap.parse_args()
reg = json.load(open(a.registry, encoding="utf-8"))
procs = json.load(open(a.procedures, encoding="utf-8"))["procedures"]
obls = [o for o in reg["obligations"]
if a.topic in o.get("family", "") or a.topic in o["id"]]
ids = {o["id"] for o in obls}
by_obl: dict[str, list] = {}
for p in procs:
for oid in p.get("fulfills_obligations", []):
by_obl.setdefault(oid, []).append(p)
pflicht = [o for o in obls if o["tier"] == "LEGAL_MINIMUM" and applies(o, a.has_digital_elements)[0]]
best = [o for o in obls if o["tier"] != "LEGAL_MINIMUM"]
print(f"FRAGE: {a.question}")
print(f"\nANTWORT: {'JA' if pflicht and a.has_digital_elements else 'NUR WENN CRA-anwendbar'}"
f"sofern das Produkt unter den CRA faellt (product with digital elements, Art. 3).")
print("\n══ BEGRUENDUNGSKETTE (Recht → Obligation → Procedure → Controls → Evidence) ══")
req_evidence: list[str] = []
for o in pflicht:
lb = "; ".join(f"{b.get('source','')} {b.get('anchor','')}".strip() for b in o.get("legal_basis", []))
print(f"\n● PFLICHT: {o['id']}{o.get('description','')[:80]}")
print(f" Rechtsgrundlage: {lb or ''}")
ps = by_obl.get(o["id"], [])
for p in ps:
print(f" Procedure (wie umgesetzt): {p['procedure_id']} — Schritte: {len(p.get('steps',[]))}")
print(f" Controls (Pruefung): {' · '.join(p.get('controls', []))[:96]}")
print(f" Nachweis: {' · '.join(p.get('evidence', []))}")
req_evidence += p.get("evidence", [])
if not ps:
print(" Procedure: (noch keine modelliert)")
print("\n── REQUIRED EVIDENCE (aggregiert, womit wird es nachgewiesen) ──")
print(" " + " · ".join(dict.fromkeys(req_evidence)) if req_evidence else "")
print("\n── BEST PRACTICE (anerkannte Umsetzung, KEINE CRA-Wortlautpflicht) ──")
for o in best:
gb = "; ".join(b.get("source", "") for b in o.get("guidance_basis", []))
print(f"{o['id']}{o.get('description','')[:64]} | Guidance: {gb or ''}")
print("\n── BEZIEHUNG (warum es zaehlt) ──")
for r in reg.get("relationships", []):
if r.get("from") in ids and r.get("to") not in ids:
print(f"{r['from']} --{r['type']}--> {r['to']}: {r.get('note','')[:64]}")
pend = sum(1 for o in pflicht if o.get("citation_status") == "pending_span_anchor")
print(f"\n── CITATION ──\n {pend}/{len(pflicht)} Pflichten: pending_span_anchor "
f"(Textstellen-Anker folgen mit dem zitierfaehigen Re-Ingest)")
print("\n(RAG beantwortet — BreakPilot begruendet UND operationalisiert.)")
if __name__ == "__main__":
main()
@@ -0,0 +1,52 @@
"""Exportiert den OBLIGATION_ID-Join-Key-Vertrag aus den Registry-Artefakten.
Die obligation_id ist der stabile Brueckenschluessel zwischen Legal Knowledge Graph
(citation_spans haengen an obligation_id) und Compliance Execution Graph
(control_mapping.source_norm -> obligation_id). citation_units = die legal_basis-Anker,
ueber die beide Seiten heute (vor obligation_id-Adoption) bruecken koennen.
DISZIPLIN: obligation_id wird RE-GELINKT, NIE neu vergeben (Pendant zu span_id/control_uuid).
python3 scripts/obligation_discovery/export_join_keys.py obligations/cra.json obligations/cra_authentication.json
"""
from __future__ import annotations
import argparse
import json
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("registries", nargs="+")
ap.add_argument("--out", default="obligations/obligation_join_keys.json")
a = ap.parse_args()
keys = []
for path in a.registries:
reg = json.load(open(path, encoding="utf-8"))
for o in reg.get("obligations", []):
citation_units = [b.get("anchor", "") for b in o.get("legal_basis", []) if b.get("anchor")]
keys.append({
"obligation_id": o["id"],
"regulation": reg.get("regulation", ""),
"family": o.get("family", ""),
"tier": o.get("tier", ""),
"citation_units": citation_units,
"source_role": o.get("source_role", ""),
})
out = {
"schema_version": "obligation_join_keys_v1",
"contract": "obligation_id ist der stabile Join-Key. Legal Knowledge Graph haengt "
"citation_spans an obligation_id; Compliance Execution Graph mappt "
"control_mapping.source_norm -> obligation_id. Interim-Bruecke = citation_units. "
"obligation_id NIE neu vergeben (re-link).",
"count": len(keys),
"obligation_ids": keys,
}
json.dump(out, open(a.out, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
from collections import Counter
print(f"exportiert: {a.out} ({len(keys)} obligation_ids)")
print("Regulierungen:", dict(Counter(k["regulation"] for k in keys)))
print("Familien:", dict(Counter(k["family"] for k in keys)))
if __name__ == "__main__":
main()
@@ -0,0 +1,36 @@
"""Stufe 5 — Review-Diff mergen: vorgeschlagene Beziehungskanten (review_status=proposed)
dedupliziert in die Registry mergen (kein LLM/Key). Kleine Beziehungs-Sprache:
depends_on/supports/produces_evidence_for/implements/derived_from.
python3 scripts/obligation_discovery/merge_review_diff.py obligations/cra.json /tmp/cra_edges_review.json
"""
from __future__ import annotations
import argparse
import json
from _core import SEMANTIC_EDGE_TYPES, merge_edges
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("registry")
ap.add_argument("review_diff")
ap.add_argument("--write", action="store_true", help="in die Registry schreiben (sonst dry-run)")
a = ap.parse_args()
reg = json.load(open(a.registry, encoding="utf-8"))
diff = json.load(open(a.review_diff, encoding="utf-8"))
proposed = diff.get("proposed_edges", diff if isinstance(diff, list) else [])
merged, added = merge_edges(reg.get("relationships", []), proposed)
print(f"proposed: {len(proposed)} | added (dedupliziert): {added}")
if a.write:
reg["relationships"] = merged
reg["relationship_types"] = list(SEMANTIC_EDGE_TYPES)
json.dump(reg, open(a.registry, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
print(f"written: {a.registry}")
else:
print("dry-run (use --write to apply)")
if __name__ == "__main__":
main()
@@ -0,0 +1,55 @@
"""Stufe 2 — Meta-Cluster (der Skalierungs-Fix für große Domänen): Mikro-Cluster →
REVIEW UNITS. Review Unit = das, was der LLM-Synthese-Pass sieht (entkoppelt vom Clustering,
später merge/split-bar). Nutzt den Embedding-Cache aus precluster (kein Re-Embed).
python3 scripts/obligation_discovery/meta_cluster.py --scope auth --meta-thr 0.62
"""
from __future__ import annotations
import argparse
import json
import os
import pickle
from _core import centroid, greedy_cluster
def run(scope: str, meta_thr: float, outdir: str) -> None:
micro = json.load(open(os.path.join(outdir, f"{scope}_micro_clusters.json"), encoding="utf-8"))
vecs = pickle.load(open(os.path.join(outdir, f"{scope}_vecs.pkl"), "rb"))
centroids = [centroid(m["member_indices"], vecs) for m in micro]
meta = greedy_cluster(centroids, meta_thr)
print(f"scope={scope} pass-2 (meta-thr={meta_thr}): {len(micro)} micro → {len(meta)} review-units")
out = []
for mi, m in enumerate(meta):
ctrl_ids, titles = [], []
for micro_idx in m["members"]:
mc = micro[micro_idx]
ctrl_ids += mc["control_ids"]
titles.append(mc["titles"][0] if mc["titles"] else "")
out.append({"review_unit_id": f"M{mi}", "n_micro": len(m["members"]),
"n_controls": len(ctrl_ids), "control_ids": ctrl_ids,
"sample_titles": titles[:8]})
out.sort(key=lambda x: -x["n_controls"])
path = os.path.join(outdir, f"{scope}_review_units.json")
json.dump(out, open(path, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
print("=== top review units (inspect for cross-domain mixing BEFORE synthesis) ===")
for m in out[:12]:
print(f" {m['review_unit_id']:5} ctrl={m['n_controls']:4} micro={m['n_micro']:3} "
f"| {' || '.join(t[:30] for t in m['sample_titles'][:3])}")
print(f"written: {path} ({len(out)} review units)")
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("--scope", default="auth")
ap.add_argument("--meta-thr", type=float, default=0.62)
ap.add_argument("--outdir", default="/tmp")
a = ap.parse_args()
run(a.scope, a.meta_thr, a.outdir)
if __name__ == "__main__":
main()
@@ -0,0 +1,76 @@
"""Stufe 1 — Pre-Cluster: Controls (scope) → BGE-M3-Embedding (gecacht) → Mikro-Cluster.
Deterministisch. Im bp-compliance-backend-Container ausführen (PYTHONPATH=/app).
python3 scripts/obligation_discovery/precluster.py --scope sbom
python3 scripts/obligation_discovery/precluster.py --patterns '%sbom%,%software bill%' --micro-thr 0.78
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import pickle
from _core import greedy_cluster, parse_req
SCOPES = {
"sbom": ["%SBOM%", "%software bill%", "%stückliste%", "%komponentenliste%"],
"vuln": ["%schwachstellenbehandl%", "%schwachstellenmanagement%", "%vulnerability handling%",
"%coordinated vulnerab%", "%vulnerability disclosure%", "%cvd-konzept%"],
"auth": ["%authentisierung%", "%authentifizierung%", "%authentication%"],
"logging": ["%logging%", "%protokollierung%", "%audit-log%", "%audit-trail%",
"%ereignisprotokoll%", "%sicherheitsprotokoll%", "%audit-protokoll%",
"%log-management%", "%sicherheitsereignis%protokoll%", "%audit-trail%"],
}
async def run(scope: str, patterns: list[str], micro_thr: float, outdir: str) -> None:
import asyncpg
from compliance.services.mc_embedding_matcher import _embed_texts
dsn = os.getenv("DATABASE_URL") or os.getenv("COMPLIANCE_DATABASE_URL")
conn = await asyncpg.connect(dsn)
where = " or ".join(f"title ilike ${i+1}" for i in range(len(patterns)))
rows = await conn.fetch(
f"select control_id, title, requirements from compliance.canonical_controls "
f"where {where} order by control_id", *patterns)
await conn.close()
items = [{"control_id": r["control_id"], "title": r["title"] or "",
"embed_text": (r["title"] or "") + ". " + " ".join(parse_req(r["requirements"])[:2])}
for r in rows]
print(f"scope={scope}: {len(items)} controls")
cache = os.path.join(outdir, f"{scope}_vecs.pkl")
if os.path.exists(cache):
vecs = pickle.load(open(cache, "rb"))
print(f"embeddings from cache ({len(vecs)})")
else:
vecs = await _embed_texts([it["embed_text"] for it in items])
pickle.dump(vecs, open(cache, "wb"))
print(f"embeddings fresh+cached ({len(vecs)})")
micro = greedy_cluster(vecs, micro_thr)
print(f"pass-1 (micro-thr={micro_thr}): {len(items)}{len(micro)} micro-clusters")
out = [{"micro_id": i, "size": len(c["members"]), "member_indices": c["members"],
"control_ids": [items[j]["control_id"] for j in c["members"]],
"titles": [items[j]["title"] for j in c["members"][:6]]}
for i, c in enumerate(micro)]
path = os.path.join(outdir, f"{scope}_micro_clusters.json")
json.dump(out, open(path, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
print(f"written: {path}")
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("--scope", default="sbom")
ap.add_argument("--patterns", default="", help="comma-separated SQL ILIKE patterns (overrides --scope)")
ap.add_argument("--micro-thr", type=float, default=0.78)
ap.add_argument("--outdir", default="/tmp")
a = ap.parse_args()
patterns = [p for p in a.patterns.split(",") if p] or SCOPES[a.scope]
asyncio.run(run(a.scope, patterns, a.micro_thr, a.outdir))
if __name__ == "__main__":
main()
@@ -0,0 +1,113 @@
"""Stufe 3 — LLM-Synthese: REVIEW UNITS → Obligation Registry (Schema obligation_registry_v1).
Geschärfter Prompt: kleinste Menge regulatorisch UNTERSCHIEDLICHER Obligations. Harte Tier-
Regel in Code erzwungen. Provenance pro Obligation. ANTHROPIC_API_KEY aus ENV (nie hartcodiert).
Große Calls → STREAMING (SDK blockt non-streaming >10min).
ANTHROPIC_API_KEY=… python3 scripts/obligation_discovery/synthesize_obligations.py \
--units /tmp/auth_review_units.json --regulation CRA --theme "Authentisierung" --out /tmp/auth_registry.json
"""
from __future__ import annotations
import argparse
import json
import os
import re
from collections import Counter
from _core import SEMANTIC_EDGE_TYPES
SYS = """Du bist Knowledge Engineer und baust eine LEGAL OBLIGATION REGISTRY fuer __REGULATION__
(Thema: __THEME__). Input: REVIEW UNITS (algorithmisch vor-gebuendelte Control-Gruppen), jede
kann MEHRERE unterschiedliche Pflichten enthalten.
AUFGABE: Zerlege die Review Units in die KLEINSTE MENGE regulatorisch UNTERSCHIEDLICHER Legal
Obligations. Regeln:
- Nichts zusammenfuehren nur wegen aehnlicher Woerter.
- Unterschiedliche Rechtsgrundlage => unterschiedliche Obligation.
- Unterschiedliche Applicability => unterschiedliche Obligation.
- Unterschiedliche Evidence-Facette (governance/capability/evidence) => GLEICHE Obligation, andere Facette.
- Unterschiedliche Umsetzung (NIST/OWASP/ISO/BSI) => guidance_basis, KEINE neue Obligation.
- Gleiche Pflicht ueber mehrere Review Units => EINE Obligation (mehrere member_review_units).
Gib AUSSCHLIESSLICH JSON aus:
{"obligations":[{"id":"snake_case","name":"","description":"","tier":"LEGAL_MINIMUM|BEST_PRACTICE|IMPLEMENTATION_GUIDANCE|EVIDENCE","applicability":"universal|conditional:<pred>|domain:<x>","evidence_facets":{"governance":true,"capability":true,"evidence":false},"source_role":"LEGAL_BASIS|GUIDANCE|EVIDENCE|IMPLEMENTATION","legal_basis":[{"source":"__REGULATION__","anchor":"","citation":""}],"guidance_basis":[{"source":"NIST|OWASP|ISO|BSI","anchor":"","role":"best_practice"}],"subdomain":"","member_review_units":["M0"],"source_meta_cluster":"M0","discovery_confidence":0.9}],
"relationships":[{"type":"depends_on|supports|produces_evidence_for|implements|derived_from","from":"id","to":"id","note":""},{"type":"out_of_scope","review_units":["M0"],"note":""}]}
HARTE REGELN:
- tier=LEGAL_MINIMUM NUR mit legal_basis (Primaerrecht). Sonst tier=BEST_PRACTICE, legal_basis=[].
- legal_basis NUR Primaerrecht der Regulierung; NIST/OWASP/ISO/BSI => guidance_basis.
- relationships SPARSAM, gerichtet, nur klar belegbar.
- out_of_scope: Review Units, die NICHT zum Thema gehoeren (andere Regulierung/Domaene)."""
def build_user(units: list[dict]) -> str:
lines = []
for u in units:
t = " | ".join(str(x)[:46] for x in u.get("sample_titles", [])[:6])
lines.append(f"{u['review_unit_id']} (controls={u['n_controls']}): {t}")
return "Review Units:\n" + "\n".join(lines)
def synthesize(units, regulation, theme, model):
import anthropic
key = os.environ["ANTHROPIC_API_KEY"]
sys = SYS.replace("__REGULATION__", regulation).replace("__THEME__", theme)
client = anthropic.Anthropic(api_key=key)
with client.messages.stream(model=model, max_tokens=24000, system=sys,
messages=[{"role": "user", "content": build_user(units)}]) as st:
msg = st.get_final_message()
txt = msg.content[0].text
m = re.search(r"\{.*\}", txt, re.DOTALL)
return json.loads(m.group(0) if m else txt)
def post_process(data, units, regulation, model):
cmap = {u["review_unit_id"]: u["control_ids"] for u in units}
size = {u["review_unit_id"]: u["n_controls"] for u in units}
obls = []
for o in data.get("obligations", []):
rus = [r for r in (o.get("member_review_units") or []) if r in cmap]
members = sorted({c for ru in rus for c in cmap[ru]})
lb = o.get("legal_basis") or []
tier, review = o.get("tier", "BEST_PRACTICE"), "draft"
if tier == "LEGAL_MINIMUM" and not lb:
tier, review = "BEST_PRACTICE", "needs_legal_basis"
smc = o.get("source_meta_cluster") or (rus[0] if rus else "")
obls.append({
"id": o["id"], "name": o.get("name", ""), "description": o.get("description", ""),
"tier": tier, "subdomain": o.get("subdomain", ""),
"applicability": o.get("applicability", "universal"),
"evidence_facets": o.get("evidence_facets", {}), "source_role": o.get("source_role", ""),
"legal_basis": lb, "guidance_basis": o.get("guidance_basis") or [],
"member_review_units": rus, "member_controls": members, "member_count": len(members),
"relationships": [], "citation_anchor_ids": [], "citation_status": "pending_span_anchor",
"review_status": review,
"provenance": {"discovery_confidence": o.get("discovery_confidence"),
"source_meta_cluster": smc, "cluster_size": size.get(smc),
"llm_model": model, "synthesis_version": "v1"}})
rels = [r for r in data.get("relationships", [])
if r.get("type") in SEMANTIC_EDGE_TYPES or r.get("type") == "out_of_scope"]
return {"schema_version": "obligation_registry_v1", "regulation": regulation,
"generated_by": f"obligation_discovery/{model}", "synthesis_version": "v1",
"citation_status": "pending_span_anchor", "obligations": obls, "relationships": rels}
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("--units", required=True)
ap.add_argument("--regulation", default="CRA")
ap.add_argument("--theme", default="")
ap.add_argument("--model", default="claude-opus-4-8")
ap.add_argument("--out", required=True)
a = ap.parse_args()
units = json.load(open(a.units, encoding="utf-8"))
data = synthesize(units, a.regulation, a.theme, a.model)
reg = post_process(data, units, a.regulation, a.model)
json.dump(reg, open(a.out, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
o = reg["obligations"]
print(f"obligations: {len(o)} | tier: {dict(Counter(x['tier'] for x in o))}")
print(f"written: {a.out}")
if __name__ == "__main__":
main()
@@ -0,0 +1,35 @@
"""Stufe 4 — Validierung: belastbare Registry-Checks (kein LLM/Key).
Prüft die User-Regeln: LEGAL_MINIMUM braucht legal_basis · member_controls vollständig ·
out_of_scope separat · >8-Obligations/Review-Unit-Warnung. Exit-Code 1 bei hartem Fehler.
python3 scripts/obligation_discovery/validate_registry.py obligations/cra_authentication.json
"""
from __future__ import annotations
import argparse
import json
import sys
from _core import validate_registry
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("registry")
a = ap.parse_args()
reg = json.load(open(a.registry, encoding="utf-8"))
v = validate_registry(reg)
print(f"=== validate {a.registry} ===")
print(f" obligations: {v['obligations']}")
print(f" LEGAL_MINIMUM: {v['legal_minimum']}")
print(f" LM ohne legal_basis: {v['lm_without_legal_basis'] or 'keine'}")
print(f" member_controls leer: {v['empty_member_controls'] or 'keine'}")
print(f" >8 Obligations/Review-Unit: {v['over8_per_review_unit'] or 'keine'}")
print(f" out_of_scope: {v['out_of_scope']}")
print(f" semantische Kanten: {v['semantic_edges']}")
print(f" PASSED: {v['passed']}")
sys.exit(0 if v["passed"] else 1)
if __name__ == "__main__":
main()