package ucca import ( "encoding/json" "os" "regexp" "strings" ) // ObligationKey is one entry of the Obligation Registry's cross-session contract // (obligations/obligation_join_keys.json). obligation_id is the STABLE join key — assigned // only by the Registry, never minted here. citation_units are the interim bridge until our // ControlMapping adopts obligation_id directly. type ObligationKey struct { ObligationID string `json:"obligation_id"` Regulation string `json:"regulation"` Family string `json:"family"` Tier string `json:"tier"` CitationUnits []string `json:"citation_units"` SourceRole string `json:"source_role"` } // ObligationJoinKeys is the loaded contract + a citation-unit index for the interim join. type ObligationJoinKeys struct { SchemaVersion string `json:"schema_version"` Count int `json:"count"` ObligationIDs []ObligationKey `json:"obligation_ids"` byCitationKey map[string][]string } var citationRefRe = regexp.MustCompile(`\(([0-9a-zA-Z]+)\)`) // citationUnitKey normalizes a CRA Annex I reference for the INTERIM citation_unit join, so // our "CRA Annex I Part I (2)(c)" and the Registry's "Annex I (2)(c)" collapse to the same // key ("i:2.c"). Interim only — superseded by the stable obligation_id once adopted. func citationUnitKey(cu string) string { low := strings.ToLower(cu) part := "" switch { case strings.Contains(low, "part ii"): part = "ii" case strings.Contains(low, "part i"), strings.Contains(low, "(2)"): part = "i" // CRA Annex I Part I = the (2)(x) essential requirements } var refs []string for _, m := range citationRefRe.FindAllStringSubmatch(cu, -1) { refs = append(refs, strings.ToLower(m[1])) } return part + ":" + strings.Join(refs, ".") } // LoadObligationJoinKeys reads the Registry contract and indexes it by citation-unit key. func LoadObligationJoinKeys(path string) (*ObligationJoinKeys, error) { raw, err := os.ReadFile(path) if err != nil { return nil, err } var o ObligationJoinKeys if err := json.Unmarshal(raw, &o); err != nil { return nil, err } o.byCitationKey = map[string][]string{} for _, ob := range o.ObligationIDs { for _, cu := range ob.CitationUnits { k := citationUnitKey(cu) o.byCitationKey[k] = append(o.byCitationKey[k], ob.ObligationID) } } return &o, nil } // ObligationsForCitation returns the obligation_ids that join (interim) to a citation // reference such as a control_mapping.source_norm. func (o *ObligationJoinKeys) ObligationsForCitation(citationRef string) []string { return o.byCitationKey[citationUnitKey(citationRef)] } // FindObligation returns the registry entry for an obligation_id (nil if unknown). func (o *ObligationJoinKeys) FindObligation(obligationID string) *ObligationKey { for i := range o.ObligationIDs { if o.ObligationIDs[i].ObligationID == obligationID { return &o.ObligationIDs[i] } } return nil } // mappingReaches reports whether a control mapping reaches an obligation — EXACT via the // adopted obligation_id (semantic, preferred), else via the interim citation_unit join (for // not-yet-adopted rows). Once obligation_id is set, the coarse citation_unit match is ignored: // that is how the semantic join replaces the structural one (e.g. V11.2.1 crypto no longer // rides (2)(d) into user_authentication_required — it goes to credential_confidentiality_protection). func mappingReaches(m ControlMapping, ob ObligationKey, citationKeys map[string]bool) bool { if m.ObligationID != "" { return m.ObligationID == ob.ObligationID } return citationKeys[citationUnitKey(m.SourceNorm)] } // AcceptedControlsForObligation returns our accepted control mappings that reach an obligation // (deduped by target control), obligation_id-exact where adopted, citation_unit otherwise. func AcceptedControlsForObligation(ob ObligationKey, mappings *ControlMappingSet) []ControlMapping { keys := make(map[string]bool, len(ob.CitationUnits)) for _, cu := range ob.CitationUnits { keys[citationUnitKey(cu)] = true } out := []ControlMapping{} seen := map[string]bool{} for _, m := range mappings.All { if !m.IsAccepted() || !mappingReaches(m, ob, keys) { continue } ck := m.TargetFramework + ":" + m.TargetControl if seen[ck] { continue } seen[ck] = true out = append(out, m) } return out } // ObligationCoverage is one row of the cross-session coverage report. type ObligationCoverage struct { ObligationID string `json:"obligation_id"` Family string `json:"family"` Status string `json:"status"` // covered | mapped_rejected | uncovered AcceptedControls []string `json:"accepted_controls"` EvidenceCount int `json:"evidence_count"` } // ComputeObligationCoverage joins the Registry obligations to our control mappings — exact via // obligation_id where adopted, else via the interim citation_unit join — and reports per // obligation: covered (>=1 accepted control reaches it), mapped_rejected (only rejected // mappings reach it), or uncovered. The signal back to the Obligation session. func ComputeObligationCoverage(joins *ObligationJoinKeys, mappings *ControlMappingSet, evidence *EvidenceRequirementSet) []ObligationCoverage { out := make([]ObligationCoverage, 0, len(joins.ObligationIDs)) for _, ob := range joins.ObligationIDs { keys := make(map[string]bool, len(ob.CitationUnits)) for _, cu := range ob.CitationUnits { keys[citationUnitKey(cu)] = true } cov := ObligationCoverage{ObligationID: ob.ObligationID, Family: ob.Family} seen := map[string]bool{} rejected := false for _, m := range mappings.All { if !mappingReaches(m, ob, keys) { continue } if m.IsAccepted() { ck := m.TargetFramework + ":" + m.TargetControl if !seen[ck] { seen[ck] = true cov.AcceptedControls = append(cov.AcceptedControls, ck) cov.EvidenceCount += len(evidence.RequiredFor(m.TargetFramework, m.TargetControl)) } } else if m.MappingStatus == "rejected" { rejected = true } } switch { case len(cov.AcceptedControls) > 0: cov.Status = "covered" case rejected: cov.Status = "mapped_rejected" default: cov.Status = "uncovered" } out = append(out, cov) } return out }