From a73b99638154d63bdd9bcca319d9e6d4a58cc232 Mon Sep 17 00:00:00 2001
From: Benjamin Admin
Date: Sat, 13 Jun 2026 20:22:34 +0200
Subject: [PATCH] feat(cra): standalone CRA finding->Annex I risk mapper + MCP interface

Deterministic mapper (no DB/LLM): repo-scanner findings -> the CRA Annex I
essential requirement(s) they violate -> risk level -> remediation measures +
coverage. Reuses the existing Annex I spine (cra_annex_i_data).

The MCP server (compliance/mcp/server.py, stdio) is the thin transport the
external scanner queries; all logic lives in the fully-tested mapper. Works
standalone (no project/FMEA required). No DB migrations.

Input hardening in the mapper: null / non-string finding fields, CWE strings
carrying more than one number, non-numeric CVSS scores and unrecognised
severity labels are handled explicitly instead of crashing or silently
producing an unmapped / rank-0 finding.

Co-Authored-By: Claude Opus 4.7
---
Note: hand-edited after export. The "index" blob ids of cra_finding_mapper.py
and test_cra_finding_mapper.py no longer match their content; regenerate the
patch with git format-patch if exact ids are needed.

 backend-compliance/compliance/mcp/__init__.py |   1 +
 backend-compliance/compliance/mcp/server.py   |  87 +++++++
 .../compliance/services/cra_finding_mapper.py | 246 ++++++++++++++++++
 backend-compliance/requirements.txt           |   3 +
 .../tests/test_cra_finding_mapper.py          | 112 ++++++++
 5 files changed, 449 insertions(+)
 create mode 100644 backend-compliance/compliance/mcp/__init__.py
 create mode 100644 backend-compliance/compliance/mcp/server.py
 create mode 100644 backend-compliance/compliance/services/cra_finding_mapper.py
 create mode 100644 backend-compliance/tests/test_cra_finding_mapper.py

