414496c31a
Register-Flow für compliance-scanner-agent (anderes Team, Rust): deren MCP-Client (McpServerConfig) erwartet Streamable HTTP + Bearer — unser MCP war stdio/ohne Auth. - server.py auf FastMCP umgestellt: Tools cra_assess_findings + cra_list_requirements, Dual-Transport (stdio default; Streamable HTTP wenn MCP_PORT gesetzt), Bearer-Gate via CRA_MCP_TOKEN. - ScannerFinding.from_dict tolerant für ihr Finding-Schema (_id/fingerprint, scan_type→category, cvss_score→cvss, file_path→location, severity info→low). - Eigenständiger docker-compose-Dienst bp-compliance-mcp (Port 8099, pure/kein DB, isoliert von der Haupt-API) + Hetzner-amd64-Override. - Tests: test_cra_scanner_adapter, test_mcp_server (Bearer-Gate + Tool-Registry). Pull-Flow (wir holen ihre Findings über ihren MCP) + öffentliches nginx-Routing folgen separat (brauchen ihren Endpoint/Token). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
64 lines
2.2 KiB
Python
64 lines
2.2 KiB
Python
"""The MCP finding adapter must accept the compliance-scanner-agent Finding shape.

Their Rust/Mongo Finding uses _id, scan_type, cvss_score, file_path and a 5-level
severity (incl. "info"). Our mapper consumes id, category, cvss, location and a
4-level severity. from_dict bridges that gap.
"""
|
|
from compliance.services.cra_finding_mapper import (
|
|
ScannerFinding,
|
|
assess_findings_payload,
|
|
)
|
|
|
|
|
|
def _scanner_finding(**over: object) -> dict:
    """Build a finding dict shaped like compliance-scanner-agent emits it.

    Args:
        **over: Field overrides. Each keyword replaces the baseline key of the
            same name, or adds a new key if the baseline has none.

    Returns:
        A newly built dict on every call, so callers may mutate it freely.
    """
    base = {
        "_id": "507f1f77bcf86cd799439011",
        "repo_id": "r1",
        "fingerprint": "fp-123",
        "scanner": "semgrep",
        "scan_type": "secret_detection",
        "title": "Hardcoded credential in source",
        "description": "A hardcoded password was found.",
        "severity": "high",
        "cwe": "CWE-798",
        "cve": None,
        "cvss_score": 8.2,
        "file_path": "src/config.py",
        "status": "open",
    }
    # Overrides win over the baseline values.
    base.update(over)
    return base
|
|
|
|
|
|
def test_from_dict_maps_scanner_field_names() -> None:
    """from_dict translates the scanner's field names into our attribute names.

    Checks _id -> id, scan_type -> category, cvss_score -> cvss and
    file_path -> location, and that cwe and severity come through unchanged.
    """
    f = ScannerFinding.from_dict(_scanner_finding())
    assert f.id == "507f1f77bcf86cd799439011"  # from _id
    assert f.category == "secret_detection"  # from scan_type
    assert f.cvss == 8.2  # from cvss_score
    assert f.location == "src/config.py"  # from file_path
    assert f.cwe == "CWE-798"
    assert f.severity == "high"
|
|
|
|
|
|
def test_from_dict_info_severity_becomes_low() -> None:
    """The scanner's extra "info" severity level is folded into "low"."""
    f = ScannerFinding.from_dict(_scanner_finding(severity="info"))
    assert f.severity == "low"
|
|
|
|
|
|
def test_from_dict_mongo_extended_json_id() -> None:
    """An _id given as a {"$oid": ...} mapping is unwrapped to the plain id string."""
    f = ScannerFinding.from_dict(_scanner_finding(_id={"$oid": "abc123"}))
    assert f.id == "abc123"
|
|
|
|
|
|
def test_scanner_finding_assesses_to_cra_requirement() -> None:
    """A scanner-shaped finding passes through assess_findings_payload end to end.

    Expects exactly one mapped entry that carries the finding's id, a primary
    requirement with the "CRA-AI-" prefix, a non-empty requirement list and a
    risk level from the known set.
    """
    result = assess_findings_payload({"findings": [_scanner_finding()]})
    mapped = result["mapped"]
    assert len(mapped) == 1
    m = mapped[0]
    assert m["finding_id"] == "507f1f77bcf86cd799439011"
    # CWE-798 -> CRA-AI-9/-8 (hardcoded credentials / no default passwords)
    assert m["primary_requirement"].startswith("CRA-AI-")
    assert m["requirement_ids"]
    assert m["risk_level"] in {"LOW", "MEDIUM", "HIGH", "CRITICAL"}
|