refactor(reasoning): enforce ClaimCoverage (Welt 1) vs ComplianceStatus (Welt 2) boundary [F1]

Architecture-validation finding: the implementation mode produced compliance-
flavored output ("teilweise erfüllt", "covered") from a mere customer claim,
blurring the boundary with the Execution layer. The fix is a design decision,
not a wording change — the reasoning layer judges only the customer's
STATEMENT, never conformity.

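- CoverageStatus -> ClaimCoverage; values are claim-relative + carry "potential":
  potentially_addresses / partially_addresses / does_not_address /
  insufficient_information (replacing covered / partially_covered /
  not_covered / unclear). The former out_of_scope value is dropped without a
  successor.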
- ImplementationAssessment -> ClaimObligationMapping (coverage_status ->
  claim_coverage); ImplementationResponse -> ImplementationReasoningResponse
  (assessments -> mappings, + explicit `disclaimer`); ImplementationRequest ->
  ImplementationReasoningRequest; engine entry assess_implementation ->
  reason_implementation_claim.
- Endpoint /reasoning/implementation-assessment -> /reasoning/implementation-reasoning.
- Summary/explanations reworded: "adressiert wahrscheinlich N Pflichten … für
  eine Bewertung der tatsächlichen Umsetzung sind Nachweise erforderlich (keine
  Konformitätsaussage)". No "erfüllt"/"abgedeckt" leaks.
- New guard test asserts no compliance verdict leaks (no "erfüllt"; disclaimer
  separates ClaimCoverage from ComplianceStatus). 23 tests green, mypy clean.

Discovery (scope/obligations) was already structurally claim-free and unaffected.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-26 00:37:57 +02:00
parent 1607c89459
commit 5e5002c883
6 changed files with 135 additions and 84 deletions
@@ -5,7 +5,7 @@ pure deterministic rule evaluation.
POST /reasoning/scope -> which regulations apply + missing facts POST /reasoning/scope -> which regulations apply + missing facts
POST /reasoning/obligations -> obligations, overlaps, multi-evidence POST /reasoning/obligations -> obligations, overlaps, multi-evidence
POST /reasoning/implementation-assessment -> claim coverage per obligation POST /reasoning/implementation-reasoning -> claim->obligation mapping (Welt 1, no verdict)
POST /reasoning/interpretation-assessment -> verdict on a customer interpretation POST /reasoning/interpretation-assessment -> verdict on a customer interpretation
""" """
@@ -14,14 +14,14 @@ from __future__ import annotations
from fastapi import APIRouter from fastapi import APIRouter
from compliance.reasoning import ( from compliance.reasoning import (
assess_implementation,
assess_interpretation, assess_interpretation,
derive_obligations, derive_obligations,
discover_scope, discover_scope,
reason_implementation_claim,
) )
from compliance.reasoning.schemas import ( from compliance.reasoning.schemas import (
ImplementationRequest, ImplementationReasoningRequest,
ImplementationResponse, ImplementationReasoningResponse,
InterpretationRequest, InterpretationRequest,
InterpretationResponse, InterpretationResponse,
ObligationsRequest, ObligationsRequest,
@@ -48,9 +48,9 @@ def applicable_obligations(req: ObligationsRequest) -> ObligationsResponse:
return derive_obligations(req.product_profile, req.regulatory_scope) return derive_obligations(req.product_profile, req.regulatory_scope)
@router.post("/implementation-assessment", response_model=ImplementationResponse) @router.post("/implementation-reasoning", response_model=ImplementationReasoningResponse)
def implementation_assessment(req: ImplementationRequest) -> ImplementationResponse: def implementation_reasoning(req: ImplementationReasoningRequest) -> ImplementationReasoningResponse:
return assess_implementation(req.product_profile, req.customer_claim) return reason_implementation_claim(req.product_profile, req.customer_claim)
@router.post("/interpretation-assessment", response_model=InterpretationResponse) @router.post("/interpretation-assessment", response_model=InterpretationResponse)
@@ -13,7 +13,7 @@ only (spec §14).
from __future__ import annotations from __future__ import annotations
from .claim_normalizer import normalize_claim from .claim_normalizer import normalize_claim
from .implementation_engine import assess_implementation from .implementation_engine import reason_implementation_claim
from .interpretation_engine import assess_interpretation from .interpretation_engine import assess_interpretation
from .obligation_engine import derive_obligations from .obligation_engine import derive_obligations
from .scope_engine import discover_scope from .scope_engine import discover_scope
@@ -22,6 +22,6 @@ __all__ = [
"discover_scope", "discover_scope",
"derive_obligations", "derive_obligations",
"normalize_claim", "normalize_claim",
"assess_implementation", "reason_implementation_claim",
"assess_interpretation", "assess_interpretation",
] ]
@@ -68,12 +68,19 @@ class OverlapType(str, Enum):
DIFFERENT_SCOPE = "different_scope" DIFFERENT_SCOPE = "different_scope"
class CoverageStatus(str, Enum): class ClaimCoverage(str, Enum):
COVERED = "covered" """How a customer's *claim* relates to an obligation — Welt 1 (reasoning).
PARTIALLY_COVERED = "partially_covered"
NOT_COVERED = "not_covered" This is NOT a conformity verdict. It judges only the customer's statement,
UNCLEAR = "unclear" never whether the obligation is actually met. The real compliance verdict
OUT_OF_SCOPE = "out_of_scope" (erfüllt/offen/unklar from verified evidence) is `ComplianceStatus`, owned by
the Compliance Execution Graph — the two must never be conflated.
"""
POTENTIALLY_ADDRESSES = "potentially_addresses"
PARTIALLY_ADDRESSES = "partially_addresses"
DOES_NOT_ADDRESS = "does_not_address"
INSUFFICIENT_INFORMATION = "insufficient_information"
class InterpretationVerdict(str, Enum): class InterpretationVerdict(str, Enum):
@@ -1,9 +1,15 @@
"""Implementation reasoning engine (spec Modus 3). """Implementation reasoning (spec Modus 3) — Welt 1 only.
Given a free-text claim ("Wir haben SBOMs und machen Updates, wenn Kunden Fehler Maps a free-text claim ("Wir haben SBOMs und machen Updates, wenn Kunden Fehler
melden.") it maps the claimed capabilities onto the product's applicable melden.") onto the product's applicable obligations and reports, per obligation,
obligations and reports, per obligation, whether it is covered, partially whether the *claim* potentially/partially/does-not address it — plus the
covered or not covered — plus the evidence that would close the gap. evidence that WOULD be needed to prove real implementation.
This is NOT a conformity verdict. It judges the customer's statement, never
whether the obligation is met. The real verdict (ComplianceStatus: erfüllt/
offen/unklar from verified evidence) lives in the Compliance Execution Graph.
The four reasoning layers: claim -> interpretation (capabilities/topics on the
claim) -> potential obligation coverage (`claim_coverage`) -> evidence required.
""" """
from __future__ import annotations from __future__ import annotations
@@ -11,16 +17,22 @@ from __future__ import annotations
from typing import Dict, List from typing import Dict, List
from .claim_normalizer import normalize_claim from .claim_normalizer import normalize_claim
from .enums import Confidence, CoverageStatus from .enums import ClaimCoverage, Confidence
from .obligation_engine import derive_obligations from .obligation_engine import derive_obligations
from .schemas import ( from .schemas import (
ClaimObligationMapping,
CustomerImplementationClaim, CustomerImplementationClaim,
ImplementationAssessment, ImplementationReasoningResponse,
ImplementationResponse,
ProductProfile, ProductProfile,
) )
from .taxonomy_claims import topics_for from .taxonomy_claims import topics_for
DISCLAIMER = (
"Diese Auswertung interpretiert ausschließlich die Kundenaussage (ClaimCoverage, Welt 1). "
"Sie ist KEINE Konformitätsaussage — der tatsächliche Compliance-Status (ComplianceStatus, "
"Welt 2) ergibt sich erst aus geprüften Nachweisen im Compliance Execution Graph."
)
# Typical sub-elements a capability still misses when only partially claimed. # Typical sub-elements a capability still misses when only partially claimed.
STANDARD_GAPS: Dict[str, List[str]] = { STANDARD_GAPS: Dict[str, List[str]] = {
"software_bill_of_materials": [ "software_bill_of_materials": [
@@ -57,27 +69,31 @@ def _missing_for(capabilities: List[str]) -> List[str]:
return out return out
def _coverage(required: List[str], claimed: List[str], qualifiers: List[str]) -> CoverageStatus: def _coverage(required: List[str], claimed: List[str], qualifiers: List[str]) -> ClaimCoverage:
if not required:
return ClaimCoverage.INSUFFICIENT_INFORMATION
req, have = set(required), set(claimed) req, have = set(required), set(claimed)
hit = req & have hit = req & have
if not hit: if not hit:
return CoverageStatus.NOT_COVERED return ClaimCoverage.DOES_NOT_ADDRESS
if "absent" in qualifiers or "planned" in qualifiers: if "absent" in qualifiers or "planned" in qualifiers:
return CoverageStatus.NOT_COVERED return ClaimCoverage.DOES_NOT_ADDRESS
if "reactive" in qualifiers and hit & {"secure_updates", "vulnerability_management"}: if "reactive" in qualifiers and hit & {"secure_updates", "vulnerability_management"}:
return CoverageStatus.PARTIALLY_COVERED return ClaimCoverage.PARTIALLY_ADDRESSES
if req <= have: if req <= have:
return CoverageStatus.COVERED return ClaimCoverage.POTENTIALLY_ADDRESSES
return CoverageStatus.PARTIALLY_COVERED return ClaimCoverage.PARTIALLY_ADDRESSES
def assess_implementation(profile: ProductProfile, customer_claim: str) -> ImplementationResponse: def reason_implementation_claim(
profile: ProductProfile, customer_claim: str
) -> ImplementationReasoningResponse:
claim = normalize_claim(customer_claim) claim = normalize_claim(customer_claim)
obligations = derive_obligations(profile).applicable_obligations obligations = derive_obligations(profile).applicable_obligations
claimed = claim.claimed_capability claimed = claim.claimed_capability
claim_topics = set(claim.related_topics) | set(claimed) claim_topics = set(claim.related_topics) | set(claimed)
assessments: List[ImplementationAssessment] = [] mappings: List[ClaimObligationMapping] = []
missing_evidence: List[str] = [] missing_evidence: List[str] = []
for ob in obligations: for ob in obligations:
@@ -89,54 +105,54 @@ def assess_implementation(profile: ProductProfile, customer_claim: str) -> Imple
directly_claimed = bool(set(required_caps) & set(claimed)) directly_claimed = bool(set(required_caps) & set(claimed))
related = bool(ob_topics & claim_topics) related = bool(ob_topics & claim_topics)
if not directly_claimed and not related: if not directly_claimed and not related:
continue # unrelated to the claim -> don't assess continue # unrelated to the claim -> don't reason about it
status = _coverage(required_caps, claimed, claim.qualifiers) coverage = _coverage(required_caps, claimed, claim.qualifiers)
missing = [] if status == CoverageStatus.COVERED else _missing_for(required_caps) missing = [] if coverage == ClaimCoverage.POTENTIALLY_ADDRESSES else _missing_for(required_caps)
explanation = _explain(status, ob.title, claim.qualifiers) if coverage != ClaimCoverage.POTENTIALLY_ADDRESSES:
if status != CoverageStatus.COVERED:
for ev in ob.required_evidence: for ev in ob.required_evidence:
if ev not in missing_evidence: if ev not in missing_evidence:
missing_evidence.append(ev) missing_evidence.append(ev)
assessments.append( mappings.append(
ImplementationAssessment( ClaimObligationMapping(
claim_id=claim.claim_id, claim_id=claim.claim_id,
obligation_id=ob.obligation_id, obligation_id=ob.obligation_id,
coverage_status=status, claim_coverage=coverage,
missing_elements=missing, missing_elements=missing,
required_evidence=ob.required_evidence, required_evidence=ob.required_evidence,
explanation=explanation, explanation=_explain(coverage, ob.title, claim.qualifiers),
confidence=Confidence.MEDIUM, confidence=Confidence.MEDIUM,
) )
) )
return ImplementationResponse( return ImplementationReasoningResponse(
claim=claim, claim=claim,
assessments=assessments, mappings=mappings,
missing_evidence=missing_evidence, missing_evidence=missing_evidence,
summary=_summary(claim, assessments), summary=_summary(claim, mappings),
disclaimer=DISCLAIMER,
) )
def _explain(status: CoverageStatus, title: str, qualifiers: List[str]) -> str: def _explain(coverage: ClaimCoverage, title: str, qualifiers: List[str]) -> str:
if status == CoverageStatus.COVERED: if coverage == ClaimCoverage.POTENTIALLY_ADDRESSES:
return "Die Pflicht '%s' wird durch die beschriebene Umsetzung plausibel abgedeckt." % title return "Die Aussage adressiert die Pflicht '%s' wahrscheinlich vollständig — Nachweise erforderlich." % title
if status == CoverageStatus.PARTIALLY_COVERED: if coverage == ClaimCoverage.PARTIALLY_ADDRESSES:
extra = " Der Prozess wirkt reaktiv." if "reactive" in qualifiers else "" extra = " Der beschriebene Prozess wirkt reaktiv." if "reactive" in qualifiers else ""
return "Die Pflicht '%s' ist nur teilweise abgedeckt.%s" % (title, extra) return "Die Aussage adressiert die Pflicht '%s' nur teilweise.%s" % (title, extra)
return "Die Pflicht '%s' wird durch die Aussage nicht abgedeckt." % title if coverage == ClaimCoverage.DOES_NOT_ADDRESS:
return "Die Aussage adressiert die Pflicht '%s' nicht." % title
return "Zur Pflicht '%s' liegen zu wenige Angaben für eine Einordnung vor." % title
def _summary(claim: CustomerImplementationClaim, assessments: List[ImplementationAssessment]) -> str: def _summary(claim: CustomerImplementationClaim, mappings: List[ClaimObligationMapping]) -> str:
if not claim.claimed_capability: if not claim.claimed_capability:
return "Die Aussage ist zu unspezifisch — bitte konkretisieren, was umgesetzt wurde." return "Die Aussage ist zu unspezifisch — bitte konkretisieren, was umgesetzt wurde."
covered = sum(1 for a in assessments if a.coverage_status == CoverageStatus.COVERED) full = sum(1 for m in mappings if m.claim_coverage == ClaimCoverage.POTENTIALLY_ADDRESSES)
partial = sum(1 for a in assessments if a.coverage_status == CoverageStatus.PARTIALLY_COVERED) partial = sum(1 for m in mappings if m.claim_coverage == ClaimCoverage.PARTIALLY_ADDRESSES)
notc = sum(1 for a in assessments if a.coverage_status == CoverageStatus.NOT_COVERED) none = sum(1 for m in mappings if m.claim_coverage == ClaimCoverage.DOES_NOT_ADDRESS)
if notc or partial: return (
head = "Teilweise erfüllt" "Die beschriebene Maßnahme adressiert wahrscheinlich %d Pflicht(en) vollständig und %d "
elif covered: "teilweise; %d werden nicht berührt. Für eine Bewertung der tatsächlichen Umsetzung sind "
head = "Plausibel abgedeckt" "Nachweise erforderlich (keine Konformitätsaussage)." % (full, partial, none)
else: )
head = "Nicht beurteilbar"
return "%s: %d abgedeckt, %d teilweise, %d offen." % (head, covered, partial, notc)
@@ -15,8 +15,8 @@ from pydantic import BaseModel, Field
from .enums import ( from .enums import (
ApplicabilityStatus, ApplicabilityStatus,
AuthorityLevel, AuthorityLevel,
ClaimCoverage,
Confidence, Confidence,
CoverageStatus,
InterpretationVerdict, InterpretationVerdict,
ManufacturerRole, ManufacturerRole,
MarketModel, MarketModel,
@@ -140,10 +140,17 @@ class CustomerImplementationClaim(BaseModel):
evidence_refs: List[str] = Field(default_factory=list) evidence_refs: List[str] = Field(default_factory=list)
class ImplementationAssessment(BaseModel): class ClaimObligationMapping(BaseModel):
"""One row of Welt-1 reasoning: how a customer claim relates to an obligation.
Layers (spec / architect): claim -> interpretation (on the claim object) ->
*potential* obligation coverage (`claim_coverage`) -> evidence required.
Carries NO compliance verdict.
"""
claim_id: str claim_id: str
obligation_id: str obligation_id: str
coverage_status: CoverageStatus claim_coverage: ClaimCoverage
missing_elements: List[str] = Field(default_factory=list) missing_elements: List[str] = Field(default_factory=list)
required_evidence: List[str] = Field(default_factory=list) required_evidence: List[str] = Field(default_factory=list)
explanation: str explanation: str
@@ -188,16 +195,19 @@ class ObligationsResponse(BaseModel):
evidence_for_multiple: Dict[str, List[str]] = Field(default_factory=dict) evidence_for_multiple: Dict[str, List[str]] = Field(default_factory=dict)
class ImplementationRequest(BaseModel): class ImplementationReasoningRequest(BaseModel):
product_profile: ProductProfile product_profile: ProductProfile
customer_claim: str customer_claim: str
class ImplementationResponse(BaseModel): class ImplementationReasoningResponse(BaseModel):
claim: CustomerImplementationClaim claim: CustomerImplementationClaim
assessments: List[ImplementationAssessment] = Field(default_factory=list) mappings: List[ClaimObligationMapping] = Field(default_factory=list)
missing_evidence: List[str] = Field(default_factory=list) missing_evidence: List[str] = Field(default_factory=list)
summary: str = "" summary: str = ""
# Makes the Welt-1 boundary explicit: this is advisory claim-mapping, not a
# conformity verdict (that is ComplianceStatus in the Execution Graph).
disclaimer: str = ""
class InterpretationRequest(BaseModel): class InterpretationRequest(BaseModel):
@@ -14,15 +14,15 @@ from fastapi import FastAPI
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from compliance.reasoning import ( from compliance.reasoning import (
assess_implementation,
assess_interpretation, assess_interpretation,
derive_obligations, derive_obligations,
discover_scope, discover_scope,
normalize_claim, normalize_claim,
reason_implementation_claim,
) )
from compliance.reasoning.enums import ( from compliance.reasoning.enums import (
ApplicabilityStatus, ApplicabilityStatus,
CoverageStatus, ClaimCoverage,
InterpretationVerdict, InterpretationVerdict,
) )
from compliance.reasoning.schemas import ProductProfile from compliance.reasoning.schemas import ProductProfile
@@ -82,25 +82,25 @@ def test_interpretation_only_new_products_is_too_narrow():
# 3. Reicht eine SBOM allein? -> nein, nur teilweise # 3. Reicht eine SBOM allein? -> nein, nur teilweise
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_sbom_alone_is_not_enough(): def test_sbom_alone_is_not_enough():
resp = assess_implementation(sps_profile(), "Wir haben SBOMs.") resp = reason_implementation_claim(sps_profile(), "Wir haben SBOMs.")
sbom = [a for a in resp.assessments if a.obligation_id == "sbom_creation"] sbom = [m for m in resp.mappings if m.obligation_id == "sbom_creation"]
assert sbom and sbom[0].coverage_status == CoverageStatus.COVERED assert sbom and sbom[0].claim_coverage == ClaimCoverage.POTENTIALLY_ADDRESSES
# but other obligations are surfaced as gaps -> aggregate not fully covered # but other obligations are surfaced as gaps -> claim does not address everything
assert any(a.coverage_status != CoverageStatus.COVERED for a in resp.assessments) assert any(m.claim_coverage != ClaimCoverage.POTENTIALLY_ADDRESSES for m in resp.mappings)
assert "Teilweise erfüllt" in resp.summary or "offen" in resp.summary assert "Nachweise" in resp.summary
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# 4. Ist ein reaktiver Updateprozess ausreichend? -> nur teilweise # 4. Ist ein reaktiver Updateprozess ausreichend? -> nur teilweise
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_reactive_update_process_is_partial(): def test_reactive_update_process_is_partial():
resp = assess_implementation( resp = reason_implementation_claim(
sps_profile(), "Wir machen Updates, wenn Kunden Fehler melden." sps_profile(), "Wir machen Updates, wenn Kunden Fehler melden."
) )
upd = [a for a in resp.assessments if a.obligation_id == "provide_security_updates"] upd = [m for m in resp.mappings if m.obligation_id == "provide_security_updates"]
assert upd and upd[0].coverage_status == CoverageStatus.PARTIALLY_COVERED assert upd and upd[0].claim_coverage == ClaimCoverage.PARTIALLY_ADDRESSES
assert "reactive" in resp.claim.qualifiers assert "reactive" in resp.claim.qualifiers
assert any("Schwachstellenüberwachung" in m for m in upd[0].missing_elements) assert any("Schwachstellenüberwachung" in e for e in upd[0].missing_elements)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -211,13 +211,29 @@ def test_claim_normalizer_is_deterministic():
def test_unspecific_claim_asks_for_detail(): def test_unspecific_claim_asks_for_detail():
resp = assess_implementation(sps_profile(), "Wir sind sicher aufgestellt.") resp = reason_implementation_claim(sps_profile(), "Wir sind sicher aufgestellt.")
assert resp.assessments == [] or all( assert resp.mappings == [] or all(
a.coverage_status == CoverageStatus.UNCLEAR for a in resp.assessments m.claim_coverage == ClaimCoverage.INSUFFICIENT_INFORMATION for m in resp.mappings
) )
assert "unspezifisch" in resp.summary.lower() assert "unspezifisch" in resp.summary.lower()
def test_claim_reasoning_carries_no_compliance_verdict():
"""Welt-1 boundary: claim mapping must never read as a conformity verdict."""
resp = reason_implementation_claim(
sps_profile(), "Wir haben SBOMs und einen Update-Prozess."
)
# claim-relative vocabulary only
for m in resp.mappings:
assert m.claim_coverage in set(ClaimCoverage)
# no compliance wording leaks into summary or explanations
assert "erfüllt" not in resp.summary
assert all("erfüllt" not in m.explanation for m in resp.mappings)
# explicit disclaimer separating ClaimCoverage (Welt 1) from ComplianceStatus (Welt 2)
assert resp.disclaimer
assert "ComplianceStatus" in resp.disclaimer and "Nachweis" in resp.disclaimer
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Endpoint smoke tests # Endpoint smoke tests
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -248,11 +264,13 @@ def test_endpoint_obligations(client):
def test_endpoint_implementation(client): def test_endpoint_implementation(client):
r = client.post( r = client.post(
"/reasoning/implementation-assessment", "/reasoning/implementation-reasoning",
json={"product_profile": {"product_name": "X", "has_software": True, "eu_market": True, "manufacturer_role": "manufacturer"}, "customer_claim": "Wir haben SBOMs."}, json={"product_profile": {"product_name": "X", "has_software": True, "eu_market": True, "manufacturer_role": "manufacturer"}, "customer_claim": "Wir haben SBOMs."},
) )
assert r.status_code == 200 assert r.status_code == 200
assert r.json()["assessments"] body = r.json()
assert body["mappings"]
assert body["disclaimer"]
def test_endpoint_interpretation(client): def test_endpoint_interpretation(client):