diff --git a/backend-compliance/compliance/mcp/__init__.py b/backend-compliance/compliance/mcp/__init__.py
new file mode 100644
index 00000000..a6407d1b
--- /dev/null
+++ b/backend-compliance/compliance/mcp/__init__.py
@@ -0,0 +1 @@
+"""MCP Server for Compliance Document Checking."""
diff --git a/backend-compliance/compliance/mcp/server.py b/backend-compliance/compliance/mcp/server.py
new file mode 100644
index 00000000..9c9c5e6e
--- /dev/null
+++ b/backend-compliance/compliance/mcp/server.py
@@ -0,0 +1,87 @@
+"""MCP server: the interface the external repo-scanner queries for CRA risk.
+
+We are the MCP *server*; the scanner is the client and asks us, in a targeted
+way, to turn its findings into a CRA (Cyber Resilience Act) risk assessment. All
+assessment logic lives in the deterministic, fully-tested
+compliance.services.cra_finding_mapper — this module is only the MCP transport
+glue (stdio). Run as: ``python -m compliance.mcp.server``.
+
+Transport note: stdio is the default. If the scanner needs HTTP/streamable
+transport instead, only the ``main()`` runner below changes.
+"""
+import asyncio
+import json
+
+from mcp.server import Server
+from mcp.server.stdio import stdio_server
+from mcp.types import Tool, TextContent
+
+from compliance.services.cra_finding_mapper import assess_findings_payload
+from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS
+
+server = Server("breakpilot-cra")
+
+_FINDINGS_SCHEMA = {
+    "type": "object",
+    "properties": {
+        "findings": {
+            "type": "array",
+            "description": "Findings the scanner already produced.",
+            "items": {
+                "type": "object",
+                "properties": {
+                    "id": {"type": "string"},
+                    "title": {"type": "string"},
+                    "description": {"type": "string"},
+                    "category": {"type": "string", "description": "e.g. crypto, auth, secrets, dependency"},
+                    "cwe": {"type": "string", "description": "e.g. CWE-798"},
+                    "severity": {"type": "string", "enum": ["critical", "high", "medium", "low"]},
+                    "cvss": {"type": "number"},
+                    "location": {"type": "string"},
+                },
+                "required": ["id"],
+            },
+        }
+    },
+    "required": ["findings"],
+}
+
+
+@server.list_tools()
+async def list_tools() -> list:
+    return [
+        Tool(
+            name="cra_assess_findings",
+            description=(
+                "Map repo-scanner findings to the CRA Annex I essential requirements they "
+                "violate, derive a risk level per finding, and return the remediation measures "
+                "plus coverage. Deterministic; works standalone (no project/FMEA needed)."
+            ),
+            inputSchema=_FINDINGS_SCHEMA,
+        ),
+        Tool(
+            name="cra_list_requirements",
+            description="Return the 40 CRA Annex I essential requirements (the assessment spine).",
+            inputSchema={"type": "object", "properties": {}},
+        ),
+    ]
+
+
+@server.call_tool()
+async def call_tool(name: str, arguments: dict) -> list:
+    if name == "cra_assess_findings":
+        result = assess_findings_payload(arguments or {})
+    elif name == "cra_list_requirements":
+        result = {"requirements": ANNEX_I_REQUIREMENTS}
+    else:
+        raise ValueError("Unknown tool: {}".format(name))
+    return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
+
+
+async def main() -> None:
+    async with stdio_server() as (read_stream, write_stream):
+        await server.run(read_stream, write_stream, server.create_initialization_options())
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/backend-compliance/compliance/services/cra_finding_mapper.py b/backend-compliance/compliance/services/cra_finding_mapper.py
new file mode 100644
index 00000000..3985fbae
--- /dev/null
+++ b/backend-compliance/compliance/services/cra_finding_mapper.py
@@ -0,0 +1,246 @@
+"""Deterministic mapper: scanner finding -> CRA Annex I requirement -> risk + measures.
+
+This is the brain of the standalone CRA cyber risk assessment (the layer the
+external repo-scanner queries via MCP). It takes findings the scanner already
+produced (we do NOT re-scan, and we do NOT duplicate the CVE/NVD knowledge the
+scanner owns) and maps each to the CRA Annex I essential requirement(s) it
+violates, derives a risk level, and attaches the remediation measures.
+
+Pure + deterministic: no DB, no LLM, no network. Same input -> same output.
+The requirement/measure spine is the single source of truth in
+compliance.api.cra_annex_i_data (pure data, no FastAPI dependency).
+"""
+import re
+from dataclasses import dataclass, field, asdict
+from typing import Optional
+
+from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS, MEASURES, DEADLINES
+
+_REQ_INDEX = {r["req_id"]: r for r in ANNEX_I_REQUIREMENTS}
+_SEV_ORDER = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
+_SEV_BY_RANK = {v: k for k, v in _SEV_ORDER.items()}
+
+# High-confidence CWE -> CRA-AI requirement(s), primary first. Keyed by the bare
+# CWE number. Only req_ids that exist in the spine are referenced.
+_CWE_TO_REQ = {
+    798: ["CRA-AI-9", "CRA-AI-8"], 259: ["CRA-AI-8"], 1392: ["CRA-AI-8"], 521: ["CRA-AI-8"],
+    287: ["CRA-AI-7"], 306: ["CRA-AI-7"], 304: ["CRA-AI-7"],
+    862: ["CRA-AI-12"], 863: ["CRA-AI-12"],
+    732: ["CRA-AI-4"], 269: ["CRA-AI-4"], 250: ["CRA-AI-4"],
+    327: ["CRA-AI-13"], 326: ["CRA-AI-13"], 328: ["CRA-AI-13"], 916: ["CRA-AI-9"],
+    319: ["CRA-AI-15"], 311: ["CRA-AI-15"], 312: ["CRA-AI-14"],
+    522: ["CRA-AI-9"], 320: ["CRA-AI-16"], 200: ["CRA-AI-13"],
+    1188: ["CRA-AI-1"], 16: ["CRA-AI-1"],
+    1004: ["CRA-AI-10"], 614: ["CRA-AI-10"], 384: ["CRA-AI-10"], 613: ["CRA-AI-10"], 352: ["CRA-AI-10"],
+    307: ["CRA-AI-11"],
+    345: ["CRA-AI-6"], 353: ["CRA-AI-6"], 494: ["CRA-AI-30"],
+    778: ["CRA-AI-24"], 532: ["CRA-AI-24"], 117: ["CRA-AI-27"],
+    1104: ["CRA-AI-22"], 1035: ["CRA-AI-22"], 937: ["CRA-AI-22"], 1395: ["CRA-AI-22"],
+    79: ["CRA-AI-20"], 89: ["CRA-AI-20"], 77: ["CRA-AI-20"], 78: ["CRA-AI-20"], 22: ["CRA-AI-20"], 20: ["CRA-AI-20"],
+}
+
+# Substring fallback (lowercase) against category + title + description, primary first.
+_KEYWORD_TO_REQ = [
+    ("default password", "CRA-AI-8"), ("hardcoded", "CRA-AI-9"), ("secret", "CRA-AI-9"),
+    ("credential", "CRA-AI-9"), ("password", "CRA-AI-8"), ("mfa", "CRA-AI-7"),
+    ("authentication", "CRA-AI-7"), ("authoriz", "CRA-AI-12"), ("access control", "CRA-AI-12"),
+    ("privilege", "CRA-AI-4"), ("tls", "CRA-AI-15"), ("ssl", "CRA-AI-15"), ("cipher", "CRA-AI-13"),
+    ("crypto", "CRA-AI-13"), ("encrypt", "CRA-AI-13"), ("cleartext", "CRA-AI-15"), ("at rest", "CRA-AI-14"),
+    ("session", "CRA-AI-10"), ("cookie", "CRA-AI-10"), ("csrf", "CRA-AI-10"), ("brute", "CRA-AI-11"),
+    ("rate limit", "CRA-AI-11"), ("sbom", "CRA-AI-23"), ("dependency", "CRA-AI-22"),
+    ("outdated", "CRA-AI-22"), ("known vuln", "CRA-AI-22"), ("cve", "CRA-AI-22"),
+    ("injection", "CRA-AI-20"), ("xss", "CRA-AI-20"), ("sql", "CRA-AI-20"), ("traversal", "CRA-AI-20"),
+    ("logging", "CRA-AI-24"), ("update", "CRA-AI-28"), ("signature", "CRA-AI-29"),
+    ("integrity", "CRA-AI-6"), ("debug", "CRA-AI-1"), ("config", "CRA-AI-1"),
+]
+
+
+@dataclass
+class ScannerFinding:
+    """One finding emitted by the external repo-scanner."""
+    id: str
+    title: str = ""
+    description: str = ""
+    category: str = ""
+    cwe: str = ""
+    severity: str = ""  # critical | high | medium | low (scanner's rating)
+    cvss: Optional[float] = None
+    location: str = ""
+
+    @classmethod
+    def from_dict(cls, d: dict) -> "ScannerFinding":
+        """Build a finding from a scanner dict, accepting alias keys and JSON nulls."""
+
+        def text(*keys: str) -> str:
+            # First truthy value among the alias keys, always as str: a JSON null
+            # or number must never reach the keyword matcher's str.join().
+            for key in keys:
+                value = d.get(key)
+                if value:
+                    return str(value)
+            return ""
+
+        return cls(
+            id=text("id", "finding_id"),
+            title=text("title", "name"),
+            description=text("description", "detail"),
+            category=text("category", "type"),
+            cwe=text("cwe"),
+            severity=text("severity"),
+            cvss=d.get("cvss"),
+            location=text("location", "path"),
+        )
+
+
+@dataclass
+class MappedFinding:
+    """Result of mapping one finding onto the Annex I spine."""
+    finding_id: str
+    requirement_ids: list = field(default_factory=list)
+    primary_requirement: str = ""
+    annex_anchor: str = ""
+    iso27001_ref: list = field(default_factory=list)
+    risk_level: str = "LOW"
+    measures: list = field(default_factory=list)
+    rationale: str = ""
+    unmapped: bool = False
+
+
+@dataclass
+class CRAAssessment:
+    """Aggregate result of assessing a list of findings."""
+    findings_total: int
+    mapped: list = field(default_factory=list)
+    by_risk: dict = field(default_factory=dict)
+    requirements_touched: list = field(default_factory=list)
+    open_measures: list = field(default_factory=list)  # [{id, description}]
+    unmapped_findings: list = field(default_factory=list)
+    coverage_pct: float = 0.0
+    deadlines: list = field(default_factory=lambda: list(DEADLINES))
+
+
+def _cwe_num(cwe: str) -> Optional[int]:
+    """Return the first CWE number found in *cwe* ("CWE-798" -> 798), else None."""
+    # First digit run only: joining every digit would turn "CWE-79, CWE-89"
+    # into the bogus id 7989 and the finding would silently go unmapped.
+    match = re.search(r"\d+", str(cwe))
+    return int(match.group()) if match else None
+
+
+def _sev_from_cvss(cvss: Optional[float]) -> str:
+    """Translate a CVSS score into a severity label ("" when unusable)."""
+    try:
+        score = float(cvss)
+    except (TypeError, ValueError):
+        return ""  # missing or non-numeric score
+    if score >= 9.0:
+        return "CRITICAL"
+    if score >= 7.0:
+        return "HIGH"
+    if score >= 4.0:
+        return "MEDIUM"
+    if score > 0:
+        return "LOW"
+    return ""
+
+
+def _rank(sev: str) -> int:
+    """Numeric rank of a severity label; 0 for empty or unknown labels."""
+    return _SEV_ORDER.get((sev or "").upper(), 0)
+
+
+def _candidate_reqs(f: ScannerFinding) -> list:
+    """Deterministic requirement candidates, primary first, deduped, existing only."""
+    out: list = []
+    num = _cwe_num(f.cwe)
+    if num in _CWE_TO_REQ:
+        out.extend(_CWE_TO_REQ[num])
+    haystack = " ".join([f.category, f.title, f.description]).lower()
+    for kw, rid in _KEYWORD_TO_REQ:
+        if kw in haystack:
+            out.append(rid)
+    seen, result = set(), []
+    for rid in out:
+        if rid in _REQ_INDEX and rid not in seen:
+            seen.add(rid)
+            result.append(rid)
+    return result
+
+
+def map_finding(f: ScannerFinding) -> MappedFinding:
+    """Map one finding to its Annex I requirement(s), risk level and measures.
+
+    The risk level is the higher of the finding's own severity and the severity
+    of the primary requirement; findings without any match are flagged unmapped.
+    """
+    reqs = _candidate_reqs(f)
+    # Scanner rating first; fall back to CVSS when the rating is missing *or* not
+    # one of our four levels (e.g. "info"), instead of silently ranking it 0.
+    finding_rank = _rank(f.severity) or _rank(_sev_from_cvss(f.cvss))
+    if not reqs:
+        return MappedFinding(
+            finding_id=f.id, risk_level=_SEV_BY_RANK.get(finding_rank, "LOW"),
+            rationale="Kein eindeutiger CRA-Anforderungsbezug erkannt — manuelle Pruefung.",
+            unmapped=True,
+        )
+    primary = _REQ_INDEX[reqs[0]]
+    risk_rank = max(finding_rank, _rank(primary["severity"]))
+    measures: list = []
+    for rid in reqs:
+        for m in _REQ_INDEX[rid].get("mapped_measures", []):
+            if m not in measures:
+                measures.append(m)
+    return MappedFinding(
+        finding_id=f.id,
+        requirement_ids=reqs,
+        primary_requirement=primary["req_id"],
+        annex_anchor=primary.get("annex_anchor", ""),
+        iso27001_ref=list(primary.get("iso27001_ref", [])),
+        risk_level=_SEV_BY_RANK.get(risk_rank, "LOW"),
+        measures=measures,
+        rationale="{}: {}".format(primary["req_id"], primary.get("title", "")),
+    )
+
+
+def assess_findings(findings: list) -> CRAAssessment:
+    """Map a list of ScannerFinding into a deterministic CRA assessment."""
+    mapped = [map_finding(f) for f in findings]
+    by_risk = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
+    reqs_touched, measure_ids, unmapped = set(), [], []
+    for m in mapped:
+        by_risk[m.risk_level] = by_risk.get(m.risk_level, 0) + 1
+        if m.unmapped:
+            unmapped.append(m.finding_id)
+        for rid in m.requirement_ids:
+            reqs_touched.add(rid)
+        for mid in m.measures:
+            if mid not in measure_ids:
+                measure_ids.append(mid)
+    total = len(findings)
+    covered = total - len(unmapped)
+    return CRAAssessment(
+        findings_total=total,
+        mapped=mapped,
+        by_risk=by_risk,
+        requirements_touched=sorted(reqs_touched),
+        open_measures=[{"id": mid, "description": MEASURES.get(mid, "")} for mid in measure_ids],
+        unmapped_findings=unmapped,
+        coverage_pct=round(100.0 * covered / total, 1) if total else 0.0,
+    )
+
+
+def assess_findings_payload(payload: dict) -> dict:
+    """MCP/HTTP entry: {"findings": [ {...}, ... ]} -> assessment dict.
+
+    This is the testable tool body the MCP server wraps (kept transport-free).
+    A missing or null "findings" is an empty assessment; anything that is not a
+    list of objects raises ValueError instead of failing deep inside the mapper.
+    """
+    raw = payload.get("findings") if isinstance(payload, dict) else None
+    if raw is None:
+        raw = []
+    if not isinstance(raw, (list, tuple)) or not all(isinstance(d, dict) for d in raw):
+        raise ValueError("'findings' must be a list of finding objects")
+    findings = [ScannerFinding.from_dict(d) for d in raw]
+    assessment = assess_findings(findings)
+    return asdict(assessment)  # recurses into nested MappedFinding dataclasses
diff --git a/backend-compliance/requirements.txt b/backend-compliance/requirements.txt
index 9f8584b5..bb6353f1 100644
--- a/backend-compliance/requirements.txt
+++ b/backend-compliance/requirements.txt
@@ -53,3 +53,6 @@ cryptography>=42.0.0
 pillow>=12.1.1
 python-docx==1.2.0
 pytesseract>=0.3.13
