Files
breakpilot-compliance/backend-compliance/tests/test_reasoning_engine.py
T
Benjamin Admin 5e5002c883 refactor(reasoning): enforce ClaimCoverage (Welt 1) vs ComplianceStatus (Welt 2) boundary [F1]
Architecture-validation finding: the implementation mode produced compliance-
flavored output ("teilweise erfüllt", "covered") from a mere customer claim,
blurring the line to the Execution layer. This is a design decision, not a text
fix — the reasoning layer judges only the customer's STATEMENT, never conformity.

- CoverageStatus -> ClaimCoverage; values are claim-relative + carry "potential":
  potentially_addresses / partially_addresses / does_not_address /
  insufficient_information.
- ImplementationAssessment -> ClaimObligationMapping (coverage_status ->
  claim_coverage); ImplementationResponse -> ImplementationReasoningResponse
  (assessments -> mappings, + explicit `disclaimer`); request renamed; engine
  entry assess_implementation -> reason_implementation_claim.
- Endpoint /reasoning/implementation-assessment -> /reasoning/implementation-reasoning.
- Summary/explanations reworded: "adressiert wahrscheinlich N Pflichten … für
  eine Bewertung der tatsächlichen Umsetzung sind Nachweise erforderlich (keine
  Konformitätsaussage)". No "erfüllt"/"abgedeckt" leaks.
- New guard test asserts no compliance verdict leaks (no "erfüllt"; disclaimer
  separates ClaimCoverage from ComplianceStatus). 23 tests green, mypy clean.

Discovery (scope/obligations) was already structurally claim-free and unaffected.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-26 00:37:57 +02:00

283 lines
12 KiB
Python

"""Tests for the Regulatory Reasoning Engine.
Covers the five typical machine-builder scenarios and the ten acceptance
questions from the build spec (§15). Engine tests are pure (no DB); the
endpoint smoke tests mount only the reasoning router.
"""
from __future__ import annotations
from datetime import date
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.reasoning import (
assess_interpretation,
derive_obligations,
discover_scope,
normalize_claim,
reason_implementation_claim,
)
from compliance.reasoning.enums import (
ApplicabilityStatus,
ClaimCoverage,
InterpretationVerdict,
)
from compliance.reasoning.schemas import ProductProfile
from compliance.reasoning.enums import ManufacturerRole
# ---------------------------------------------------------------------------
# Fixtures / builders
# ---------------------------------------------------------------------------
def sps_profile(**overrides) -> ProductProfile:
    """Build the default "SPS mit HMI" product profile used by these tests.

    Any keyword argument replaces (or adds to) the default field values before
    the ``ProductProfile`` is constructed.
    """
    defaults = {
        "product_name": "SPS mit HMI",
        "product_type": ["SPS", "HMI", "Schaltschrank"],
        "has_software": True,
        "has_remote_access": True,
        "has_cloud_connection": True,
        "eu_market": True,
        "manufacturer_role": ManufacturerRole.MANUFACTURER,
    }
    # Later keys win in a dict merge, so caller overrides take precedence.
    return ProductProfile(**{**defaults, **overrides})
def _reg_ids(scope: object, attr: str) -> list[str]:
    """Return the ``regulation_id`` of every entry in ``scope.<attr>``.

    Args:
        scope: Object exposing the collection named by ``attr``.
        attr: Name of the attribute on ``scope`` to read, e.g.
            ``"applicable_regulations"``.

    Returns:
        The ids in the order the entries appear in the collection.
    """
    # The collection name is dynamic, so getattr() is needed here; the field
    # name on each entry is fixed, so plain attribute access is used there.
    return [entry.regulation_id for entry in getattr(scope, attr)]
# ---------------------------------------------------------------------------
# 1. Gilt CRA für eine SPS mit Fernwartung?
# ---------------------------------------------------------------------------
def test_cra_applies_to_sps_with_remote_access():
    """The default SPS profile must list CRA as applicable with high confidence."""
    result = discover_scope(sps_profile())
    cra_hits = [reg for reg in result.applicable_regulations if reg.regulation_id == "CRA"]
    assert cra_hits and cra_hits[0].applicability_status == ApplicabilityStatus.APPLICABLE
    first = cra_hits[0]
    assert first.confidence.value == "high"
    names_trigger = any(
        "digitale Elemente" in fact or "Fernzugriff" in fact
        for fact in first.trigger_facts
    )
    # NOTE(review): because of the fallback below, any non-empty trigger_facts
    # satisfies this assertion; the keyword check alone is not enforced.
    assert names_trigger or first.trigger_facts
