12fa179bfd
Deterministic prioritisation on top of the mapper (cra_prioritizer.py): a non-negotiable P0 floor (safety-function compromise / actively exploited / CRITICAL — customer weights cannot demote) plus a discretionary tier ranked by severity x the customer's weight (high/medium/low) for the 5 business objectives (access/data/network_api/supply_updates/monitoring). Quick-win flag (high impact, low effort) for a second view; each finding carries a short priority reason. Endpoint accepts weights + per-finding safety_impact/exploited. Rough pre-sort only (devs re-sort in Jira). No DB. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
100 lines
3.9 KiB
Python
100 lines
3.9 KiB
Python
"""Coarse, deterministic CRA finding prioritisation.
|
|
|
|
Two tiers (the customer's developers re-sort in Jira anyway — we only pre-sort):
|
|
- P0 floor (NON-negotiable, customer weights cannot demote): a finding that can
|
|
defeat a CE safety function (personal harm), is actively exploited, or is
|
|
CRITICAL severity.
|
|
- Discretionary: ranked by severity x the customer's weight for the business
|
|
objective the finding belongs to.
|
|
Plus a Quick-Win flag (high impact, low effort) for a second view.
|
|
|
|
No DB/LLM. Operates on mapped-finding objects by attribute (duck-typed) to avoid
|
|
a circular import with the mapper. Every priority carries a short reason.
|
|
"""
|
|
from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS, SEVERITY_WEIGHT
|
|
|
|
# Lookup table: requirement id -> full requirement record, built once at import.
_REQ = {
    requirement["req_id"]: requirement
    for requirement in ANNEX_I_REQUIREMENTS
}
|
|
|
|
# Each of the 8 CRA-AI categories is folded into one of the 5 business
# objectives that the customer is able to weight.
_CATEGORY_TO_OBJECTIVE = {
    "Secure-by-Design": "network_api",
    "Authentifizierung": "access",
    "Kryptografie": "data",
    "SSDLC": "supply_updates",
    "Supply Chain": "supply_updates",
    "Updates": "supply_updates",
    "Logging": "monitoring",
    "Vulnerability Handling": "monitoring",
}

# Human-readable label for every business objective.
OBJECTIVE_LABELS = {
    "access": "Zugang/Authentifizierung",
    "data": "Datenvertraulichkeit",
    "network_api": "Netzwerk/API",
    "supply_updates": "Updates/Supply-Chain",
    "monitoring": "Monitoring/Incident",
}

# Objective keys, in the insertion order of the label table.
OBJECTIVES = list(OBJECTIVE_LABELS)

# Customer weight level -> numeric factor multiplied with the severity weight.
_WEIGHT_NUM = {
    "high": 3,
    "medium": 2,
    "low": 1,
}

# Customer weight level -> wording used inside the priority reason text.
_WEIGHT_LABEL = {
    "high": "hoch",
    "medium": "mittel",
    "low": "niedrig",
}

# Largest `effort_days` value that still qualifies a finding as a quick win.
_QUICK_WIN_MAX_EFFORT = 3

# Sort rank per tier; a lower rank sorts first.
_TIER_RANK = {
    tier: rank for rank, tier in enumerate(("P0", "P1", "P2", "P3"))
}
|
|
|
|
|
|
def objective_for(req_id: str) -> str:
    """Return the business objective that the requirement *req_id* belongs to.

    The requirement's ``category`` is translated through
    ``_CATEGORY_TO_OBJECTIVE``. An unknown requirement id, a requirement
    without a category, or a category that has no mapping all resolve to
    ``"network_api"``.
    """
    fallback = "network_api"
    requirement = _REQ.get(req_id)
    if requirement:
        category = requirement.get("category", "")
        return _CATEGORY_TO_OBJECTIVE.get(category, fallback)
    return fallback
