"""MCP server: the interface the external repo-scanner queries for CRA risk. We are the MCP *server*; the scanner is the client and asks us, in a targeted way, to turn its findings into a CRA (Cyber Resilience Act) risk assessment. All assessment logic lives in the deterministic, fully-tested compliance.services.cra_finding_mapper — this module is only the MCP transport glue (stdio). Run as: ``python -m compliance.mcp.server``. Transport note: stdio is the default. If the scanner needs HTTP/streamable transport instead, only the ``main()`` runner below changes. """ import asyncio import json from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent from compliance.services.cra_finding_mapper import assess_findings_payload from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS server = Server("breakpilot-cra") _FINDINGS_SCHEMA = { "type": "object", "properties": { "findings": { "type": "array", "description": "Findings the scanner already produced.", "items": { "type": "object", "properties": { "id": {"type": "string"}, "title": {"type": "string"}, "description": {"type": "string"}, "category": {"type": "string", "description": "e.g. crypto, auth, secrets, dependency"}, "cwe": {"type": "string", "description": "e.g. CWE-798"}, "severity": {"type": "string", "enum": ["critical", "high", "medium", "low"]}, "cvss": {"type": "number"}, "location": {"type": "string"}, }, "required": ["id"], }, } }, "required": ["findings"], } @server.list_tools() async def list_tools() -> list: return [ Tool( name="cra_assess_findings", description=( "Map repo-scanner findings to the CRA Annex I essential requirements they " "violate, derive a risk level per finding, and return the remediation measures " "plus coverage. Deterministic; works standalone (no project/FMEA needed)." ), inputSchema=_FINDINGS_SCHEMA, ), Tool( name="cra_list_requirements", description="Return the 40 CRA Annex I essential requirements (the assessment spine).", inputSchema={"type": "object", "properties": {}}, ), ] @server.call_tool() async def call_tool(name: str, arguments: dict) -> list: if name == "cra_assess_findings": result = assess_findings_payload(arguments or {}) elif name == "cra_list_requirements": result = {"requirements": ANNEX_I_REQUIREMENTS} else: raise ValueError("Unknown tool: {}".format(name)) return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))] async def main() -> None: async with stdio_server() as (read_stream, write_stream): await server.run(read_stream, write_stream, server.create_initialization_options()) if __name__ == "__main__": asyncio.run(main())