feat(cra): standalone CRA finding->Annex I risk mapper + MCP interface

Deterministic mapper (no DB/LLM): repo-scanner findings -> the CRA Annex I
essential requirement(s) they violate -> risk level -> remediation measures +
coverage. Reuses the existing Annex I spine (cra_annex_i_data). The MCP server
(compliance/mcp/server.py, stdio) is the thin transport the external scanner
queries; all logic lives in the fully-tested mapper. Works standalone (no
project/FMEA required). No DB migrations.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-13 20:22:34 +02:00
parent 3489eaf8b0
commit a73b996381
5 changed files with 387 additions and 0 deletions
@@ -0,0 +1 @@
"""MCP Server for Compliance Document Checking."""
@@ -0,0 +1,87 @@
"""MCP server: the interface the external repo-scanner queries for CRA risk.
We are the MCP *server*; the scanner is the client and asks us, in a targeted
way, to turn its findings into a CRA (Cyber Resilience Act) risk assessment. All
assessment logic lives in the deterministic, fully-tested
compliance.services.cra_finding_mapper — this module is only the MCP transport
glue (stdio). Run as: ``python -m compliance.mcp.server``.
Transport note: stdio is the default. If the scanner needs HTTP/streamable
transport instead, only the ``main()`` runner below changes.
"""
import asyncio
import json
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from compliance.services.cra_finding_mapper import assess_findings_payload
from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS
server = Server("breakpilot-cra")
_FINDINGS_SCHEMA = {
"type": "object",
"properties": {
"findings": {
"type": "array",
"description": "Findings the scanner already produced.",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"title": {"type": "string"},
"description": {"type": "string"},
"category": {"type": "string", "description": "e.g. crypto, auth, secrets, dependency"},
"cwe": {"type": "string", "description": "e.g. CWE-798"},
"severity": {"type": "string", "enum": ["critical", "high", "medium", "low"]},
"cvss": {"type": "number"},
"location": {"type": "string"},
},
"required": ["id"],
},
}
},
"required": ["findings"],
}
@server.list_tools()
async def list_tools() -> list:
return [
Tool(
name="cra_assess_findings",
description=(
"Map repo-scanner findings to the CRA Annex I essential requirements they "
"violate, derive a risk level per finding, and return the remediation measures "
"plus coverage. Deterministic; works standalone (no project/FMEA needed)."
),
inputSchema=_FINDINGS_SCHEMA,
),
Tool(
name="cra_list_requirements",
description="Return the 40 CRA Annex I essential requirements (the assessment spine).",
inputSchema={"type": "object", "properties": {}},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list:
if name == "cra_assess_findings":
result = assess_findings_payload(arguments or {})
elif name == "cra_list_requirements":
result = {"requirements": ANNEX_I_REQUIREMENTS}
else:
raise ValueError("Unknown tool: {}".format(name))
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
async def main() -> None:
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
if __name__ == "__main__":
asyncio.run(main())