a06c64af48
Both network_security and code_security are now atom-grain. Per-sub_topic use_case routing: secure_development -> code_security (best for secure-dev findings), everything else -> network_security. Findings carry breadth_use_case so the source context (which atom corpus) is visible under the best-practice depth. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
86 lines
4.0 KiB
Python
86 lines
4.0 KiB
Python
"""Attach the atom-grain network_security regulatory breadth to CRA findings.
|
|
|
|
This is the "semantic breadth (2)" from the handoff: the shared Controls-API
|
|
(compliance.atom_classification, use_case=network_security, ~11k precise,
|
|
framework-traceable obligations). It runs at the ENDPOINT/VIEW layer — NOT in
|
|
the pure cra_finding_mapper, which stays deterministic. The CRA Annex I anchor +
|
|
the curated measure + the NIST/OWASP golden-set crosswalk remain the lead; this
|
|
is breadth + source evidence, not a replacement.
|
|
|
|
Both network_security and code_security are atom-grain — we query only those
(secure_development via code_security, everything else via network_security),
always scoped by sub_topic + limit (per the caveats).
|
|
"""
|
|
from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS
|
|
from compliance.services.use_case_controls import UseCaseControlsService
|
|
|
|
# Lookup table: CRA-AI requirement id -> atom sub_topic (derived via the NIST
# families per CRA-AI). The sub_topic keys were verified exactly against the
# live atom_classification. One entry per line so diffs stay reviewable.
_REQ_TO_SUBTOPIC = {
    "CRA-AI-1": "secure_development",
    "CRA-AI-2": "network_segmentation",
    "CRA-AI-3": "network_segmentation",
    "CRA-AI-4": "access_control",
    "CRA-AI-5": "secure_development",
    "CRA-AI-6": "secure_development",
    "CRA-AI-7": "authentication",
    "CRA-AI-8": "authentication",
    "CRA-AI-9": "authentication",
    "CRA-AI-10": "access_control",
    "CRA-AI-11": "authentication",
    "CRA-AI-12": "access_control",
    "CRA-AI-13": "cryptography",
    "CRA-AI-14": "cryptography",
    "CRA-AI-15": "cryptography",
    "CRA-AI-16": "cryptography",
    "CRA-AI-17": "data_protection",
    "CRA-AI-18": "secure_development",
    "CRA-AI-19": "secure_development",
    "CRA-AI-20": "secure_development",
    "CRA-AI-21": "supply_chain_security",
    "CRA-AI-22": "vulnerability_management",
    "CRA-AI-23": "supply_chain_security",
    "CRA-AI-24": "logging_monitoring",
    "CRA-AI-25": "logging_monitoring",
    "CRA-AI-26": "logging_monitoring",
    "CRA-AI-27": "logging_monitoring",
    "CRA-AI-28": "vulnerability_management",
    "CRA-AI-29": "vulnerability_management",
    "CRA-AI-30": "vulnerability_management",
    "CRA-AI-31": "vulnerability_management",
    "CRA-AI-32": "vulnerability_management",
    "CRA-AI-33": "vulnerability_management",
    "CRA-AI-34": "vulnerability_management",
    "CRA-AI-35": "incident_response",
    "CRA-AI-36": "incident_response",
    "CRA-AI-37": "incident_response",
    "CRA-AI-38": "incident_response",
    "CRA-AI-39": "vulnerability_management",
    "CRA-AI-40": "incident_response",
}
|
|
|
|
|
|
# Per-sub_topic routing to the most relevant atom-grain use_case. Both
# network_security and code_security are atom-grain; secure-dev findings are
# best served by code_security.
_SUBTOPIC_TO_USECASE = {"secure_development": "code_security"}

# use_case for every sub_topic that has no explicit route above.
_DEFAULT_USECASE = "network_security"
|
|
|
|
|
|
def subtopic_for(req_id: str):
    """Return the sub_topic mapped to ``req_id``, or ``None`` when unmapped."""
    try:
        return _REQ_TO_SUBTOPIC[req_id]
    except KeyError:
        # Requirement ids without an entry in the table have no sub_topic.
        return None
|
|
|
|
|
|
def usecase_for(sub_topic: str) -> str:
    """Return the use_case to query for ``sub_topic``.

    Sub-topics with an explicit route in ``_SUBTOPIC_TO_USECASE`` use that
    route; every other value falls back to ``_DEFAULT_USECASE``.
    """
    if sub_topic in _SUBTOPIC_TO_USECASE:
        return _SUBTOPIC_TO_USECASE[sub_topic]
    return _DEFAULT_USECASE
|
|
|
|
|
|
def enrich_findings_with_breadth(mapped: list, db, limit: int = 5) -> None:
    """Attach `sub_topic`, `breadth_use_case` and `regulatory_breadth` to each finding.

    Every finding dict in ``mapped`` is updated in place:

    * ``sub_topic`` -- looked up from the finding's ``primary_requirement``
      (``None`` when the requirement has no mapping).
    * ``breadth_use_case`` -- the use_case queried for that sub_topic
      (``None`` when there is no sub_topic).
    * ``regulatory_breadth`` -- the controls the service returned, each reduced
      to ``control_id``, ``title``, ``source_regulation`` and ``severity``
      (``[]`` when there is no sub_topic or the lookup failed).

    The service is queried once per distinct (use_case, sub_topic) pair and the
    result is cached for the remaining findings. Best-effort: on any error the
    affected findings just get an empty breadth and the failure is logged --
    the enrichment never breaks the assessment.

    Args:
        mapped: Finding dicts to enrich (mutated in place).
        db: Handle passed straight to ``UseCaseControlsService``.
        limit: ``limit`` forwarded to the service for each query.
    """
    import logging  # function-scope import keeps this block self-contained

    log = logging.getLogger(__name__)
    svc = UseCaseControlsService(db)
    cache: dict = {}
    for m in mapped:
        st = subtopic_for(m.get("primary_requirement"))
        m["sub_topic"] = st
        if not st:
            m["regulatory_breadth"] = []
            m["breadth_use_case"] = None
            continue

        uc = usecase_for(st)
        m["breadth_use_case"] = uc
        key = (uc, st)
        if key not in cache:
            try:
                res = svc.controls_for_use_case(uc, sub_topic=st, limit=limit)
                cache[key] = [
                    {
                        "control_id": c.get("control_id"),
                        "title": c.get("title"),
                        "source_regulation": c.get("source_regulation"),
                        "severity": c.get("severity"),
                    }
                    for c in res.get("controls", [])
                ]
            except Exception:
                # Deliberate best-effort boundary: record the failure, then
                # cache the empty result so the same failing lookup is not
                # retried for every finding.
                log.warning(
                    "regulatory breadth lookup failed for use_case=%s sub_topic=%s",
                    uc, st, exc_info=True,
                )
                cache[key] = []

        # Hand each finding its own copy: sharing the cached list (and dicts)
        # would let a later mutation of one finding leak into the others.
        m["regulatory_breadth"] = [dict(c) for c in cache[key]]
|