Files
breakpilot-compliance/backend-compliance/compliance/services/cra_use_case_controls.py
T
Benjamin Admin 398eaf3c36 feat(cra): two-lane breadth — CRA-specific corpus + technical depth
All 6 security use_cases are atom-grain now. Per finding we draw two lanes: the
CRA corpus (use_case=cra, the most on-point CRA obligations) + the technical
depth (code_security for secure-dev, else network_security). Controls merged,
deduped, each tagged with its use_case (shown in the best-practice depth).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-14 12:23:37 +02:00

89 lines
4.5 KiB
Python

"""Attach the atom-grain network_security regulatory breadth to CRA findings.
This is the "semantic breadth (2)" from the handoff: the shared Controls-API
(compliance.atom_classification, use_case=network_security, ~11k precise,
framework-traceable obligations). It runs at the ENDPOINT/VIEW layer — NOT in
the pure cra_finding_mapper, which stays deterministic. The CRA Annex I anchor +
the curated measure + the NIST/OWASP golden-set crosswalk remain the lead; this
is breadth + source evidence, not a replacement.
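Queries draw on two atom-grain lanes per finding — the CRA corpus (use_case=cra)
plus a technical-depth corpus (code_security for secure_development, otherwise
network_security) — always scoped by sub_topic + limit (per the caveats).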
"""
import logging

from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS
from compliance.services.use_case_controls import UseCaseControlsService
# Lookup table: CRA Annex I requirement id ("CRA-AI-<n>") -> the sub_topic used
# to scope breadth queries. Per the original author's note, the assignment
# follows the NIST families per CRA-AI requirement, and the exact sub_topic keys
# were verified against the live atom_classification.
_REQ_TO_SUBTOPIC = {
    "CRA-AI-1": "secure_development",
    "CRA-AI-2": "network_segmentation",
    "CRA-AI-3": "network_segmentation",
    "CRA-AI-4": "access_control",
    "CRA-AI-5": "secure_development",
    "CRA-AI-6": "secure_development",
    "CRA-AI-7": "authentication",
    "CRA-AI-8": "authentication",
    "CRA-AI-9": "authentication",
    "CRA-AI-10": "access_control",
    "CRA-AI-11": "authentication",
    "CRA-AI-12": "access_control",
    "CRA-AI-13": "cryptography",
    "CRA-AI-14": "cryptography",
    "CRA-AI-15": "cryptography",
    "CRA-AI-16": "cryptography",
    "CRA-AI-17": "data_protection",
    "CRA-AI-18": "secure_development",
    "CRA-AI-19": "secure_development",
    "CRA-AI-20": "secure_development",
    "CRA-AI-21": "supply_chain_security",
    "CRA-AI-22": "vulnerability_management",
    "CRA-AI-23": "supply_chain_security",
    "CRA-AI-24": "logging_monitoring",
    "CRA-AI-25": "logging_monitoring",
    "CRA-AI-26": "logging_monitoring",
    "CRA-AI-27": "logging_monitoring",
    "CRA-AI-28": "vulnerability_management",
    "CRA-AI-29": "vulnerability_management",
    "CRA-AI-30": "vulnerability_management",
    "CRA-AI-31": "vulnerability_management",
    "CRA-AI-32": "vulnerability_management",
    "CRA-AI-33": "vulnerability_management",
    "CRA-AI-34": "vulnerability_management",
    "CRA-AI-35": "incident_response",
    "CRA-AI-36": "incident_response",
    "CRA-AI-37": "incident_response",
    "CRA-AI-38": "incident_response",
    "CRA-AI-39": "vulnerability_management",
    "CRA-AI-40": "incident_response",
}
# Every finding is enriched from two breadth lanes:
#   1. the CRA-specific corpus (always queried — the most on-point for a CRA
#      assessment), and
#   2. a technical-depth corpus chosen by sub_topic.
# This table holds only the exceptions for lane 2; any sub_topic not listed
# here falls back to "network_security". All of these corpora are atom-grain.
# isms/dora/kritis are atom-grain as well and remain available for future
# per-regime routing.
_TECHNICAL_USECASE = {
    "secure_development": "code_security",
}
def subtopic_for(req_id: str):
    """Return the sub_topic mapped to a CRA Annex I requirement id.

    Looks `req_id` up in `_REQ_TO_SUBTOPIC`; yields `None` when the id has no
    entry in that table.
    """
    sub_topic = _REQ_TO_SUBTOPIC.get(req_id)
    return sub_topic
def usecases_for(sub_topic: str) -> list:
    """Return the two use_case lanes to query for `sub_topic`, in query order.

    The first lane is always "cra". The second is the technical-depth lane:
    the `_TECHNICAL_USECASE` entry for `sub_topic` if one exists, otherwise
    "network_security".
    """
    technical_lane = _TECHNICAL_USECASE.get(sub_topic, "network_security")
    return ["cra", technical_lane]
def enrich_findings_with_breadth(mapped: list, db, per_use_case: int = 3) -> None:
    """Attach `sub_topic` + `regulatory_breadth` to each finding, in place.

    For every finding the sub_topic is derived from its `primary_requirement`.
    Findings without a mapped sub_topic get `sub_topic=None` and an empty
    `regulatory_breadth`. Otherwise the controls of each lane returned by
    `usecases_for` are fetched, merged in lane order and de-duplicated by
    `control_id` (controls with a falsy `control_id` are skipped); each control
    carries the `use_case` of the lane it was first seen in.

    Best-effort: a failing lookup never breaks the assessment — the error is
    logged and that lane contributes no controls.

    Args:
        mapped: Finding dicts; each is mutated (keys `sub_topic` and
            `regulatory_breadth` are set).
        db: Handle passed straight to `UseCaseControlsService`.
        per_use_case: `limit` forwarded to each lane query.

    Returns:
        None. Results are written into the dicts of `mapped`.
    """
    logger = logging.getLogger(__name__)
    svc = UseCaseControlsService(db)
    # One query per (use_case, sub_topic) for the whole call, regardless of how
    # many findings share that pair.
    cache: dict = {}
    for m in mapped:
        st = subtopic_for(m.get("primary_requirement"))
        m["sub_topic"] = st
        if not st:
            m["regulatory_breadth"] = []
            continue
        merged, seen = [], set()
        for uc in usecases_for(st):
            key = (uc, st)
            if key not in cache:
                try:
                    res = svc.controls_for_use_case(uc, sub_topic=st, limit=per_use_case)
                    cache[key] = [
                        {"control_id": c.get("control_id"), "title": c.get("title"),
                         "source_regulation": c.get("source_regulation"),
                         "severity": c.get("severity"), "use_case": uc}
                        for c in res.get("controls", [])
                    ]
                except Exception:
                    # Deliberately broad: breadth is optional evidence. Record
                    # the failure instead of hiding it, and cache the empty
                    # result so the same failing query is not retried for
                    # every later finding.
                    logger.warning(
                        "Breadth lookup failed for use_case=%s sub_topic=%s",
                        uc, st, exc_info=True,
                    )
                    cache[key] = []
            for c in cache[key]:
                if c["control_id"] and c["control_id"] not in seen:
                    seen.add(c["control_id"])
                    # Copy so findings sharing a cached lane do not alias the
                    # same dict (a later mutation on one finding would
                    # otherwise leak into the others and into the cache).
                    merged.append(dict(c))
        m["regulatory_breadth"] = merged