feat(product-scope): gate Navigator facts, then reuse discover_scope (step 3)
Connects the Navigator's fact-gate to the existing reasoning discover_scope — the Scope Engine decides only once the minimum (P0) facts are released. - resolve_product_scope(canonical): if not ready_for_scope -> NEEDS_FACTS (missing_facts + suggested_questions, discover_scope NOT run); else project canonical->reasoning profile and run the EXISTING discover_scope exactly once -> RESOLVED with applicable/excluded/uncertain regulations. - Environmental triggers surface ONLY as unsupported_domains (future_corpus_needed), never as a legal evaluation — transparency, no false completeness. - POST /reasoning/product-scope (thin handler) returns case NEEDS_FACTS or RESOLVED. - No new scope rules, no new regulations, no environmental-law evaluation, no UI, no Go, no RAG, no percent-compliance. Response types are application-level, not meta-model classes (freeze v1.0 untouched). - 6 tests incl. discover_scope spy (0 calls when gated, exactly 1 when ready), category separation, environmental-as-unsupported-only. 47 tests green (existing reasoning MVP tests stay green), mypy clean, LOC ok. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -7,12 +7,18 @@ pure deterministic rule evaluation.
|
|||||||
POST /reasoning/obligations -> obligations, overlaps, multi-evidence
|
POST /reasoning/obligations -> obligations, overlaps, multi-evidence
|
||||||
POST /reasoning/implementation-reasoning -> claim->obligation mapping (Welt 1, no verdict)
|
POST /reasoning/implementation-reasoning -> claim->obligation mapping (Welt 1, no verdict)
|
||||||
POST /reasoning/interpretation-assessment -> verdict on a customer interpretation
|
POST /reasoning/interpretation-assessment -> verdict on a customer interpretation
|
||||||
|
POST /reasoning/product-scope -> gate on facts, else run discover_scope once
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from fastapi import APIRouter
|
from fastapi import APIRouter
|
||||||
|
|
||||||
|
from compliance.product_scope import (
|
||||||
|
ProductScopeRequest,
|
||||||
|
ProductScopeResponse,
|
||||||
|
resolve_product_scope,
|
||||||
|
)
|
||||||
from compliance.reasoning import (
|
from compliance.reasoning import (
|
||||||
assess_interpretation,
|
assess_interpretation,
|
||||||
derive_obligations,
|
derive_obligations,
|
||||||
@@ -53,6 +59,11 @@ def implementation_reasoning(req: ImplementationReasoningRequest) -> Implementat
|
|||||||
return reason_implementation_claim(req.product_profile, req.customer_claim)
|
return reason_implementation_claim(req.product_profile, req.customer_claim)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/product-scope", response_model=ProductScopeResponse)
|
||||||
|
def product_scope(req: ProductScopeRequest) -> ProductScopeResponse:
|
||||||
|
return resolve_product_scope(req.product_profile)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/interpretation-assessment", response_model=InterpretationResponse)
|
@router.post("/interpretation-assessment", response_model=InterpretationResponse)
|
||||||
def interpretation_assessment(req: InterpretationRequest) -> InterpretationResponse:
|
def interpretation_assessment(req: InterpretationRequest) -> InterpretationResponse:
|
||||||
result = assess_interpretation(req.customer_interpretation, req.product_profile)
|
result = assess_interpretation(req.customer_interpretation, req.product_profile)
|
||||||
|
|||||||
@@ -0,0 +1,26 @@
|
|||||||
|
"""Product-scope orchestration (step 3).
|
||||||
|
|
||||||
|
Connects the Navigator's fact-gate to the existing reasoning `discover_scope`:
|
||||||
|
decide regulatory scope only once the minimum (P0) facts are present, otherwise
|
||||||
|
return the missing facts. Reuses discover_scope unchanged — no new scope logic.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from .orchestrator import resolve_product_scope
|
||||||
|
from .schemas import (
|
||||||
|
ProductScopeRequest,
|
||||||
|
ProductScopeResponse,
|
||||||
|
RegulatoryScopeResult,
|
||||||
|
ScopeStatus,
|
||||||
|
UnsupportedDomain,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"resolve_product_scope",
|
||||||
|
"ProductScopeRequest",
|
||||||
|
"ProductScopeResponse",
|
||||||
|
"RegulatoryScopeResult",
|
||||||
|
"UnsupportedDomain",
|
||||||
|
"ScopeStatus",
|
||||||
|
]
|
||||||
@@ -0,0 +1,77 @@
|
|||||||
|
"""Product-scope orchestrator (step 3) — gate, then reuse discover_scope.
|
||||||
|
|
||||||
|
THE rule: the Scope Engine decides only once the Navigator has released the
|
||||||
|
minimum facts. If P0 facts are missing, return the missing facts/questions and do
|
||||||
|
NOT run discover_scope. Otherwise project the canonical into the reasoning profile
|
||||||
|
and run the EXISTING `discover_scope` exactly once.
|
||||||
|
|
||||||
|
No new scope rules, no new regulations, no environmental-law evaluation (those
|
||||||
|
domains are surfaced only as unsupported_domains / future_corpus_needed).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import List, Tuple
|
||||||
|
|
||||||
|
from compliance.navigator.engine import navigate
|
||||||
|
from compliance.profile.canonical import CanonicalProductRegulatoryProfile
|
||||||
|
from compliance.profile.to_reasoning import to_reasoning_profile
|
||||||
|
from compliance.reasoning.scope_engine import discover_scope
|
||||||
|
|
||||||
|
from .schemas import (
|
||||||
|
ProductScopeResponse,
|
||||||
|
RegulatoryScopeResult,
|
||||||
|
ScopeStatus,
|
||||||
|
UnsupportedDomain,
|
||||||
|
)
|
||||||
|
|
||||||
|
# environmental trigger field -> (domain, note). Transparency only — not a verdict.
|
||||||
|
_ENV_DOMAINS: List[Tuple[str, str, str]] = [
|
||||||
|
("discharges_to_wastewater", "environment_water", "Abwasser-/Gewässerrecht (z. B. AbwV, WRRL) — noch nicht im Korpus."),
|
||||||
|
("has_cooling_or_spraying_water", "environment_water", "Wasserbezogene Anforderungen — noch nicht im Korpus."),
|
||||||
|
("emits_to_air", "environment_air", "Immissionsschutz-/Luftreinhalterecht (z. B. BImSchG, IED) — noch nicht im Korpus."),
|
||||||
|
("uses_solvents", "environment_air", "Lösemittel-/VOC-Recht (z. B. 31. BImSchV) — noch nicht im Korpus."),
|
||||||
|
("uses_cleaning_chemicals", "chemicals", "Chemikalienrecht (REACH/CLP/Detergenzien/Biozide) — noch nicht im Korpus."),
|
||||||
|
("supplies_chemicals", "chemicals", "Chemikalienrecht (REACH/CLP) — noch nicht im Korpus."),
|
||||||
|
("contains_restricted_substances", "chemicals", "Stoffbeschränkungen (REACH/RoHS) — noch nicht im Korpus."),
|
||||||
|
("creates_waste", "waste", "Abfall-/Entsorgungsrecht (u. a. WEEE) — noch nicht im Korpus."),
|
||||||
|
("consumes_energy_or_water", "energy_resources", "Energie-/Ökodesign-Recht — noch nicht im Korpus."),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _unsupported_domains(profile: CanonicalProductRegulatoryProfile) -> List[UnsupportedDomain]:
|
||||||
|
env = profile.environmental
|
||||||
|
seen = set()
|
||||||
|
out: List[UnsupportedDomain] = []
|
||||||
|
for field, domain, note in _ENV_DOMAINS:
|
||||||
|
if getattr(env, field) is True and domain not in seen:
|
||||||
|
seen.add(domain)
|
||||||
|
out.append(UnsupportedDomain(domain=domain, trigger=field, note=note))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_product_scope(profile: CanonicalProductRegulatoryProfile) -> ProductScopeResponse:
|
||||||
|
nav = navigate(profile)
|
||||||
|
|
||||||
|
if not nav.completeness_summary.ready_for_scope:
|
||||||
|
return ProductScopeResponse(
|
||||||
|
status=ScopeStatus.NEEDS_FACTS,
|
||||||
|
completeness_summary=nav.completeness_summary,
|
||||||
|
missing_facts=nav.missing_facts,
|
||||||
|
suggested_questions=nav.suggested_questions,
|
||||||
|
)
|
||||||
|
|
||||||
|
scope = discover_scope(to_reasoning_profile(profile)) # exactly once
|
||||||
|
result = RegulatoryScopeResult(
|
||||||
|
applicable_regulations=scope.applicable_regulations,
|
||||||
|
excluded_regulations=scope.excluded_regulations,
|
||||||
|
uncertain_regulations=scope.uncertain_regulations,
|
||||||
|
unsupported_domains=_unsupported_domains(profile),
|
||||||
|
reasoning_summary=scope.reasoning_summary,
|
||||||
|
confidence=scope.confidence,
|
||||||
|
)
|
||||||
|
return ProductScopeResponse(
|
||||||
|
status=ScopeStatus.RESOLVED,
|
||||||
|
completeness_summary=nav.completeness_summary,
|
||||||
|
regulatory_scope=result,
|
||||||
|
)
|
||||||
@@ -0,0 +1,63 @@
|
|||||||
|
"""Response schemas for the product-scope orchestrator (step 3).
|
||||||
|
|
||||||
|
These are application/API types — NOT compliance-meta-model classes (architecture
|
||||||
|
freeze v1.0 untouched). The scope verdict itself is produced by the existing
|
||||||
|
`discover_scope`; nothing here adds scope rules.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from enum import Enum
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
from compliance.navigator.engine import CompletenessSummary
|
||||||
|
from compliance.navigator.questions import NavigatorQuestion
|
||||||
|
from compliance.profile.canonical import CanonicalProductRegulatoryProfile
|
||||||
|
from compliance.reasoning.enums import Confidence
|
||||||
|
from compliance.reasoning.schemas import (
|
||||||
|
ApplicableRegulation,
|
||||||
|
ExcludedRegulation,
|
||||||
|
UncertainRegulation,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ScopeStatus(str, Enum):
|
||||||
|
NEEDS_FACTS = "needs_facts" # P0 facts missing -> ask, do not decide
|
||||||
|
RESOLVED = "resolved" # minimum facts present -> scope decided
|
||||||
|
|
||||||
|
|
||||||
|
class UnsupportedDomain(BaseModel):
|
||||||
|
"""A domain the product triggers but the corpus does not yet cover.
|
||||||
|
|
||||||
|
Surfaced for transparency (no false completeness) — NEVER a legal evaluation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
domain: str
|
||||||
|
trigger: str
|
||||||
|
status: str = "future_corpus_needed"
|
||||||
|
note: str = ""
|
||||||
|
|
||||||
|
|
||||||
|
class RegulatoryScopeResult(BaseModel):
|
||||||
|
applicable_regulations: List[ApplicableRegulation] = Field(default_factory=list)
|
||||||
|
excluded_regulations: List[ExcludedRegulation] = Field(default_factory=list)
|
||||||
|
uncertain_regulations: List[UncertainRegulation] = Field(default_factory=list)
|
||||||
|
unsupported_domains: List[UnsupportedDomain] = Field(default_factory=list)
|
||||||
|
reasoning_summary: str = ""
|
||||||
|
confidence: Confidence = Confidence.MEDIUM
|
||||||
|
|
||||||
|
|
||||||
|
class ProductScopeRequest(BaseModel):
|
||||||
|
product_profile: CanonicalProductRegulatoryProfile
|
||||||
|
|
||||||
|
|
||||||
|
class ProductScopeResponse(BaseModel):
|
||||||
|
status: ScopeStatus
|
||||||
|
completeness_summary: CompletenessSummary
|
||||||
|
# case NEEDS_FACTS
|
||||||
|
missing_facts: List[str] = Field(default_factory=list)
|
||||||
|
suggested_questions: List[NavigatorQuestion] = Field(default_factory=list)
|
||||||
|
# case RESOLVED
|
||||||
|
regulatory_scope: Optional[RegulatoryScopeResult] = None
|
||||||
@@ -0,0 +1,149 @@
|
|||||||
|
"""Tests for the product-scope orchestrator (step 3).
|
||||||
|
|
||||||
|
Acceptance: missing P0 facts -> discover_scope NOT run; ready -> run exactly once;
|
||||||
|
response separates applicable/excluded/uncertain; environmental triggers appear
|
||||||
|
only as unsupported_domain (future_corpus_needed), never as a legal evaluation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
import compliance.product_scope.orchestrator as orch
|
||||||
|
from compliance.product_scope import ScopeStatus, resolve_product_scope
|
||||||
|
from compliance.profile.canonical import (
|
||||||
|
CanonicalLifecyclePhase,
|
||||||
|
CanonicalProductRegulatoryProfile,
|
||||||
|
CanonicalProductType,
|
||||||
|
EconomicOperatorRole,
|
||||||
|
EnvironmentalImpact,
|
||||||
|
)
|
||||||
|
|
||||||
|
_KNOWN_REGS = {"CRA", "MaschinenVO", "RED", "EMV", "DataAct", "NIS2"}
|
||||||
|
|
||||||
|
|
||||||
|
def ready_profile(**ov) -> CanonicalProductRegulatoryProfile:
|
||||||
|
base = dict(
|
||||||
|
name="Industriespülmaschine",
|
||||||
|
product_type=CanonicalProductType.MACHINERY,
|
||||||
|
markets=["EU", "DE"],
|
||||||
|
economic_operator_role=EconomicOperatorRole.MANUFACTURER,
|
||||||
|
lifecycle_phase=CanonicalLifecyclePhase.PLACING_ON_MARKET,
|
||||||
|
is_machine=True,
|
||||||
|
is_component=False,
|
||||||
|
has_software_updates=True,
|
||||||
|
has_embedded_software=True,
|
||||||
|
has_remote_access=True,
|
||||||
|
has_safety_function=True,
|
||||||
|
technologies=["cloud", "ota_updates"],
|
||||||
|
)
|
||||||
|
base.update(ov)
|
||||||
|
return CanonicalProductRegulatoryProfile(**base)
|
||||||
|
|
||||||
|
|
||||||
|
def _spy(monkeypatch):
|
||||||
|
calls = {"n": 0}
|
||||||
|
real = orch.discover_scope
|
||||||
|
|
||||||
|
def counting(profile):
|
||||||
|
calls["n"] += 1
|
||||||
|
return real(profile)
|
||||||
|
|
||||||
|
monkeypatch.setattr(orch, "discover_scope", counting)
|
||||||
|
return calls
|
||||||
|
|
||||||
|
|
||||||
|
# 1. missing P0 facts -> discover_scope is NOT executed.
|
||||||
|
def test_needs_facts_does_not_run_scope(monkeypatch):
|
||||||
|
calls = _spy(monkeypatch)
|
||||||
|
resp = resolve_product_scope(CanonicalProductRegulatoryProfile(name="X"))
|
||||||
|
assert resp.status == ScopeStatus.NEEDS_FACTS
|
||||||
|
assert resp.regulatory_scope is None
|
||||||
|
assert resp.missing_facts
|
||||||
|
assert calls["n"] == 0
|
||||||
|
|
||||||
|
|
||||||
|
# 2. ready_for_scope -> discover_scope runs exactly once.
|
||||||
|
def test_ready_runs_scope_once(monkeypatch):
|
||||||
|
calls = _spy(monkeypatch)
|
||||||
|
resp = resolve_product_scope(ready_profile())
|
||||||
|
assert resp.status == ScopeStatus.RESOLVED
|
||||||
|
assert resp.regulatory_scope is not None
|
||||||
|
assert calls["n"] == 1
|
||||||
|
applicable = {r.regulation_id for r in resp.regulatory_scope.applicable_regulations}
|
||||||
|
assert "CRA" in applicable and "MaschinenVO" in applicable
|
||||||
|
|
||||||
|
|
||||||
|
# 3. the response separates the regulation categories.
|
||||||
|
def test_response_separates_categories():
|
||||||
|
scope = resolve_product_scope(ready_profile()).regulatory_scope
|
||||||
|
assert scope is not None
|
||||||
|
# all three buckets exist and only carry known regulation ids
|
||||||
|
for bucket in (scope.applicable_regulations, scope.excluded_regulations, scope.uncertain_regulations):
|
||||||
|
for r in bucket:
|
||||||
|
assert r.regulation_id in _KNOWN_REGS
|
||||||
|
assert scope.uncertain_regulations # e.g. RED/DataAct/NIS2 with unknown facts
|
||||||
|
|
||||||
|
|
||||||
|
# 4. environmental triggers surface ONLY as unsupported_domain, never as law.
|
||||||
|
def test_environmental_only_unsupported_domain():
|
||||||
|
profile = ready_profile(
|
||||||
|
environmental=EnvironmentalImpact(discharges_to_wastewater=True, uses_cleaning_chemicals=True)
|
||||||
|
)
|
||||||
|
scope = resolve_product_scope(profile).regulatory_scope
|
||||||
|
assert scope is not None
|
||||||
|
domains = {d.domain for d in scope.unsupported_domains}
|
||||||
|
assert "environment_water" in domains and "chemicals" in domains
|
||||||
|
assert all(d.status == "future_corpus_needed" for d in scope.unsupported_domains)
|
||||||
|
# no environmental "regulation" leaked into the scope verdict
|
||||||
|
all_regs = (
|
||||||
|
scope.applicable_regulations + scope.excluded_regulations + scope.uncertain_regulations
|
||||||
|
)
|
||||||
|
assert all(r.regulation_id in _KNOWN_REGS for r in all_regs)
|
||||||
|
|
||||||
|
|
||||||
|
# 5. endpoint smoke — both cases.
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def client():
|
||||||
|
from compliance.api.reasoning_routes import router
|
||||||
|
|
||||||
|
app = FastAPI()
|
||||||
|
app.include_router(router)
|
||||||
|
return TestClient(app)
|
||||||
|
|
||||||
|
|
||||||
|
def test_endpoint_needs_facts(client):
|
||||||
|
r = client.post("/reasoning/product-scope", json={"product_profile": {"name": "X"}})
|
||||||
|
assert r.status_code == 200
|
||||||
|
body = r.json()
|
||||||
|
assert body["status"] == "needs_facts"
|
||||||
|
assert body["regulatory_scope"] is None
|
||||||
|
assert body["missing_facts"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_endpoint_resolved(client):
|
||||||
|
r = client.post(
|
||||||
|
"/reasoning/product-scope",
|
||||||
|
json={
|
||||||
|
"product_profile": {
|
||||||
|
"name": "M",
|
||||||
|
"product_type": "machinery",
|
||||||
|
"markets": ["EU"],
|
||||||
|
"economic_operator_role": "manufacturer",
|
||||||
|
"lifecycle_phase": "placing_on_market",
|
||||||
|
"is_machine": True,
|
||||||
|
"is_component": False,
|
||||||
|
"has_software_updates": True,
|
||||||
|
"has_embedded_software": True,
|
||||||
|
"has_remote_access": True,
|
||||||
|
"technologies": ["cloud"],
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert r.status_code == 200
|
||||||
|
body = r.json()
|
||||||
|
assert body["status"] == "resolved"
|
||||||
|
applicable = {x["regulation_id"] for x in body["regulatory_scope"]["applicable_regulations"]}
|
||||||
|
assert "CRA" in applicable and "MaschinenVO" in applicable
|
||||||
Reference in New Issue
Block a user