+
+# MCP server (CRA cyber risk-assessment interface queried by the repo-scanner). MIT.
+mcp>=1.2.0
diff --git a/backend-compliance/tests/test_cra_finding_mapper.py b/backend-compliance/tests/test_cra_finding_mapper.py
new file mode 100644
index 00000000..6ac0bca7
--- /dev/null
+++ b/backend-compliance/tests/test_cra_finding_mapper.py
@@ -0,0 +1,112 @@
+"""Tests for the deterministic CRA finding -> Annex I requirement mapper."""
+import pytest
+
+from compliance.services.cra_finding_mapper import (
+    ScannerFinding, map_finding, assess_findings, assess_findings_payload,
+)
+
+
+def test_hardcoded_credentials_cwe_maps_to_credential_requirement():
+    m = map_finding(ScannerFinding(id="f1", title="Hardcoded API key", cwe="CWE-798", severity="high"))
+    assert m.primary_requirement == "CRA-AI-9"
+    assert "CRA-AI-8" in m.requirement_ids
+    assert not m.unmapped
+    assert m.annex_anchor  # spine carries the Annex anchor
+
+
+def test_default_password_is_critical_and_carries_measure_M542():
+    m = map_finding(ScannerFinding(id="f2", title="Universal default password", cwe="CWE-259", severity="critical"))
+    assert m.primary_requirement == "CRA-AI-8"
+    assert m.risk_level == "CRITICAL"
+    assert "M542" in m.measures  # default-password change measure from the spine
+
+
+def test_weak_tls_via_keyword_maps_to_transport_crypto():
+    m = map_finding(ScannerFinding(id="f3", title="TLS 1.0 enabled", description="weak protocol", severity="high"))
+    assert m.primary_requirement == "CRA-AI-15"
+    assert not m.unmapped
+
+
+def test_dependency_cve_without_cwe_maps_to_dependency_monitoring():
+    m = map_finding(ScannerFinding(id="f4", title="lodash 4.17.4 has known CVE", category="dependency", severity="high"))
+    assert m.primary_requirement == "CRA-AI-22"
+
+
+def test_severity_derived_from_cvss_when_missing():
+    m = map_finding(ScannerFinding(id="f5", title="cleartext transmission", cwe="CWE-319", cvss=9.4))
+    # finding sev (CRITICAL from cvss) escalates over requirement sev
+    assert m.risk_level == "CRITICAL"
+
+
+def test_risk_is_max_of_finding_and_requirement_severity():
+    # low-severity finding but the requirement (CRA-AI-8, CRITICAL) dominates
+    m = map_finding(ScannerFinding(id="f6", title="default password", severity="low"))
+    assert m.primary_requirement == "CRA-AI-8"
+    assert m.risk_level == "CRITICAL"
+
+
+def test_unmapped_finding_is_flagged_not_invented():
+    m = map_finding(ScannerFinding(id="f7", title="zzz unrelated note", severity="low"))
+    assert m.unmapped
+    assert m.requirement_ids == []
+
+
+def test_assessment_aggregates_and_coverage():
+    findings = [
+        ScannerFinding(id="a", cwe="CWE-259", severity="critical"),  # CRA-AI-8
+        ScannerFinding(id="b", title="TLS 1.0", severity="high"),  # CRA-AI-15
+        ScannerFinding(id="c", title="zzz nothing", severity="low"),  # unmapped
+    ]
+    a = assess_findings(findings)
+    assert a.findings_total == 3
+    assert sum(a.by_risk.values()) == 3
+    assert "CRA-AI-8" in a.requirements_touched
+    assert "c" in a.unmapped_findings
+    assert a.coverage_pct == round(100.0 * 2 / 3, 1)
+    assert any(meas["id"] == "M542" for meas in a.open_measures)
+    assert all("description" in meas for meas in a.open_measures)
+
+
+def test_payload_entry_is_json_serializable_and_deterministic():
+    payload = {"findings": [
+        {"id": "x", "cwe": "CWE-798", "severity": "high"},
+        {"id": "y", "type": "dependency", "name": "openssl CVE-2024-x", "severity": "critical"},
+    ]}
+    r1 = assess_findings_payload(payload)
+    r2 = assess_findings_payload(payload)
+    assert r1 == r2  # deterministic
+    assert r1["findings_total"] == 2
+    assert isinstance(r1["mapped"], list) and isinstance(r1["mapped"][0], dict)
+    assert r1["mapped"][0]["primary_requirement"] == "CRA-AI-9"
+
+
+def test_empty_payload_is_safe():
+    r = assess_findings_payload({})
+    assert r["findings_total"] == 0
+    assert r["coverage_pct"] == 0.0
+
+
+def test_cwe_with_extra_digits_uses_first_number_only():
+    # "CWE-79, CWE-89" is CWE-79 (injection), not the concatenated "7989"
+    m = map_finding(ScannerFinding(id="f8", cwe="CWE-79, CWE-89", severity="medium"))
+    assert m.primary_requirement == "CRA-AI-20"
+
+
+def test_unrecognized_severity_falls_back_to_cvss():
+    m = map_finding(ScannerFinding(id="f9", title="zzz unrelated note", severity="info", cvss=9.8))
+    assert m.unmapped
+    assert m.risk_level == "CRITICAL"
+
+
+def test_payload_tolerates_null_and_string_fields():
+    r = assess_findings_payload({"findings": [
+        {"id": "n", "title": None, "name": None, "cwe": None, "cvss": "7.5"},
+    ]})
+    assert r["findings_total"] == 1
+    assert r["mapped"][0]["risk_level"] == "HIGH"
+    assert assess_findings_payload({"findings": None})["findings_total"] == 0
+
+
+def test_payload_rejects_non_object_findings():
+    with pytest.raises(ValueError):
+        assess_findings_payload({"findings": ["not-a-finding"]})