e7c3cd7cee
- FastMCP transport_security: enable_dns_rebinding_protection nur an, wenn MCP_ALLOWED_HOSTS gesetzt; sonst aus (sonst HTTP 421 "Invalid Host header" bei Aufrufen über nginx/Container-Name). Bearer bleibt die Zugriffskontrolle. - bp-compliance-mcp: Host-Port-Mapping entfernt (8099 war von bp-core-health belegt) → expose-only im breakpilot-network, Routing via nginx (Folgeschritt). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
96 lines
3.5 KiB
Python
96 lines
3.5 KiB
Python
"""MCP server: the interface the external repo-scanner queries for CRA risk.
|
|
|
|
We are an MCP *server* exposing the deterministic CRA assessment. The scanner
|
|
(compliance-scanner-agent) registers us in its McpServerConfig registry and calls
|
|
``cra_assess_findings`` with the findings it already produced. All assessment
|
|
logic lives in compliance.services.cra_finding_mapper — this module is only MCP
|
|
transport glue.
|
|
|
|
Transports:
|
|
- stdio (default): ``python -m compliance.mcp.server``
|
|
- Streamable HTTP + Bearer: set ``MCP_PORT`` (the scanner uses HTTP transport).
|
|
Auth: if ``CRA_MCP_TOKEN`` is set, every request needs ``Authorization:
|
|
Bearer <token>``; if unset, the endpoint is open (local/stdio dev only).
|
|
"""
|
|
import json
|
|
import os
|
|
from typing import Optional
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
from mcp.server.transport_security import TransportSecuritySettings
|
|
|
|
from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS
|
|
from compliance.services.cra_finding_mapper import assess_findings_payload
|
|
|
|
# We are a server-to-server, Bearer-gated API behind nginx — not a browser target.
|
|
# FastMCP's DNS-rebinding protection rejects unknown Host headers (HTTP 421); keep
|
|
# it OFF unless MCP_ALLOWED_HOSTS pins an explicit allowlist (comma-separated).
|
|
_ALLOWED = [h.strip() for h in (os.environ.get("MCP_ALLOWED_HOSTS") or "").split(",") if h.strip()]
|
|
_SECURITY = TransportSecuritySettings(
|
|
enable_dns_rebinding_protection=bool(_ALLOWED),
|
|
allowed_hosts=_ALLOWED,
|
|
allowed_origins=_ALLOWED,
|
|
)
|
|
|
|
mcp = FastMCP("breakpilot-cra", transport_security=_SECURITY)
|
|
|
|
|
|
@mcp.tool(
|
|
description=(
|
|
"Map repo-scanner findings to the CRA Annex I essential requirements they "
|
|
"violate, derive a risk level per finding, and return remediation measures "
|
|
"plus coverage. Deterministic; standalone (no project/FMEA needed). Each "
|
|
"finding accepts: id (required), title, description, category/scan_type, cwe, "
|
|
"severity (info|low|medium|high|critical), cvss/cvss_score, location/file_path."
|
|
)
|
|
)
|
|
async def cra_assess_findings(
|
|
findings: list,
|
|
weights: Optional[dict] = None,
|
|
safety_functions: Optional[list] = None,
|
|
) -> str:
|
|
payload = {
|
|
"findings": findings,
|
|
"weights": weights or {},
|
|
"safety_functions": safety_functions or [],
|
|
}
|
|
return json.dumps(assess_findings_payload(payload), ensure_ascii=False)
|
|
|
|
|
|
@mcp.tool(description="Return the 40 CRA Annex I essential requirements (the assessment spine).")
|
|
async def cra_list_requirements() -> str:
|
|
return json.dumps({"requirements": ANNEX_I_REQUIREMENTS}, ensure_ascii=False)
|
|
|
|
|
|
def _build_http_app():
|
|
"""Streamable-HTTP ASGI app with optional Bearer-token gate."""
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.responses import JSONResponse
|
|
|
|
token = (os.environ.get("CRA_MCP_TOKEN") or "").strip()
|
|
|
|
class BearerAuth(BaseHTTPMiddleware):
|
|
async def dispatch(self, request, call_next):
|
|
if token:
|
|
auth = request.headers.get("authorization", "")
|
|
if auth != f"Bearer {token}":
|
|
return JSONResponse({"error": "unauthorized"}, status_code=401)
|
|
return await call_next(request)
|
|
|
|
app = mcp.streamable_http_app()
|
|
app.add_middleware(BearerAuth)
|
|
return app
|
|
|
|
|
|
def main() -> None:
|
|
port = os.environ.get("MCP_PORT")
|
|
if port:
|
|
import uvicorn
|
|
uvicorn.run(_build_http_app(), host="0.0.0.0", port=int(port))
|
|
else:
|
|
mcp.run() # stdio
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|