"""MCP server: the interface the external repo-scanner queries for CRA risk. We are an MCP *server* exposing the deterministic CRA assessment. The scanner (compliance-scanner-agent) registers us in its McpServerConfig registry and calls ``cra_assess_findings`` with the findings it already produced. All assessment logic lives in compliance.services.cra_finding_mapper — this module is only MCP transport glue. Transports: - stdio (default): ``python -m compliance.mcp.server`` - Streamable HTTP + Bearer: set ``MCP_PORT`` (the scanner uses HTTP transport). Auth: if ``CRA_MCP_TOKEN`` is set, every request needs ``Authorization: Bearer ``; if unset, the endpoint is open (local/stdio dev only). """ import json import os from typing import Optional from mcp.server.fastmcp import FastMCP from mcp.server.transport_security import TransportSecuritySettings from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS from compliance.services.cra_finding_mapper import assess_findings_payload # We are a server-to-server, Bearer-gated API behind nginx — not a browser target. # FastMCP's DNS-rebinding protection rejects unknown Host headers (HTTP 421); keep # it OFF unless MCP_ALLOWED_HOSTS pins an explicit allowlist (comma-separated). _ALLOWED = [h.strip() for h in (os.environ.get("MCP_ALLOWED_HOSTS") or "").split(",") if h.strip()] _SECURITY = TransportSecuritySettings( enable_dns_rebinding_protection=bool(_ALLOWED), allowed_hosts=_ALLOWED, allowed_origins=_ALLOWED, ) mcp = FastMCP("breakpilot-cra", transport_security=_SECURITY) @mcp.tool( description=( "Map repo-scanner findings to the CRA Annex I essential requirements they " "violate, derive a risk level per finding, and return remediation measures " "plus coverage. Deterministic; standalone (no project/FMEA needed). Each " "finding accepts: id (required), title, description, category/scan_type, cwe, " "severity (info|low|medium|high|critical), cvss/cvss_score, location/file_path." ) ) async def cra_assess_findings( findings: list, weights: Optional[dict] = None, safety_functions: Optional[list] = None, ) -> str: payload = { "findings": findings, "weights": weights or {}, "safety_functions": safety_functions or [], } return json.dumps(assess_findings_payload(payload), ensure_ascii=False) @mcp.tool(description="Return the 40 CRA Annex I essential requirements (the assessment spine).") async def cra_list_requirements() -> str: return json.dumps({"requirements": ANNEX_I_REQUIREMENTS}, ensure_ascii=False) def _build_http_app(): """Streamable-HTTP ASGI app with optional Bearer-token gate.""" from starlette.middleware.base import BaseHTTPMiddleware from starlette.responses import JSONResponse token = (os.environ.get("CRA_MCP_TOKEN") or "").strip() class BearerAuth(BaseHTTPMiddleware): async def dispatch(self, request, call_next): if token: auth = request.headers.get("authorization", "") if auth != f"Bearer {token}": return JSONResponse({"error": "unauthorized"}, status_code=401) return await call_next(request) app = mcp.streamable_http_app() app.add_middleware(BearerAuth) return app def main() -> None: port = os.environ.get("MCP_PORT") if port: import uvicorn uvicorn.run(_build_http_app(), host="0.0.0.0", port=int(port)) else: mcp.run() # stdio if __name__ == "__main__": main()