feat(ucca): Advisor obligation-status Durchstich (step 3 complete)

AssessObligationStatus traverses obligation_id -> (citation_unit) -> accepted
controls -> required evidence -> status (erfuellt|offen|unklar). Evidence
presence is a callback; MVP passes nil (nothing collected yet) -> offen.
citation_spans = "pending" until the Legal-Knowledge-Graph session attaches
them. This is the vertical slice that makes the graph a product feature:
"CRA obligation fulfilled because evidence X/Y/Z is present", not "a doc exists".

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-25 11:15:57 +02:00
parent 86d1473a6a
commit 417bcda68c
3 changed files with 163 additions and 0 deletions
@@ -0,0 +1,71 @@
package ucca
// ObligationStatus is the Advisor's vertical slice over the compliance graph for ONE legal
// obligation: which accepted controls satisfy it, what evidence they require, what's missing,
// and the resulting status. The point is "the required evidence is (not) present", not "a
// document exists". citation_spans is pending until the Legal-Knowledge-Graph session attaches
// them to the obligation (the upper half of the bridge).
type ObligationStatus struct {
ObligationID string `json:"obligation_id"`
LegalBasis []string `json:"legal_basis"` // the obligation's citation_units
Status string `json:"status"` // erfuellt | offen | unklar
Controls []ObligationControlStatus `json:"controls"`
CitationSpans string `json:"citation_spans"` // "pending" until the registry fills them
}
// ObligationControlStatus is one control under an obligation with its evidence picture.
type ObligationControlStatus struct {
Framework string `json:"framework"`
Control string `json:"control"`
MappingType string `json:"mapping_type"`
RequiredEvidence []EvidenceRequirement `json:"required_evidence"`
MissingEvidence []EvidenceRequirement `json:"missing_evidence"`
}
// AssessObligationStatus traverses obligation_id -> (citation_unit) -> accepted Controls ->
// required Evidence -> Status. hasEvidence reports whether a given (framework, control,
// evidence_type) is already collected; pass nil in the MVP (no collection yet) -> everything
// required is missing and the status is "offen". Unknown or unmapped obligation -> "unklar".
func AssessObligationStatus(joins *ObligationJoinKeys, mappings *ControlMappingSet, evidence *EvidenceRequirementSet, obligationID string, hasEvidence func(framework, control, evidenceType string) bool) ObligationStatus {
ob := joins.FindObligation(obligationID)
if ob == nil {
return ObligationStatus{ObligationID: obligationID, Status: "unklar", CitationSpans: "pending"}
}
st := ObligationStatus{
ObligationID: obligationID,
LegalBasis: ob.CitationUnits,
CitationSpans: "pending",
Controls: []ObligationControlStatus{},
}
ctrls := AcceptedControlsForObligation(*ob, mappings)
if len(ctrls) == 0 {
st.Status = "unklar" // no accepted control reaches it — we cannot assess
return st
}
anyMissing := false
for _, m := range ctrls {
req := evidence.RequiredFor(m.TargetFramework, m.TargetControl)
missing := make([]EvidenceRequirement, 0, len(req))
for _, e := range req {
if hasEvidence == nil || !hasEvidence(e.Framework, e.Control, e.EvidenceType) {
missing = append(missing, e)
}
}
if len(missing) > 0 {
anyMissing = true
}
st.Controls = append(st.Controls, ObligationControlStatus{
Framework: m.TargetFramework,
Control: m.TargetControl,
MappingType: m.MappingType,
RequiredEvidence: req,
MissingEvidence: missing,
})
}
if anyMissing {
st.Status = "offen"
} else {
st.Status = "erfuellt"
}
return st
}
@@ -0,0 +1,59 @@
package ucca
import "testing"
func loadGraph(t *testing.T) (*ObligationJoinKeys, *ControlMappingSet, *EvidenceRequirementSet) {
t.Helper()
joins, err := LoadObligationJoinKeys("../../../obligations/obligation_join_keys.json")
if err != nil {
t.Fatalf("join keys: %v", err)
}
maps, err := LoadControlMappings("../../data/control_mappings")
if err != nil {
t.Fatalf("mappings: %v", err)
}
ev, err := LoadEvidenceRequirements("../../data/evidence_requirements")
if err != nil {
t.Fatalf("evidence: %v", err)
}
return joins, maps, ev
}
func TestAssessObligationStatus(t *testing.T) {
joins, maps, ev := loadGraph(t)
// covered obligation, no evidence collected yet (MVP) -> offen
st := AssessObligationStatus(joins, maps, ev, "firmware_software_authentication", nil)
if st.Status != "offen" {
t.Errorf("want offen, got %q", st.Status)
}
if len(st.Controls) == 0 {
t.Fatal("expected controls for a covered obligation")
}
for _, c := range st.Controls {
if len(c.MissingEvidence) != len(c.RequiredEvidence) {
t.Error("MVP: all required evidence should be missing")
}
}
t.Logf("DURCHSTICH firmware_software_authentication: status=%s legal_basis=%v citation_spans=%s",
st.Status, st.LegalBasis, st.CitationSpans)
for _, c := range st.Controls {
t.Logf(" %s %s (%s): %d required evidence, %d missing", c.Framework, c.Control, c.MappingType, len(c.RequiredEvidence), len(c.MissingEvidence))
}
// all evidence present -> erfuellt
st2 := AssessObligationStatus(joins, maps, ev, "firmware_software_authentication", func(f, c, et string) bool { return true })
if st2.Status != "erfuellt" {
t.Errorf("want erfuellt with all evidence present, got %q", st2.Status)
}
// uncovered obligation (no accepted control reaches it) -> unklar
if st3 := AssessObligationStatus(joins, maps, ev, "sbom_creation", nil); st3.Status != "unklar" {
t.Errorf("uncovered sbom_creation: want unklar, got %q", st3.Status)
}
// unknown obligation_id -> unklar
if st4 := AssessObligationStatus(joins, maps, ev, "does_not_exist", nil); st4.Status != "unklar" {
t.Errorf("unknown obligation: want unklar, got %q", st4.Status)
}
}
@@ -146,3 +146,36 @@ func ComputeObligationCoverage(joins *ObligationJoinKeys, mappings *ControlMappi
} }
return out return out
} }
// AcceptedControlsForObligation returns our accepted control mappings that reach an
// obligation via the interim citation_unit join (deduped by target control).
func AcceptedControlsForObligation(ob ObligationKey, mappings *ControlMappingSet) []ControlMapping {
keys := make(map[string]bool, len(ob.CitationUnits))
for _, cu := range ob.CitationUnits {
keys[citationUnitKey(cu)] = true
}
out := []ControlMapping{}
seen := map[string]bool{}
for _, m := range mappings.All {
if !m.IsAccepted() || !keys[citationUnitKey(m.SourceNorm)] {
continue
}
ck := m.TargetFramework + ":" + m.TargetControl
if seen[ck] {
continue
}
seen[ck] = true
out = append(out, m)
}
return out
}
// FindObligation returns the registry entry for an obligation_id (nil if unknown).
func (o *ObligationJoinKeys) FindObligation(obligationID string) *ObligationKey {
for i := range o.ObligationIDs {
if o.ObligationIDs[i].ObligationID == obligationID {
return &o.ObligationIDs[i]
}
}
return nil
}