# ---------------------------------------------------------------------------
# 2. Katalogprodukt 2027 weiter verkauft -> CRA gilt; "nur neue Produkte" zu eng
# ---------------------------------------------------------------------------
def test_cra_applies_to_finished_catalog_product():
    """CRA stays applicable for a profile placed on the market from 2027-01-01."""
    catalog_product = sps_profile(
        placed_on_market_after=date(2027, 1, 1),
        lifecycle_phase="placing_on_market",
    )
    applicable_ids = _reg_ids(discover_scope(catalog_product), "applicable_regulations")
    assert "CRA" in applicable_ids
def test_interpretation_only_new_products_is_too_narrow():
    """The "CRA only applies to new products" reading is assessed as too narrow."""
    customer_text = "Wir glauben, der CRA gilt nur für neue Produkte."
    verdict = assess_interpretation(customer_text)
    assert verdict.assessment == InterpretationVerdict.TOO_NARROW
    assert "CRA" in verdict.affected_regulations
    # Both a corrected reading and legal-basis references must be non-empty.
    assert verdict.corrected_interpretation
    assert verdict.legal_basis_refs
# ---------------------------------------------------------------------------
# 3. Reicht eine SBOM allein? -> nein, nur teilweise
# ---------------------------------------------------------------------------
def test_sbom_alone_is_not_enough():
    """An SBOM-only claim addresses ``sbom_creation`` but not every obligation."""
    response = reason_implementation_claim(sps_profile(), "Wir haben SBOMs.")
    sbom_mappings = [
        mapping for mapping in response.mappings
        if mapping.obligation_id == "sbom_creation"
    ]
    assert sbom_mappings and sbom_mappings[0].claim_coverage == ClaimCoverage.POTENTIALLY_ADDRESSES
    # At least one other mapping must carry a different coverage value, i.e.
    # the claim does not address everything.
    other_coverage_seen = any(
        mapping.claim_coverage != ClaimCoverage.POTENTIALLY_ADDRESSES
        for mapping in response.mappings
    )
    assert other_coverage_seen
    assert "Nachweise" in response.summary
# ---------------------------------------------------------------------------
# 4. Ist ein reaktiver Updateprozess ausreichend? -> nur teilweise
# ---------------------------------------------------------------------------
def test_reactive_update_process_is_partial():
    """A purely reactive update claim only partially addresses the update obligation."""
    claim_text = "Wir machen Updates, wenn Kunden Fehler melden."
    response = reason_implementation_claim(sps_profile(), claim_text)
    update_mappings = [
        mapping for mapping in response.mappings
        if mapping.obligation_id == "provide_security_updates"
    ]
    assert update_mappings and update_mappings[0].claim_coverage == ClaimCoverage.PARTIALLY_ADDRESSES
    assert "reactive" in response.claim.qualifiers
    # The missing elements must mention vulnerability monitoring.
    missing = update_mappings[0].missing_elements
    assert any("Schwachstellenüberwachung" in element for element in missing)
# ---------------------------------------------------------------------------
# 5. Wann überschneiden sich CRA und MaschinenVO?
# ---------------------------------------------------------------------------
def test_cra_and_machinery_overlap_on_cyber_safety():
    """A machine with a safety function yields a VULNERABILITY_HANDLING overlap."""
    machine_profile = sps_profile(is_machine=True, has_safety_function=True)
    derived = derive_obligations(machine_profile)
    obligation_ids = [ob.obligation_id for ob in derived.applicable_obligations]
    assert "machine_protection_against_corruption" in obligation_ids
    assert "vuln_handling_process" in obligation_ids
    vuln_groups = [
        group for group in derived.overlaps
        if group.overlap_group_id == "VULNERABILITY_HANDLING"
    ]
    assert vuln_groups
    # The machine obligation must be part of the first matching overlap group.
    assert "machine_protection_against_corruption" in vuln_groups[0].obligations
# ---------------------------------------------------------------------------
# 6. Wann ist Data Act zusätzlich relevant?
# ---------------------------------------------------------------------------
def test_data_act_relevant_when_product_generates_data():
    """With ``generates_usage_data=True`` the Data Act applies and adds obligations."""
    applicable_ids = _reg_ids(
        discover_scope(sps_profile(generates_usage_data=True)),
        "applicable_regulations",
    )
    assert "DataAct" in applicable_ids
    # A fresh profile is built for the obligations call, as in the scope call.
    derived = derive_obligations(sps_profile(generates_usage_data=True))
    assert any(ob.source_regulation == "DataAct" for ob in derived.applicable_obligations)
def test_data_act_uncertain_when_data_unknown():
    """Without a ``generates_usage_data`` value the Data Act is listed as uncertain."""
    # The default profile does not set generates_usage_data.
    default_scope = discover_scope(sps_profile())
    uncertain_ids = _reg_ids(default_scope, "uncertain_regulations")
    assert "DataAct" in uncertain_ids
