feat(ucca): obligation-join loader + citation_unit bridge + coverage report

Consumes the cross-session contract obligations/obligation_join_keys.json (47
obligation_ids). Interim bridge = citation_unit (our source_norm <-> registry
citation_units), to be hardened to the stable obligation_id (field now optional
on ControlMapping).

ComputeObligationCoverage joins the 47 registry obligations to our accepted
control mappings: covered=2 (user_authentication_required, firmware_software_
authentication), mapped_rejected=3 ((2)(e) -> our OWASP mappings rejected,
route via NIST/BSI), uncovered=42. This coverage signal is the feedback to the
Obligation session for what to cut/refine next.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-25 11:10:53 +02:00
parent 9e0a9ccef4
commit 86d1473a6a
3 changed files with 217 additions and 7 deletions
@@ -0,0 +1,148 @@
package ucca
import (
"encoding/json"
"os"
"regexp"
"strings"
)
// ObligationKey is one entry of the Obligation Registry's cross-session contract
// (obligations/obligation_join_keys.json). obligation_id is the STABLE join key — assigned
// only by the Registry, never minted here. citation_units are the interim bridge until our
// ControlMapping adopts obligation_id directly.
type ObligationKey struct {
ObligationID string `json:"obligation_id"`
Regulation string `json:"regulation"`
Family string `json:"family"`
Tier string `json:"tier"`
CitationUnits []string `json:"citation_units"`
SourceRole string `json:"source_role"`
}
// ObligationJoinKeys is the loaded contract + a citation-unit index for the interim join.
type ObligationJoinKeys struct {
SchemaVersion string `json:"schema_version"`
Count int `json:"count"`
ObligationIDs []ObligationKey `json:"obligation_ids"`
byCitationKey map[string][]string
}
var citationRefRe = regexp.MustCompile(`\(([0-9a-zA-Z]+)\)`)
// citationUnitKey normalizes a CRA Annex I reference for the INTERIM citation_unit join, so
// our "CRA Annex I Part I (2)(c)" and the Registry's "Annex I (2)(c)" collapse to the same
// key ("i:2.c"). Interim only — superseded by the stable obligation_id once adopted.
func citationUnitKey(cu string) string {
low := strings.ToLower(cu)
part := ""
switch {
case strings.Contains(low, "part ii"):
part = "ii"
case strings.Contains(low, "part i"), strings.Contains(low, "(2)"):
part = "i" // CRA Annex I Part I = the (2)(x) essential requirements
}
var refs []string
for _, m := range citationRefRe.FindAllStringSubmatch(cu, -1) {
refs = append(refs, strings.ToLower(m[1]))
}
return part + ":" + strings.Join(refs, ".")
}
// LoadObligationJoinKeys reads the Registry contract and indexes it by citation-unit key.
func LoadObligationJoinKeys(path string) (*ObligationJoinKeys, error) {
raw, err := os.ReadFile(path)
if err != nil {
return nil, err
}
var o ObligationJoinKeys
if err := json.Unmarshal(raw, &o); err != nil {
return nil, err
}
o.byCitationKey = map[string][]string{}
for _, ob := range o.ObligationIDs {
for _, cu := range ob.CitationUnits {
k := citationUnitKey(cu)
o.byCitationKey[k] = append(o.byCitationKey[k], ob.ObligationID)
}
}
return &o, nil
}
// ObligationsForCitation returns the obligation_ids that join (interim) to a citation
// reference such as a control_mapping.source_norm.
func (o *ObligationJoinKeys) ObligationsForCitation(citationRef string) []string {
return o.byCitationKey[citationUnitKey(citationRef)]
}
// ObligationCoverage is one row of the cross-session coverage report: for a registry
// obligation, which of our accepted controls reach it (via the citation_unit join), how much
// evidence they require, and the resulting coverage status.
type ObligationCoverage struct {
ObligationID string `json:"obligation_id"`
Family string `json:"family"`
Status string `json:"status"` // covered | mapped_rejected | uncovered
AcceptedControls []string `json:"accepted_controls"`
EvidenceCount int `json:"evidence_count"`
}
// ComputeObligationCoverage joins the Registry obligations to our accepted control mappings
// (via citation_unit) and reports per obligation: covered (>=1 accepted control reaches it),
// mapped_rejected (only rejected mappings reach it), or uncovered (no mapping reaches it).
// This is the signal back to the Obligation session for what to cut/refine next.
func ComputeObligationCoverage(joins *ObligationJoinKeys, mappings *ControlMappingSet, evidence *EvidenceRequirementSet) []ObligationCoverage {
type bucket struct {
acceptedCtrls map[string]bool
rejected bool
}
byKey := map[string]*bucket{}
for _, m := range mappings.All {
k := citationUnitKey(m.SourceNorm)
b := byKey[k]
if b == nil {
b = &bucket{acceptedCtrls: map[string]bool{}}
byKey[k] = b
}
switch {
case m.IsAccepted():
b.acceptedCtrls[m.TargetFramework+":"+m.TargetControl] = true
case m.MappingStatus == "rejected":
b.rejected = true
}
}
out := make([]ObligationCoverage, 0, len(joins.ObligationIDs))
for _, ob := range joins.ObligationIDs {
cov := ObligationCoverage{ObligationID: ob.ObligationID, Family: ob.Family}
seen := map[string]bool{}
rejected := false
for _, cu := range ob.CitationUnits {
b := byKey[citationUnitKey(cu)]
if b == nil {
continue
}
rejected = rejected || b.rejected
for ck := range b.acceptedCtrls {
if seen[ck] {
continue
}
seen[ck] = true
cov.AcceptedControls = append(cov.AcceptedControls, ck)
fwCtrl := strings.SplitN(ck, ":", 2)
if len(fwCtrl) == 2 {
cov.EvidenceCount += len(evidence.RequiredFor(fwCtrl[0], fwCtrl[1]))
}
}
}
switch {
case len(cov.AcceptedControls) > 0:
cov.Status = "covered"
case rejected:
cov.Status = "mapped_rejected"
default:
cov.Status = "uncovered"
}
out = append(out, cov)
}
return out
}