diff --git a/backend-compliance/compliance/api/cra_assess_routes.py b/backend-compliance/compliance/api/cra_assess_routes.py index 69cd7760..9b5faf2f 100644 --- a/backend-compliance/compliance/api/cra_assess_routes.py +++ b/backend-compliance/compliance/api/cra_assess_routes.py @@ -9,7 +9,7 @@ Project-less by design: works standalone for ANY customer — including those wi no CE risk assessment and no FMEA yet (the mandatory baseline). Reuses the fully tested mapper; no DB, no LLM, no RAG. Same logic the MCP server exposes. """ -from typing import List, Optional +from typing import Dict, List, Optional from fastapi import APIRouter from pydantic import BaseModel @@ -28,13 +28,18 @@ class FindingIn(BaseModel): severity: Optional[str] = "" cvss: Optional[float] = None location: Optional[str] = "" + safety_impact: Optional[bool] = False + exploited: Optional[bool] = False class AssessRequest(BaseModel): findings: List[FindingIn] + # customer priorities for the discretionary tier: {objective: high|medium|low}. + # objectives: access | data | network_api | supply_updates | monitoring. + weights: Optional[Dict[str, str]] = None @router.post("/assess") async def assess(body: AssessRequest): - payload = {"findings": [f.model_dump() for f in body.findings]} + payload = {"findings": [f.model_dump() for f in body.findings], "weights": body.weights} return assess_findings_payload(payload) diff --git a/backend-compliance/compliance/services/cra_finding_mapper.py b/backend-compliance/compliance/services/cra_finding_mapper.py index 28f2b647..18f3dd33 100644 --- a/backend-compliance/compliance/services/cra_finding_mapper.py +++ b/backend-compliance/compliance/services/cra_finding_mapper.py @@ -15,6 +15,7 @@ from typing import Optional from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS, MEASURES, DEADLINES from compliance.services.cra_security_crosswalk import security_refs_for +from compliance.services.cra_prioritizer import prioritize, OBJECTIVES _REQ_INDEX = {r["req_id"]: r for r in ANNEX_I_REQUIREMENTS} _SEV_ORDER = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4} @@ -66,6 +67,8 @@ class ScannerFinding: severity: str = "" # critical | high | medium | low (scanner's rating) cvss: Optional[float] = None location: str = "" + safety_impact: bool = False # compromise can defeat a CE safety function (personal harm) + exploited: bool = False # actively / publicly exploited @classmethod def from_dict(cls, d: dict) -> "ScannerFinding": @@ -78,6 +81,8 @@ class ScannerFinding: severity=d.get("severity", "") or "", cvss=d.get("cvss"), location=d.get("location", "") or d.get("path", ""), + safety_impact=bool(d.get("safety_impact", False)), + exploited=bool(d.get("exploited", False)), ) @@ -94,6 +99,14 @@ class MappedFinding: owasp_refs: list = field(default_factory=list) # [{code, label}] OWASP Top 10:2021 rationale: str = "" unmapped: bool = False + # carried from the finding + set by the prioritizer (cra_prioritizer.prioritize) + safety_impact: bool = False + exploited: bool = False + objective: str = "" + priority_tier: str = "" # P0 (non-negotiable floor) | P1 | P2 | P3 + priority_score: int = 0 + quick_win: bool = False + priority_reason: str = "" @dataclass @@ -105,6 +118,8 @@ class CRAAssessment: open_measures: list = field(default_factory=list) # [{id, description}] unmapped_findings: list = field(default_factory=list) coverage_pct: float = 0.0 + quick_wins: list = field(default_factory=list) # finding_ids: high impact, low effort + objectives: list = field(default_factory=lambda: list(OBJECTIVES)) deadlines: list = field(default_factory=lambda: list(DEADLINES)) @@ -156,7 +171,7 @@ def map_finding(f: ScannerFinding) -> MappedFinding: return MappedFinding( finding_id=f.id, risk_level=_SEV_BY_RANK.get(_rank(finding_sev), "LOW"), rationale="Kein eindeutiger CRA-Anforderungsbezug erkannt — manuelle Pruefung.", - unmapped=True, + unmapped=True, safety_impact=f.safety_impact, exploited=f.exploited, ) primary = _REQ_INDEX[reqs[0]] risk_rank = max(_rank(finding_sev), _rank(primary["severity"])) @@ -177,12 +192,18 @@ def map_finding(f: ScannerFinding) -> MappedFinding: nist_refs=refs["nist"], owasp_refs=refs["owasp"], rationale="{}: {}".format(primary["req_id"], primary.get("title", "")), + safety_impact=f.safety_impact, + exploited=f.exploited, ) -def assess_findings(findings: list) -> CRAAssessment: - """Map a list of ScannerFinding into a deterministic CRA assessment.""" - mapped = [map_finding(f) for f in findings] +def assess_findings(findings: list, weights=None) -> CRAAssessment: + """Map findings to a deterministic CRA assessment, then prioritise them. + + weights: {objective: 'high'|'medium'|'low'} — the customer's priorities for + the discretionary tier (the P0 floor ignores them). + """ + mapped = prioritize([map_finding(f) for f in findings], weights) by_risk = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0} reqs_touched, measure_ids, unmapped = set(), [], [] for m in mapped: @@ -204,6 +225,7 @@ def assess_findings(findings: list) -> CRAAssessment: open_measures=[{"id": mid, "description": MEASURES.get(mid, "")} for mid in measure_ids], unmapped_findings=unmapped, coverage_pct=round(100.0 * covered / total, 1) if total else 0.0, + quick_wins=[m.finding_id for m in mapped if m.quick_win], ) @@ -213,6 +235,7 @@ def assess_findings_payload(payload: dict) -> dict: This is the testable tool body the MCP server wraps (kept transport-free). """ raw = payload.get("findings", []) if isinstance(payload, dict) else [] + weights = payload.get("weights") if isinstance(payload, dict) else None findings = [ScannerFinding.from_dict(d) for d in raw] - assessment = assess_findings(findings) + assessment = assess_findings(findings, weights) return asdict(assessment) # recurses into nested MappedFinding dataclasses diff --git a/backend-compliance/compliance/services/cra_prioritizer.py b/backend-compliance/compliance/services/cra_prioritizer.py new file mode 100644 index 00000000..d001be04 --- /dev/null +++ b/backend-compliance/compliance/services/cra_prioritizer.py @@ -0,0 +1,99 @@ +"""Coarse, deterministic CRA finding prioritisation. + +Two tiers (the customer's developers re-sort in Jira anyway — we only pre-sort): + - P0 floor (NON-negotiable, customer weights cannot demote): a finding that can + defeat a CE safety function (personal harm), is actively exploited, or is + CRITICAL severity. + - Discretionary: ranked by severity x the customer's weight for the business + objective the finding belongs to. +Plus a Quick-Win flag (high impact, low effort) for a second view. + +No DB/LLM. Operates on mapped-finding objects by attribute (duck-typed) to avoid +a circular import with the mapper. Every priority carries a short reason. +""" +from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS, SEVERITY_WEIGHT + +_REQ = {r["req_id"]: r for r in ANNEX_I_REQUIREMENTS} + +# 8 CRA-AI categories -> 5 business objectives the customer can weight. +_CATEGORY_TO_OBJECTIVE = { + "Secure-by-Design": "network_api", + "Authentifizierung": "access", + "Kryptografie": "data", + "SSDLC": "supply_updates", + "Supply Chain": "supply_updates", + "Updates": "supply_updates", + "Logging": "monitoring", + "Vulnerability Handling": "monitoring", +} +OBJECTIVES = ["access", "data", "network_api", "supply_updates", "monitoring"] +OBJECTIVE_LABELS = { + "access": "Zugang/Authentifizierung", + "data": "Datenvertraulichkeit", + "network_api": "Netzwerk/API", + "supply_updates": "Updates/Supply-Chain", + "monitoring": "Monitoring/Incident", +} +_WEIGHT_NUM = {"high": 3, "medium": 2, "low": 1} +_WEIGHT_LABEL = {"high": "hoch", "medium": "mittel", "low": "niedrig"} +_QUICK_WIN_MAX_EFFORT = 3 +_TIER_RANK = {"P0": 0, "P1": 1, "P2": 2, "P3": 3} + + +def objective_for(req_id: str) -> str: + req = _REQ.get(req_id) + if not req: + return "network_api" + return _CATEGORY_TO_OBJECTIVE.get(req.get("category", ""), "network_api") + + +def _reason(p0, safety, exploited, sev, objective, weight_key, quick_win, tier): + if p0: + if safety: + return "P0 — kann eine Sicherheitsfunktion der CE-Risikobeurteilung aushebeln (Personenschaden)" + if exploited: + return "P0 — aktiv ausnutzbar / oeffentlich bekannt" + return "P0 — kritische Schwere" + obj = OBJECTIVE_LABELS.get(objective, objective) + base = "{} — Schwere {} + Prioritaet '{}' ({})".format( + tier, sev, obj, _WEIGHT_LABEL.get(weight_key, "mittel")) + if quick_win: + return base + " · Quick Win (geringer Aufwand)" + return base + + +def prioritize(mapped: list, weights=None) -> list: + """Set priority fields on each mapped finding and return them sorted. + + mapped: objects with attrs primary_requirement, risk_level, safety_impact, + exploited. weights: {objective: 'high'|'medium'|'low'}. + """ + weights = weights or {} + for m in mapped: + rid = getattr(m, "primary_requirement", "") + req = _REQ.get(rid, {}) + objective = objective_for(rid) + sev = (getattr(m, "risk_level", "LOW") or "LOW").upper() + sev_w = SEVERITY_WEIGHT.get(sev, 10) + weight_key = str(weights.get(objective, "medium")).lower() + w = _WEIGHT_NUM.get(weight_key, 2) + safety = bool(getattr(m, "safety_impact", False)) + exploited = bool(getattr(m, "exploited", False)) + effort = req.get("effort_days", 99) + p0 = safety or exploited or sev == "CRITICAL" + score = sev_w * w + quick_win = (not p0) and effort <= _QUICK_WIN_MAX_EFFORT and sev in ("HIGH", "CRITICAL") + if p0: + tier = "P0" + elif score >= 150: + tier = "P1" + elif score >= 60: + tier = "P2" + else: + tier = "P3" + m.objective = objective + m.priority_tier = tier + m.priority_score = score + m.quick_win = quick_win + m.priority_reason = _reason(p0, safety, exploited, sev, objective, weight_key, quick_win, tier) + return sorted(mapped, key=lambda x: (_TIER_RANK.get(x.priority_tier, 9), -x.priority_score)) diff --git a/backend-compliance/tests/test_cra_assess_routes.py b/backend-compliance/tests/test_cra_assess_routes.py index 40517009..786dd3ea 100644 --- a/backend-compliance/tests/test_cra_assess_routes.py +++ b/backend-compliance/tests/test_cra_assess_routes.py @@ -38,3 +38,26 @@ def test_assess_requires_finding_id(): # id is required by the schema -> 422 r = client.post("/api/v1/cra/assess", json={"findings": [{"title": "no id"}]}) assert r.status_code == 422 + + +def test_assess_prioritizes_with_weights(): + r = client.post("/api/v1/cra/assess", json={ + "findings": [ + {"id": "mfa", "cwe": "CWE-306", "severity": "high"}, + {"id": "log", "cwe": "CWE-778", "severity": "high"}, + ], + "weights": {"access": "high", "monitoring": "low"}, + }) + assert r.status_code == 200 + d = r.json() + order = [m["finding_id"] for m in d["mapped"]] + assert order.index("mfa") < order.index("log") + assert all("priority_tier" in m for m in d["mapped"]) + + +def test_assess_p0_floor_on_safety_impact(): + r = client.post("/api/v1/cra/assess", json={"findings": [ + {"id": "s", "cwe": "CWE-319", "severity": "low", "safety_impact": True}, + ]}) + assert r.status_code == 200 + assert r.json()["mapped"][0]["priority_tier"] == "P0" diff --git a/backend-compliance/tests/test_cra_finding_mapper.py b/backend-compliance/tests/test_cra_finding_mapper.py index c091f987..47de765a 100644 --- a/backend-compliance/tests/test_cra_finding_mapper.py +++ b/backend-compliance/tests/test_cra_finding_mapper.py @@ -77,7 +77,8 @@ def test_payload_entry_is_json_serializable_and_deterministic(): assert r1 == r2 # deterministic assert r1["findings_total"] == 2 assert isinstance(r1["mapped"], list) and isinstance(r1["mapped"][0], dict) - assert r1["mapped"][0]["primary_requirement"] == "CRA-AI-9" + by_id = {m["finding_id"]: m for m in r1["mapped"]} # order is now priority-sorted + assert by_id["x"]["primary_requirement"] == "CRA-AI-9" def test_empty_payload_is_safe(): diff --git a/backend-compliance/tests/test_cra_prioritizer.py b/backend-compliance/tests/test_cra_prioritizer.py new file mode 100644 index 00000000..8dc7a97b --- /dev/null +++ b/backend-compliance/tests/test_cra_prioritizer.py @@ -0,0 +1,52 @@ +"""Tests for the coarse CRA prioritisation (P0 floor + weighted tier + quick wins).""" +from compliance.services.cra_finding_mapper import ScannerFinding, assess_findings + + +def test_safety_impact_forces_p0(): + a = assess_findings([ScannerFinding(id="s", title="TLS 1.0", cwe="CWE-319", severity="medium", safety_impact=True)]) + m = a.mapped[0] + assert m.priority_tier == "P0" + assert "Personenschaden" in m.priority_reason + + +def test_exploited_forces_p0(): + a = assess_findings([ScannerFinding(id="e", title="outdated dep", category="dependency", severity="medium", exploited=True)]) + assert a.mapped[0].priority_tier == "P0" + + +def test_critical_is_p0(): + a = assess_findings([ScannerFinding(id="c", title="default password", cwe="CWE-259", severity="critical")]) + assert a.mapped[0].priority_tier == "P0" + + +def test_weights_order_the_discretionary_tier(): + findings = [ + ScannerFinding(id="log", title="no security logging", cwe="CWE-778", severity="high"), # monitoring + ScannerFinding(id="mfa", title="missing authentication", cwe="CWE-306", severity="high"), # access + ] + a = assess_findings(findings, weights={"access": "high", "monitoring": "low"}) + order = [m.finding_id for m in a.mapped] + assert order.index("mfa") < order.index("log") + assert a.mapped[0].priority_tier != "P0" # neither is a floor finding + + +def test_quick_win_flag_and_view(): + a = assess_findings([ScannerFinding(id="tls", title="TLS 1.0", cwe="CWE-319", severity="high")]) + m = a.mapped[0] + assert m.primary_requirement == "CRA-AI-15" # effort 2 days + assert m.quick_win is True + assert "tls" in a.quick_wins + + +def test_p0_sorts_above_discretionary(): + findings = [ + ScannerFinding(id="low", title="missing logging", cwe="CWE-778", severity="low"), # P3 + ScannerFinding(id="crit", title="default password", cwe="CWE-259", severity="critical"), # P0 + ] + a = assess_findings(findings) + assert a.mapped[0].finding_id == "crit" + + +def test_objectives_exposed(): + a = assess_findings([]) + assert a.objectives == ["access", "data", "network_api", "supply_updates", "monitoring"]