# ---------------------------------------------------------------------------
# 7. Welche Pflichten gelten nicht ohne Funkmodul?
# ---------------------------------------------------------------------------
def test_no_radio_module_excludes_red():
    """``has_radio_module=False`` puts RED in the excluded list, not the applicable one."""
    no_radio_scope = discover_scope(sps_profile(has_radio_module=False))
    excluded_ids = _reg_ids(no_radio_scope, "excluded_regulations")
    assert "RED" in excluded_ids
    applicable_ids = _reg_ids(no_radio_scope, "applicable_regulations")
    assert "RED" not in applicable_ids
def test_radio_unknown_makes_red_uncertain():
    """Without a ``has_radio_module`` value RED is listed as uncertain."""
    # The default profile does not set has_radio_module.
    default_scope = discover_scope(sps_profile())
    uncertain_ids = _reg_ids(default_scope, "uncertain_regulations")
    assert "RED" in uncertain_ids
# ---------------------------------------------------------------------------
# 8. Welche Fakten fehlen für eine NIS2-Bewertung?
# ---------------------------------------------------------------------------
def test_nis2_missing_facts():
    """NIS2 is uncertain and its missing facts mention company size and sector."""
    default_scope = discover_scope(sps_profile())
    nis2_entries = [
        reg for reg in default_scope.uncertain_regulations
        if reg.regulation_id == "NIS2"
    ]
    assert nis2_entries
    # Compare case-insensitively against the concatenated missing-fact texts.
    haystack = " ".join(nis2_entries[0].missing_facts).lower()
    assert all(term in haystack for term in ("unternehmensgröße", "sektor"))
# ---------------------------------------------------------------------------
# 9. Welche Nachweise decken mehrere Pflichten gleichzeitig? (USP)
# ---------------------------------------------------------------------------
def test_evidence_covers_multiple_obligations():
    """``evidence_for_multiple`` is non-empty and each entry spans more than one id."""
    shared_evidence = derive_obligations(sps_profile()).evidence_for_multiple
    # At least one evidence type must span more than one obligation.
    assert shared_evidence
    for obligation_ids in shared_evidence.values():
        assert len(obligation_ids) > 1
    # "policy" is expected among the shared evidence types.
    assert "policy" in shared_evidence
# ---------------------------------------------------------------------------
# 10. Auslegungen: zu eng / zu weit / plausibel / unbekannt
# ---------------------------------------------------------------------------
def test_interpretation_unknown_returns_uncertain():
    """An unrelated statement is assessed as UNCERTAIN but still gets a correction text."""
    unrelated_statement = "Der Mond beeinflusst unsere Updatezyklen."
    verdict = assess_interpretation(unrelated_statement)
    assert verdict.assessment == InterpretationVerdict.UNCERTAIN
    assert verdict.corrected_interpretation
def test_interpretation_open_source_partially_correct():
    """The "open source is exempt" reading is assessed as PARTIALLY_CORRECT."""
    customer_text = "Open Source ist ausgenommen, also betrifft uns der CRA nicht."
    verdict = assess_interpretation(customer_text)
    assert verdict.assessment == InterpretationVerdict.PARTIALLY_CORRECT
# ---------------------------------------------------------------------------
# Registry-alignment + contract guards
# ---------------------------------------------------------------------------
def test_cra_obligations_reuse_registry_ids_not_minted():
    """Anchored obligations include the CRA ids; MaschinenVO ones are proposed only."""
    derived = derive_obligations(sps_profile())
    anchored_ids = [
        ob.obligation_id for ob in derived.applicable_obligations if ob.registry_anchor
    ]
    assert "sbom_creation" in anchored_ids
    assert "provide_security_updates" in anchored_ids
    # Machine obligations are proposed and must not carry a registry anchor.
    machine_obligations = [
        ob for ob in derived.applicable_obligations
        if ob.source_regulation == "MaschinenVO"
    ]
    for ob in machine_obligations:
        assert ob.proposed and not ob.registry_anchor
def test_required_evidence_only_uses_shared_catalog():
    """Every rule's ``required_evidence`` is a subset of ``EVIDENCE_CATALOG``."""
    from compliance.reasoning.rules_obligations import ALL_OBLIGATIONS
    from compliance.reasoning.rules_types import EVIDENCE_CATALOG

    # Collect the offending rules first so a failure shows all of them at once.
    offenders = [
        rule for rule in ALL_OBLIGATIONS
        if not set(rule.required_evidence) <= EVIDENCE_CATALOG
    ]
    assert not offenders
def test_claim_normalizer_is_deterministic():
    """Normalizing the same text twice yields the same ``claim_id``."""
    claim_text = "Wir haben einen Update-Prozess."
    first = normalize_claim(claim_text)
    second = normalize_claim(claim_text)
    assert first.claim_id == second.claim_id
    assert "secure_updates" in first.claimed_capability
