feat(cra): coarse priority engine — P0 floor + customer weights + quick wins
Deterministic prioritisation on top of the mapper (cra_prioritizer.py): a non-negotiable P0 floor (safety-function compromise / actively exploited / CRITICAL — customer weights cannot demote) plus a discretionary tier ranked by severity x the customer's weight (high/medium/low) for the 5 business objectives (access/data/network_api/supply_updates/monitoring). Quick-win flag (high impact, low effort) for a second view; each finding carries a short priority reason. Endpoint accepts weights + per-finding safety_impact/exploited. Rough pre-sort only (devs re-sort in Jira). No DB. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -15,6 +15,7 @@ from typing import Optional
|
||||
|
||||
from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS, MEASURES, DEADLINES
|
||||
from compliance.services.cra_security_crosswalk import security_refs_for
|
||||
from compliance.services.cra_prioritizer import prioritize, OBJECTIVES
|
||||
|
||||
_REQ_INDEX = {r["req_id"]: r for r in ANNEX_I_REQUIREMENTS}
|
||||
_SEV_ORDER = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
|
||||
@@ -66,6 +67,8 @@ class ScannerFinding:
|
||||
severity: str = "" # critical | high | medium | low (scanner's rating)
|
||||
cvss: Optional[float] = None
|
||||
location: str = ""
|
||||
safety_impact: bool = False # compromise can defeat a CE safety function (personal harm)
|
||||
exploited: bool = False # actively / publicly exploited
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: dict) -> "ScannerFinding":
|
||||
@@ -78,6 +81,8 @@ class ScannerFinding:
|
||||
severity=d.get("severity", "") or "",
|
||||
cvss=d.get("cvss"),
|
||||
location=d.get("location", "") or d.get("path", ""),
|
||||
safety_impact=bool(d.get("safety_impact", False)),
|
||||
exploited=bool(d.get("exploited", False)),
|
||||
)
|
||||
|
||||
|
||||
@@ -94,6 +99,14 @@ class MappedFinding:
|
||||
owasp_refs: list = field(default_factory=list) # [{code, label}] OWASP Top 10:2021
|
||||
rationale: str = ""
|
||||
unmapped: bool = False
|
||||
# carried from the finding + set by the prioritizer (cra_prioritizer.prioritize)
|
||||
safety_impact: bool = False
|
||||
exploited: bool = False
|
||||
objective: str = ""
|
||||
priority_tier: str = "" # P0 (non-negotiable floor) | P1 | P2 | P3
|
||||
priority_score: int = 0
|
||||
quick_win: bool = False
|
||||
priority_reason: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -105,6 +118,8 @@ class CRAAssessment:
|
||||
open_measures: list = field(default_factory=list) # [{id, description}]
|
||||
unmapped_findings: list = field(default_factory=list)
|
||||
coverage_pct: float = 0.0
|
||||
quick_wins: list = field(default_factory=list) # finding_ids: high impact, low effort
|
||||
objectives: list = field(default_factory=lambda: list(OBJECTIVES))
|
||||
deadlines: list = field(default_factory=lambda: list(DEADLINES))
|
||||
|
||||
|
||||
@@ -156,7 +171,7 @@ def map_finding(f: ScannerFinding) -> MappedFinding:
|
||||
return MappedFinding(
|
||||
finding_id=f.id, risk_level=_SEV_BY_RANK.get(_rank(finding_sev), "LOW"),
|
||||
rationale="Kein eindeutiger CRA-Anforderungsbezug erkannt — manuelle Pruefung.",
|
||||
unmapped=True,
|
||||
unmapped=True, safety_impact=f.safety_impact, exploited=f.exploited,
|
||||
)
|
||||
primary = _REQ_INDEX[reqs[0]]
|
||||
risk_rank = max(_rank(finding_sev), _rank(primary["severity"]))
|
||||
@@ -177,12 +192,18 @@ def map_finding(f: ScannerFinding) -> MappedFinding:
|
||||
nist_refs=refs["nist"],
|
||||
owasp_refs=refs["owasp"],
|
||||
rationale="{}: {}".format(primary["req_id"], primary.get("title", "")),
|
||||
safety_impact=f.safety_impact,
|
||||
exploited=f.exploited,
|
||||
)
|
||||
|
||||
|
||||
def assess_findings(findings: list) -> CRAAssessment:
|
||||
"""Map a list of ScannerFinding into a deterministic CRA assessment."""
|
||||
mapped = [map_finding(f) for f in findings]
|
||||
def assess_findings(findings: list, weights=None) -> CRAAssessment:
|
||||
"""Map findings to a deterministic CRA assessment, then prioritise them.
|
||||
|
||||
weights: {objective: 'high'|'medium'|'low'} — the customer's priorities for
|
||||
the discretionary tier (the P0 floor ignores them).
|
||||
"""
|
||||
mapped = prioritize([map_finding(f) for f in findings], weights)
|
||||
by_risk = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
|
||||
reqs_touched, measure_ids, unmapped = set(), [], []
|
||||
for m in mapped:
|
||||
@@ -204,6 +225,7 @@ def assess_findings(findings: list) -> CRAAssessment:
|
||||
open_measures=[{"id": mid, "description": MEASURES.get(mid, "")} for mid in measure_ids],
|
||||
unmapped_findings=unmapped,
|
||||
coverage_pct=round(100.0 * covered / total, 1) if total else 0.0,
|
||||
quick_wins=[m.finding_id for m in mapped if m.quick_win],
|
||||
)
|
||||
|
||||
|
||||
@@ -213,6 +235,7 @@ def assess_findings_payload(payload: dict) -> dict:
|
||||
This is the testable tool body the MCP server wraps (kept transport-free).
|
||||
"""
|
||||
raw = payload.get("findings", []) if isinstance(payload, dict) else []
|
||||
weights = payload.get("weights") if isinstance(payload, dict) else None
|
||||
findings = [ScannerFinding.from_dict(d) for d in raw]
|
||||
assessment = assess_findings(findings)
|
||||
assessment = assess_findings(findings, weights)
|
||||
return asdict(assessment) # recurses into nested MappedFinding dataclasses
|
||||
|
||||
Reference in New Issue
Block a user