"""Project-progress rollup for the CRA cyber findings (management view). The dev workflow is: finding -> measure -> ticket (Jira) sent to engineering. The scanner is the system of record for each finding's lifecycle: it carries ``status`` (open|triaged|resolved|false_positive|ignored) and, once a ticket is created, ``tracker_issue_url``. We read that lifecycle back here and roll it up into a management progress picture (% done, what's left, per-CRA-requirement coverage) WITHOUT re-scanning. Pure + deterministic: takes the raw scanner finding dicts, reuses the tested mapper for the requirement/risk axis, and reads the status/ticket axis straight off the raw dict. No DB, no LLM, no network. """ from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS from compliance.services.cra_finding_mapper import ScannerFinding, map_finding _REQ_TITLE = {r["req_id"]: r.get("title", "") for r in ANNEX_I_REQUIREMENTS} _RISK_RANK = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1} _RANK_RISK = {4: "CRITICAL", 3: "HIGH", 2: "MEDIUM", 1: "LOW", 0: ""} # 4 management phases the headline bar is built from. Raw scanner status is kept # separately (by_status) so nothing is hidden. OFFEN, IN_ARBEIT, ERLEDIGT, AUSGESCHLOSSEN = "offen", "in_arbeit", "erledigt", "ausgeschlossen" _PHASE_LABEL = { OFFEN: "Offen", IN_ARBEIT: "In Arbeit", ERLEDIGT: "Erledigt", AUSGESCHLOSSEN: "Ausgeschlossen", } _OPEN_PHASES = (OFFEN, IN_ARBEIT) def _phase(status: str, has_ticket: bool) -> str: """Derive the management phase. A finding only counts as 'in Arbeit' once a tracker ticket exists; 'erledigt' when the scanner reports it resolved.""" s = (status or "open").lower() if s == "resolved": return ERLEDIGT if s in ("false_positive", "ignored"): return AUSGESCHLOSSEN if has_ticket: return IN_ARBEIT return OFFEN def _new_req_agg(req_id: str, title: str) -> dict: return {"req_id": req_id, "title": title, "total": 0, OFFEN: 0, IN_ARBEIT: 0, ERLEDIGT: 0, AUSGESCHLOSSEN: 0, "_max_open_risk": 0} def build_progress(raw_findings: list) -> dict: """Roll raw scanner findings up into a management progress picture.""" themes: list = [] by_phase = {OFFEN: 0, IN_ARBEIT: 0, ERLEDIGT: 0, AUSGESCHLOSSEN: 0} by_status: dict = {} by_risk_open = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0} req_agg: dict = {} for d in raw_findings: if not isinstance(d, dict): continue sf = ScannerFinding.from_dict(d) mf = map_finding(sf) status = (d.get("status") or "open").lower() tracker = d.get("tracker_issue_url") or "" has_ticket = bool(tracker) phase = _phase(status, has_ticket) by_status[status] = by_status.get(status, 0) + 1 by_phase[phase] += 1 if phase in _OPEN_PHASES and mf.risk_level in by_risk_open: by_risk_open[mf.risk_level] += 1 req = mf.primary_requirement or "—" rt = _REQ_TITLE.get(req, "") themes.append({ "finding_id": mf.finding_id, "title": mf.title or sf.title or mf.finding_id, "requirement": req, "requirement_title": rt, "risk_level": mf.risk_level, "phase": phase, "phase_label": _PHASE_LABEL[phase], "status": status, "tracker_url": tracker, "has_ticket": has_ticket, "location": mf.location, "updated_at": d.get("updated_at") or "", }) agg = req_agg.setdefault(req, _new_req_agg(req, rt)) agg["total"] += 1 agg[phase] += 1 if phase in _OPEN_PHASES: agg["_max_open_risk"] = max(agg["_max_open_risk"], _RISK_RANK.get(mf.risk_level, 0)) total = len(themes) actionable = by_phase[OFFEN] + by_phase[IN_ARBEIT] + by_phase[ERLEDIGT] completion_pct = round(100.0 * by_phase[ERLEDIGT] / actionable, 1) if actionable else 0.0 requirements = [] for agg in req_agg.values(): act = agg[OFFEN] + agg[IN_ARBEIT] + agg[ERLEDIGT] if act > 0 and agg[ERLEDIGT] == act: rphase = ERLEDIGT elif agg[IN_ARBEIT] > 0 or agg[ERLEDIGT] > 0: rphase = IN_ARBEIT else: rphase = OFFEN requirements.append({ "req_id": agg["req_id"], "title": agg["title"], "total": agg["total"], OFFEN: agg[OFFEN], IN_ARBEIT: agg[IN_ARBEIT], ERLEDIGT: agg[ERLEDIGT], AUSGESCHLOSSEN: agg[AUSGESCHLOSSEN], "phase": rphase, "phase_label": _PHASE_LABEL[rphase], "completion_pct": round(100.0 * agg[ERLEDIGT] / act, 1) if act else 0.0, "open_risk": _RANK_RISK[agg["_max_open_risk"]], }) requirements.sort(key=lambda r: (-_RISK_RANK.get(r["open_risk"], 0), r["completion_pct"])) # Surface the riskiest open work first in the theme table. themes.sort(key=lambda t: ( 0 if t["phase"] in _OPEN_PHASES else 1, -_RISK_RANK.get(t["risk_level"], 0), )) return { "total": total, "actionable": actionable, "completion_pct": completion_pct, "by_phase": by_phase, "by_status": by_status, "by_risk_open": by_risk_open, "open_count": by_phase[OFFEN] + by_phase[IN_ARBEIT], "requirements": requirements, "themes": themes, }