63d65af41b
CRA Annex I Part I (2)(e)/(2)(l)/(2)(i) had no clean OWASP target (rejected: "Mapping ueber NIST/BSI erforderlich"). Their NIST home, curated + accepted: (2)(e) Integritaet -> SI-7 (Software/Firmware/Information Integrity) (2)(l) Sichere Updates -> SI-2 (Flaw Remediation) (2)(i) Angriffsflaeche -> CM-7 (Least Functionality) New mapping_type=primary_implementation = the single canonical control per obligation (stronger than implements/supports); related controls (SC-3(3), RA-5, AC-6, SI-16, ...) follow later as supports. Evidence is framework-AGNOSTIC: SI-7/SI-2/CM-7 reuse the shared evidence_type catalog (config_export/test_report/repo_scan) - same types carry CRA, NIST, ISO 27001, IEC 62443, BSI. (framework,control) is only the link, not the type. obligation_id left empty: the Obligation Registry assigns it (exported via controls_for_obligation_mapping.json), then we adopt. go test ./internal/ucca green. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
153 lines
6.3 KiB
Go
153 lines
6.3 KiB
Go
package ucca
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
)
|
|
|
|
// ControlMapping is one persisted, versioned, reviewable link from a legal
// obligation/requirement to a concrete framework control: a node in the curated
// compliance graph (Regulation -> Obligation -> Control -> Evidence).
//
// The retriever only proposes candidates (mapping_status=candidate); a human or
// rule decision turns the good ones into mapping_status=accepted, which is the
// audited truth the Advisor uses at runtime.
//
// There is intentionally no probabilistic "confidence" field: once curated, a
// mapping is a professional statement, not an AI guess. The retriever's score
// lives only in the rationale of a candidate, never as structured truth.
type ControlMapping struct {
	// SourceNorm is the cited norm, e.g. "CRA Annex I Part I (2)(c)".
	SourceNorm string `json:"source_norm"`
	// SourceRole is the source_role of the norm (operational_requirement, ...).
	SourceRole string `json:"source_role"`
	// TargetFramework names the control framework, e.g. "OWASP ASVS".
	TargetFramework string `json:"target_framework"`
	// TargetControl is the control identifier inside TargetFramework, e.g. "V6.3.1".
	TargetControl string `json:"target_control"`
	// MappingType is one of: primary_implementation | implements | supports |
	// partially_supports | related | contradicts.
	MappingType string `json:"mapping_type"`
	// MappingStatus is one of: candidate | accepted | rejected | superseded.
	MappingStatus string `json:"mapping_status"`
	// Provenance is one of: retriever_candidate | human_curated | rule_based.
	Provenance string `json:"provenance"`
	// ObligationID is the stable cross-session join key from the Obligation
	// Registry. It stays empty until adopted; citation_unit is the interim bridge.
	ObligationID string `json:"obligation_id,omitempty"`
	// Rationale is the free-text justification of the mapping. For a candidate it
	// is also the only place the retriever's score may appear.
	Rationale string `json:"rationale"`
	// ReviewedBy records who decided (a human or a rule id).
	ReviewedBy string `json:"reviewed_by,omitempty"`
	// ReviewDate records when the decision was made, as YYYY-MM-DD.
	ReviewDate string `json:"review_date,omitempty"`
	// ReviewReason records why the decision was made.
	ReviewReason string `json:"review_reason,omitempty"`
	// Version is the mapping's version label; its format is not constrained by
	// this type.
	Version string `json:"version"`
}
|
|
|
|
// Allowed enum values: the deterministic "rule" layer that keeps the curated
// store clean. Each map is used as a set, so looking up a value that is not
// listed yields false.
var (
	// mappingTypeValues lists the permitted mapping_type values.
	mappingTypeValues = map[string]bool{
		"primary_implementation": true,
		"implements":             true,
		"supports":               true,
		"partially_supports":     true,
		"related":                true,
		"contradicts":            true,
	}
	// mappingStatusValues lists the permitted mapping_status values.
	mappingStatusValues = map[string]bool{
		"candidate":  true,
		"accepted":   true,
		"rejected":   true,
		"superseded": true,
	}
	// provenanceValues lists the permitted provenance values.
	provenanceValues = map[string]bool{
		"retriever_candidate": true,
		"human_curated":       true,
		"rule_based":          true,
	}
)
|
|
|
|
// Validate checks required fields + enum membership, and enforces the audit trail: any
|
|
// human/rule DECISION (accepted/rejected) must carry who/when/why. Fail-closed at load.
|
|
func (m ControlMapping) Validate() error {
|
|
switch {
|
|
case m.SourceNorm == "":
|
|
return fmt.Errorf("control mapping: source_norm required")
|
|
case m.TargetFramework == "":
|
|
return fmt.Errorf("control mapping: target_framework required")
|
|
case m.TargetControl == "":
|
|
return fmt.Errorf("control mapping: target_control required")
|
|
case !mappingTypeValues[m.MappingType]:
|
|
return fmt.Errorf("control mapping: invalid mapping_type %q", m.MappingType)
|
|
case !mappingStatusValues[m.MappingStatus]:
|
|
return fmt.Errorf("control mapping: invalid mapping_status %q", m.MappingStatus)
|
|
case !provenanceValues[m.Provenance]:
|
|
return fmt.Errorf("control mapping: invalid provenance %q", m.Provenance)
|
|
}
|
|
if m.MappingStatus == "accepted" || m.MappingStatus == "rejected" {
|
|
if m.ReviewedBy == "" || m.ReviewDate == "" || m.ReviewReason == "" {
|
|
return fmt.Errorf("control mapping %s->%s: status %q requires reviewed_by + review_date + review_reason (audit trail)",
|
|
m.SourceNorm, m.TargetControl, m.MappingStatus)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IsAccepted reports whether this mapping is the active audited truth.
|
|
func (m ControlMapping) IsAccepted() bool {
	// "accepted" is the only status that counts as audited truth; every other
	// status (including an empty one) does not.
	switch m.MappingStatus {
	case "accepted":
		return true
	default:
		return false
	}
}
|
|
|
|
// ControlMappingSet is the loaded, indexed mapping store (forward + reverse lookup).
|
|
type ControlMappingSet struct {
|
|
All []ControlMapping
|
|
bySourceNorm map[string][]ControlMapping
|
|
byControl map[string][]ControlMapping
|
|
}
|
|
|
|
// controlKey builds the reverse-index key for a framework control by joining
// the framework name and the control identifier with a colon.
//
// NOTE(review): the parts are not escaped, so a framework name that itself
// contains ":" could in principle produce the same key as a different
// framework/control pair. Confirm that this cannot occur in the curated data.
func controlKey(framework, control string) string {
	const sep = ":"
	return framework + sep + control
}
|
|
|
|
// ControlsFor returns the controls mapped to a source norm. acceptedOnly restricts to the
|
|
// audited truth (what the Advisor may treat as fact).
|
|
func (s *ControlMappingSet) ControlsFor(sourceNorm string, acceptedOnly bool) []ControlMapping {
|
|
return filterAccepted(s.bySourceNorm[sourceNorm], acceptedOnly)
|
|
}
|
|
|
|
// ObligationsFor returns the norms mapped to a framework control (reverse lookup).
|
|
func (s *ControlMappingSet) ObligationsFor(framework, control string, acceptedOnly bool) []ControlMapping {
|
|
return filterAccepted(s.byControl[controlKey(framework, control)], acceptedOnly)
|
|
}
|
|
|
|
func filterAccepted(in []ControlMapping, acceptedOnly bool) []ControlMapping {
|
|
if !acceptedOnly {
|
|
return in
|
|
}
|
|
out := make([]ControlMapping, 0, len(in))
|
|
for _, m := range in {
|
|
if m.IsAccepted() {
|
|
out = append(out, m)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
// LoadControlMappings reads every *.jsonl file under dir (one mapping per line; blank and
|
|
// //-prefixed lines ignored), validates each row, and builds the index. An invalid row
|
|
// aborts the whole load — fail-closed, because this is the audit truth, not best-effort.
|
|
func LoadControlMappings(dir string) (*ControlMappingSet, error) {
|
|
files, err := filepath.Glob(filepath.Join(dir, "*.jsonl"))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
set := &ControlMappingSet{
|
|
bySourceNorm: map[string][]ControlMapping{},
|
|
byControl: map[string][]ControlMapping{},
|
|
}
|
|
for _, f := range files {
|
|
fh, err := os.Open(f)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sc := bufio.NewScanner(fh)
|
|
sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
|
line := 0
|
|
for sc.Scan() {
|
|
line++
|
|
raw := strings.TrimSpace(sc.Text())
|
|
if raw == "" || strings.HasPrefix(raw, "//") {
|
|
continue
|
|
}
|
|
var m ControlMapping
|
|
if err := json.Unmarshal([]byte(raw), &m); err != nil {
|
|
fh.Close()
|
|
return nil, fmt.Errorf("%s:%d: %w", f, line, err)
|
|
}
|
|
if err := m.Validate(); err != nil {
|
|
fh.Close()
|
|
return nil, fmt.Errorf("%s:%d: %w", f, line, err)
|
|
}
|
|
set.All = append(set.All, m)
|
|
set.bySourceNorm[m.SourceNorm] = append(set.bySourceNorm[m.SourceNorm], m)
|
|
k := controlKey(m.TargetFramework, m.TargetControl)
|
|
set.byControl[k] = append(set.byControl[k], m)
|
|
}
|
|
fh.Close()
|
|
if err := sc.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return set, nil
|
|
}
|