|
|
|
|
|
|
def _reason(p0, safety, exploited, sev, objective, weight_key, quick_win, tier):
    """Build the short explanation attached to a finding's priority.

    For a discretionary (non-P0) finding the text names the tier, the
    severity, the objective label and the customer's weight label, and gets a
    quick-win suffix when *quick_win* is set.

    For a P0 finding exactly one trigger is reported, checked in this order:
    safety impact, then exploitation, and otherwise critical severity.
    """
    if not p0:
        objective_label = OBJECTIVE_LABELS.get(objective, objective)
        weight_label = _WEIGHT_LABEL.get(weight_key, "mittel")
        text = "{} — Schwere {} + Prioritaet '{}' ({})".format(
            tier, sev, objective_label, weight_label)
        suffix = " · Quick Win (geringer Aufwand)" if quick_win else ""
        return text + suffix

    # P0: report only the first trigger that applies.
    if safety:
        return "P0 — kann eine Sicherheitsfunktion der CE-Risikobeurteilung aushebeln (Personenschaden)"
    if exploited:
        return "P0 — aktiv ausnutzbar / oeffentlich bekannt"
    return "P0 — kritische Schwere"
|
|
|
|
|
|
def _is_low_effort(effort) -> bool:
    """Return True when *effort* is within the quick-win effort limit."""
    try:
        return bool(effort <= _QUICK_WIN_MAX_EFFORT)
    except TypeError:
        # None or a non-numeric value: the effort is unknown, so the finding
        # is never advertised as a quick win.
        return False


def prioritize(mapped: list, weights=None) -> list:
    """Set priority fields on each mapped finding and return them sorted.

    Args:
        mapped: Iterable of finding objects, read by attribute (duck-typed):
            ``primary_requirement``, ``risk_level``, ``safety_impact`` and
            ``exploited``. Missing attributes fall back to ``""``, ``"LOW"``,
            ``False`` and ``False``. ``None`` is treated as "no findings".
        weights: Optional ``{objective: 'high' | 'medium' | 'low'}``. A
            missing or unrecognised weight counts as ``'medium'``.

    Returns:
        A new list holding the same finding objects, ordered by tier
        (P0 first) and, within a tier, by descending score. The caller's
        sequence is not reordered.

    Side effects:
        Each finding gets the attributes ``objective``, ``priority_tier``,
        ``priority_score``, ``quick_win`` and ``priority_reason``.
    """
    # Materialise once so that None and one-shot iterables (generators) work:
    # the findings are walked twice, once to annotate and once to sort.
    findings = [] if mapped is None else list(mapped)
    weights = weights or {}

    for m in findings:
        rid = getattr(m, "primary_requirement", "")
        req = _REQ.get(rid, {})
        objective = objective_for(rid)

        sev = (getattr(m, "risk_level", "LOW") or "LOW").upper()
        weight_key = str(weights.get(objective, "medium")).lower()
        # Discretionary score: severity weight x customer weight factor.
        score = SEVERITY_WEIGHT.get(sev, 10) * _WEIGHT_NUM.get(weight_key, 2)

        safety = bool(getattr(m, "safety_impact", False))
        exploited = bool(getattr(m, "exploited", False))
        # P0 floor: decided without looking at the customer weights, so the
        # weights can never demote such a finding.
        p0 = safety or exploited or sev == "CRITICAL"

        # CRITICAL is always P0, so HIGH is the only severity that can be a
        # quick win. A requirement without effort data counts as 99 days.
        quick_win = (
            not p0
            and sev == "HIGH"
            and _is_low_effort(req.get("effort_days", 99))
        )

        if p0:
            tier = "P0"
        elif score >= 150:
            tier = "P1"
        elif score >= 60:
            tier = "P2"
        else:
            tier = "P3"

        m.objective = objective
        m.priority_tier = tier
        m.priority_score = score
        m.quick_win = quick_win
        m.priority_reason = _reason(
            p0, safety, exploited, sev, objective, weight_key, quick_win, tier)

    # Tier first (unknown tiers last), then highest score first.
    findings.sort(
        key=lambda f: (_TIER_RANK.get(f.priority_tier, 9), -f.priority_score))
    return findings
|