"""Management progress rollup over scanner findings (status/ticket readback).""" from compliance.services.cra_progress import build_progress def _f(fid, cwe, status="open", tracker=None, severity="high"): d = {"id": fid, "cwe": cwe, "severity": severity, "status": status} if tracker: d["tracker_issue_url"] = tracker return d class TestBuildProgress: def test_empty(self): r = build_progress([]) assert r["total"] == 0 assert r["completion_pct"] == 0.0 assert r["themes"] == [] and r["requirements"] == [] def test_phase_derivation(self): findings = [ _f("a", "CWE-259", status="open"), # offen _f("b", "CWE-319", status="triaged"), # offen (no ticket) _f("c", "CWE-494", status="triaged", tracker="http://jira/1"), # in_arbeit _f("d", "CWE-778", status="resolved"), # erledigt _f("e", "CWE-1188", status="false_positive"), # ausgeschlossen _f("g", "CWE-1104", status="ignored"), # ausgeschlossen ] r = build_progress(findings) assert r["by_phase"] == {"offen": 2, "in_arbeit": 1, "erledigt": 1, "ausgeschlossen": 2} assert r["total"] == 6 # actionable excludes the 2 ausgeschlossen → 4; 1 done of 4 = 25% assert r["actionable"] == 4 assert r["completion_pct"] == 25.0 assert r["open_count"] == 3 # offen + in_arbeit def test_ticket_means_in_arbeit_even_when_open_status(self): r = build_progress([_f("a", "CWE-259", status="open", tracker="http://jira/9")]) assert r["by_phase"]["in_arbeit"] == 1 assert r["themes"][0]["has_ticket"] is True assert r["themes"][0]["tracker_url"] == "http://jira/9" def test_requirement_rollup_and_completion(self): # two findings on the same CRA requirement (CWE-259 → CRA-AI-8), one done findings = [ _f("a", "CWE-259", status="resolved"), _f("b", "CWE-259", status="open"), ] r = build_progress(findings) reqs = {x["req_id"]: x for x in r["requirements"]} assert "CRA-AI-8" in reqs ai8 = reqs["CRA-AI-8"] assert ai8["total"] == 2 and ai8["erledigt"] == 1 and ai8["offen"] == 1 assert ai8["completion_pct"] == 50.0 assert ai8["phase"] == "in_arbeit" def test_fully_resolved_requirement_is_erledigt(self): r = build_progress([_f("a", "CWE-259", status="resolved")]) ai8 = next(x for x in r["requirements"] if x["req_id"] == "CRA-AI-8") assert ai8["phase"] == "erledigt" assert ai8["completion_pct"] == 100.0 assert r["completion_pct"] == 100.0 def test_open_risk_breakdown_excludes_done_and_excluded(self): findings = [ _f("a", "CWE-259", status="open", severity="critical"), _f("b", "CWE-319", status="resolved", severity="high"), _f("c", "CWE-1188", status="false_positive", severity="high"), ] r = build_progress(findings) # only the open critical counts toward open risk assert r["by_risk_open"]["CRITICAL"] == 1 assert sum(r["by_risk_open"].values()) == 1 def test_tolerates_scanner_shape(self): # real scanner field names: scan_type, cvss_score, file_path, _id findings = [{ "_id": {"$oid": "abc"}, "cwe": "CWE-319", "severity": "high", "status": "triaged", "tracker_issue_url": None, "scan_type": "sast", "file_path": "src/x.py", "updated_at": "2026-06-15T10:00:00Z", }] r = build_progress(findings) assert r["total"] == 1 t = r["themes"][0] assert t["finding_id"] == "abc" assert t["phase"] == "offen" # triaged, no ticket assert t["location"] == "src/x.py" assert t["updated_at"] == "2026-06-15T10:00:00Z"