def test_unspecific_claim_asks_for_detail():
    """A vague claim yields no mappings, or only INSUFFICIENT_INFORMATION ones."""
    response = reason_implementation_claim(sps_profile(), "Wir sind sicher aufgestellt.")
    # all() over an empty sequence is True, so this also accepts "no mappings".
    assert all(
        mapping.claim_coverage == ClaimCoverage.INSUFFICIENT_INFORMATION
        for mapping in response.mappings
    )
    summary_lower = response.summary.lower()
    assert "unspezifisch" in summary_lower
def test_claim_reasoning_carries_no_compliance_verdict():
    """Welt-1 boundary: claim mapping must never read as a conformity verdict.

    Checks that every mapping uses a ``ClaimCoverage`` value, that neither the
    summary nor any mapping explanation contains compliance wording, and that
    the response carries a disclaimer mentioning ``ComplianceStatus`` and
    ``Nachweis``.
    """
    resp = reason_implementation_claim(
        sps_profile(), "Wir haben SBOMs und einen Update-Prozess."
    )
    # Without at least one mapping the per-mapping checks below would pass
    # vacuously and the guard would prove nothing.
    assert resp.mappings

    # claim-relative vocabulary only
    allowed_coverage = set(ClaimCoverage)
    for m in resp.mappings:
        assert m.claim_coverage in allowed_coverage

    # No compliance wording leaks into summary or explanations. Matching is
    # case-insensitive so capitalised forms (e.g. at a sentence start) are
    # caught as well.
    forbidden_terms = ("erfüllt", "abgedeckt")
    texts = [resp.summary, *(m.explanation for m in resp.mappings)]
    for text in texts:
        lowered = text.lower()
        for term in forbidden_terms:
            assert term not in lowered, f"compliance wording {term!r} leaked: {text!r}"

    # explicit disclaimer separating ClaimCoverage (Welt 1) from ComplianceStatus (Welt 2)
    assert resp.disclaimer
    assert "ComplianceStatus" in resp.disclaimer and "Nachweis" in resp.disclaimer
# ---------------------------------------------------------------------------
# Endpoint smoke tests
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def client():
    """Yield a ``TestClient`` for a bare FastAPI app that mounts only the reasoning router.

    The client is used as a context manager so it is closed when the module's
    tests finish instead of being left open for the rest of the session.
    """
    # Imported lazily so the pure engine tests above do not need the API layer.
    from compliance.api.reasoning_routes import router

    app = FastAPI()
    app.include_router(router)
    # NOTE(review): entering the context also runs any startup/shutdown
    # handlers registered on the app or router - confirm the reasoning router
    # defines none that need external services.
    with TestClient(app) as test_client:
        yield test_client
def test_endpoint_scope(client):
    """POST /reasoning/scope returns 200 and lists CRA as applicable."""
    payload = {
        "product_profile": {
            "product_name": "X",
            "has_software": True,
            "eu_market": True,
            "manufacturer_role": "manufacturer",
        }
    }
    response = client.post("/reasoning/scope", json=payload)
    assert response.status_code == 200
    applicable = response.json()["regulatory_scope"]["applicable_regulations"]
    applicable_ids = [entry["regulation_id"] for entry in applicable]
    assert "CRA" in applicable_ids
def test_endpoint_obligations(client):
    """POST /reasoning/obligations returns 200 and a non-empty obligation list."""
    payload = {
        "product_profile": {
            "product_name": "X",
            "has_software": True,
            "has_remote_access": True,
            "eu_market": True,
            "manufacturer_role": "manufacturer",
        }
    }
    response = client.post("/reasoning/obligations", json=payload)
    assert response.status_code == 200
    obligations = response.json()["applicable_obligations"]
    assert obligations
def test_endpoint_implementation(client):
    """POST /reasoning/implementation-reasoning returns mappings and a disclaimer."""
    payload = {
        "product_profile": {
            "product_name": "X",
            "has_software": True,
            "eu_market": True,
            "manufacturer_role": "manufacturer",
        },
        "customer_claim": "Wir haben SBOMs.",
    }
    response = client.post("/reasoning/implementation-reasoning", json=payload)
    assert response.status_code == 200
    data = response.json()
    # Both fields must be present and non-empty.
    assert data["mappings"]
    assert data["disclaimer"]
def test_endpoint_interpretation(client):
    """POST /reasoning/interpretation-assessment reports ``too_narrow`` for this input."""
    payload = {"customer_interpretation": "CRA gilt nur für neue Produkte."}
    response = client.post("/reasoning/interpretation-assessment", json=payload)
    assert response.status_code == 200
    verdict = response.json()["assessment"]
    assert verdict == "too_narrow"