From 417bcda68caa333330271b337ee62b085265924a Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Thu, 25 Jun 2026 11:15:57 +0200 Subject: [PATCH] feat(ucca): Advisor obligation-status Durchstich (step 3 complete) AssessObligationStatus traverses obligation_id -> (citation_unit) -> accepted controls -> required evidence -> status (erfuellt|offen|unklar). Evidence presence is a callback; MVP passes nil (nothing collected yet) -> offen. citation_spans = "pending" until the Legal-Knowledge-Graph session attaches them. This is the vertical slice that makes the graph a product feature: "CRA obligation fulfilled because evidence X/Y/Z is present", not "a doc exists". Co-Authored-By: Claude Opus 4.7 --- .../internal/ucca/compliance_status.go | 71 +++++++++++++++++++ .../internal/ucca/compliance_status_test.go | 59 +++++++++++++++ .../internal/ucca/obligation_join.go | 33 +++++++++ 3 files changed, 163 insertions(+) create mode 100644 ai-compliance-sdk/internal/ucca/compliance_status.go create mode 100644 ai-compliance-sdk/internal/ucca/compliance_status_test.go diff --git a/ai-compliance-sdk/internal/ucca/compliance_status.go b/ai-compliance-sdk/internal/ucca/compliance_status.go new file mode 100644 index 00000000..0375c4ab --- /dev/null +++ b/ai-compliance-sdk/internal/ucca/compliance_status.go @@ -0,0 +1,71 @@ +package ucca + +// ObligationStatus is the Advisor's vertical slice over the compliance graph for ONE legal +// obligation: which accepted controls satisfy it, what evidence they require, what's missing, +// and the resulting status. The point is "the required evidence is (not) present", not "a +// document exists". citation_spans is pending until the Legal-Knowledge-Graph session attaches +// them to the obligation (the upper half of the bridge). +type ObligationStatus struct { + ObligationID string `json:"obligation_id"` + LegalBasis []string `json:"legal_basis"` // the obligation's citation_units + Status string `json:"status"` // erfuellt | offen | unklar + Controls []ObligationControlStatus `json:"controls"` + CitationSpans string `json:"citation_spans"` // "pending" until the registry fills them +} + +// ObligationControlStatus is one control under an obligation with its evidence picture. +type ObligationControlStatus struct { + Framework string `json:"framework"` + Control string `json:"control"` + MappingType string `json:"mapping_type"` + RequiredEvidence []EvidenceRequirement `json:"required_evidence"` + MissingEvidence []EvidenceRequirement `json:"missing_evidence"` +} + +// AssessObligationStatus traverses obligation_id -> (citation_unit) -> accepted Controls -> +// required Evidence -> Status. hasEvidence reports whether a given (framework, control, +// evidence_type) is already collected; pass nil in the MVP (no collection yet) -> everything +// required is missing and the status is "offen". Unknown or unmapped obligation -> "unklar". +func AssessObligationStatus(joins *ObligationJoinKeys, mappings *ControlMappingSet, evidence *EvidenceRequirementSet, obligationID string, hasEvidence func(framework, control, evidenceType string) bool) ObligationStatus { + ob := joins.FindObligation(obligationID) + if ob == nil { + return ObligationStatus{ObligationID: obligationID, Status: "unklar", CitationSpans: "pending"} + } + st := ObligationStatus{ + ObligationID: obligationID, + LegalBasis: ob.CitationUnits, + CitationSpans: "pending", + Controls: []ObligationControlStatus{}, + } + ctrls := AcceptedControlsForObligation(*ob, mappings) + if len(ctrls) == 0 { + st.Status = "unklar" // no accepted control reaches it — we cannot assess + return st + } + anyMissing := false + for _, m := range ctrls { + req := evidence.RequiredFor(m.TargetFramework, m.TargetControl) + missing := make([]EvidenceRequirement, 0, len(req)) + for _, e := range req { + if hasEvidence == nil || !hasEvidence(e.Framework, e.Control, e.EvidenceType) { + missing = append(missing, e) + } + } + if len(missing) > 0 { + anyMissing = true + } + st.Controls = append(st.Controls, ObligationControlStatus{ + Framework: m.TargetFramework, + Control: m.TargetControl, + MappingType: m.MappingType, + RequiredEvidence: req, + MissingEvidence: missing, + }) + } + if anyMissing { + st.Status = "offen" + } else { + st.Status = "erfuellt" + } + return st +} diff --git a/ai-compliance-sdk/internal/ucca/compliance_status_test.go b/ai-compliance-sdk/internal/ucca/compliance_status_test.go new file mode 100644 index 00000000..5302c839 --- /dev/null +++ b/ai-compliance-sdk/internal/ucca/compliance_status_test.go @@ -0,0 +1,59 @@ +package ucca + +import "testing" + +func loadGraph(t *testing.T) (*ObligationJoinKeys, *ControlMappingSet, *EvidenceRequirementSet) { + t.Helper() + joins, err := LoadObligationJoinKeys("../../../obligations/obligation_join_keys.json") + if err != nil { + t.Fatalf("join keys: %v", err) + } + maps, err := LoadControlMappings("../../data/control_mappings") + if err != nil { + t.Fatalf("mappings: %v", err) + } + ev, err := LoadEvidenceRequirements("../../data/evidence_requirements") + if err != nil { + t.Fatalf("evidence: %v", err) + } + return joins, maps, ev +} + +func TestAssessObligationStatus(t *testing.T) { + joins, maps, ev := loadGraph(t) + + // covered obligation, no evidence collected yet (MVP) -> offen + st := AssessObligationStatus(joins, maps, ev, "firmware_software_authentication", nil) + if st.Status != "offen" { + t.Errorf("want offen, got %q", st.Status) + } + if len(st.Controls) == 0 { + t.Fatal("expected controls for a covered obligation") + } + for _, c := range st.Controls { + if len(c.MissingEvidence) != len(c.RequiredEvidence) { + t.Error("MVP: all required evidence should be missing") + } + } + t.Logf("DURCHSTICH firmware_software_authentication: status=%s legal_basis=%v citation_spans=%s", + st.Status, st.LegalBasis, st.CitationSpans) + for _, c := range st.Controls { + t.Logf(" %s %s (%s): %d required evidence, %d missing", c.Framework, c.Control, c.MappingType, len(c.RequiredEvidence), len(c.MissingEvidence)) + } + + // all evidence present -> erfuellt + st2 := AssessObligationStatus(joins, maps, ev, "firmware_software_authentication", func(f, c, et string) bool { return true }) + if st2.Status != "erfuellt" { + t.Errorf("want erfuellt with all evidence present, got %q", st2.Status) + } + + // uncovered obligation (no accepted control reaches it) -> unklar + if st3 := AssessObligationStatus(joins, maps, ev, "sbom_creation", nil); st3.Status != "unklar" { + t.Errorf("uncovered sbom_creation: want unklar, got %q", st3.Status) + } + + // unknown obligation_id -> unklar + if st4 := AssessObligationStatus(joins, maps, ev, "does_not_exist", nil); st4.Status != "unklar" { + t.Errorf("unknown obligation: want unklar, got %q", st4.Status) + } +} diff --git a/ai-compliance-sdk/internal/ucca/obligation_join.go b/ai-compliance-sdk/internal/ucca/obligation_join.go index 5e06386e..128f1005 100644 --- a/ai-compliance-sdk/internal/ucca/obligation_join.go +++ b/ai-compliance-sdk/internal/ucca/obligation_join.go @@ -146,3 +146,36 @@ func ComputeObligationCoverage(joins *ObligationJoinKeys, mappings *ControlMappi } return out } + +// AcceptedControlsForObligation returns our accepted control mappings that reach an +// obligation via the interim citation_unit join (deduped by target control). +func AcceptedControlsForObligation(ob ObligationKey, mappings *ControlMappingSet) []ControlMapping { + keys := make(map[string]bool, len(ob.CitationUnits)) + for _, cu := range ob.CitationUnits { + keys[citationUnitKey(cu)] = true + } + out := []ControlMapping{} + seen := map[string]bool{} + for _, m := range mappings.All { + if !m.IsAccepted() || !keys[citationUnitKey(m.SourceNorm)] { + continue + } + ck := m.TargetFramework + ":" + m.TargetControl + if seen[ck] { + continue + } + seen[ck] = true + out = append(out, m) + } + return out +} + +// FindObligation returns the registry entry for an obligation_id (nil if unknown). +func (o *ObligationJoinKeys) FindObligation(obligationID string) *ObligationKey { + for i := range o.ObligationIDs { + if o.ObligationIDs[i].ObligationID == obligationID { + return &o.ObligationIDs[i] + } + } + return nil +}