Merge pull request 'feat(reasoning): product-first regulatory pipeline — Profile → Navigator → Scope → Map → Interpretation' (#1) from feat/regulatory-reasoning-engine into main

This commit is contained in:
pilotadmin
2026-06-26 11:47:18 +02:00
43 changed files with 4445 additions and 0 deletions
@@ -77,6 +77,7 @@ _ROUTER_MODULES = [
"licenses_routes", "licenses_routes",
"template_rule_routes", "template_rule_routes",
"specialist_agent_routes", "specialist_agent_routes",
"reasoning_routes",
] ]
_loaded_count = 0 _loaded_count = 0
@@ -0,0 +1,98 @@
"""HTTP endpoints for the Regulatory Reasoning Engine (spec §7).
Thin handlers — all reasoning lives in `compliance.reasoning.*`. No DB, no RAG;
pure deterministic rule evaluation.
POST /reasoning/scope -> which regulations apply + missing facts
POST /reasoning/obligations -> obligations, overlaps, multi-evidence
POST /reasoning/implementation-reasoning -> claim->obligation mapping (Welt 1, no verdict)
POST /reasoning/interpretation-assessment -> verdict on a customer interpretation
POST /reasoning/product-scope -> gate on facts, else run discover_scope once
POST /reasoning/regulatory-map -> customer-readable read-model over the scope
POST /reasoning/interpretation-in-map -> judge a customer interpretation within the map
"""
from __future__ import annotations
from fastapi import APIRouter
from compliance.interpretation_map import (
InterpretationInMapRequest,
InterpretationInMapResult,
interpret_in_map,
)
from compliance.product_scope import (
ProductScopeRequest,
ProductScopeResponse,
resolve_product_scope,
)
from compliance.regulatory_map import RegulatoryMap, RegulatoryMapRequest, render_regulatory_map
from compliance.reasoning import (
assess_interpretation,
derive_obligations,
discover_scope,
reason_implementation_claim,
)
from compliance.reasoning.schemas import (
ImplementationReasoningRequest,
ImplementationReasoningResponse,
InterpretationRequest,
InterpretationResponse,
ObligationsRequest,
ObligationsResponse,
ScopeRequest,
ScopeResponse,
)
router = APIRouter(prefix="/reasoning", tags=["reasoning"])
@router.post("/scope", response_model=ScopeResponse)
def scope_discovery(req: ScopeRequest) -> ScopeResponse:
scope = discover_scope(req.product_profile)
return ScopeResponse(
regulatory_scope=scope,
missing_facts=scope.missing_facts,
confidence=scope.confidence,
)
@router.post("/obligations", response_model=ObligationsResponse)
def applicable_obligations(req: ObligationsRequest) -> ObligationsResponse:
return derive_obligations(req.product_profile, req.regulatory_scope)
@router.post("/implementation-reasoning", response_model=ImplementationReasoningResponse)
def implementation_reasoning(req: ImplementationReasoningRequest) -> ImplementationReasoningResponse:
return reason_implementation_claim(req.product_profile, req.customer_claim)
@router.post("/product-scope", response_model=ProductScopeResponse)
def product_scope(req: ProductScopeRequest) -> ProductScopeResponse:
return resolve_product_scope(req.product_profile)
@router.post("/regulatory-map", response_model=RegulatoryMap)
def regulatory_map(req: RegulatoryMapRequest) -> RegulatoryMap:
return render_regulatory_map(req.product_profile)
@router.post("/interpretation-in-map", response_model=InterpretationInMapResult)
def interpretation_in_map(req: InterpretationInMapRequest) -> InterpretationInMapResult:
reg_map = render_regulatory_map(req.product_profile)
return interpret_in_map(reg_map, req.customer_interpretation)
@router.post("/interpretation-assessment", response_model=InterpretationResponse)
def interpretation_assessment(req: InterpretationRequest) -> InterpretationResponse:
result = assess_interpretation(req.customer_interpretation, req.product_profile)
return InterpretationResponse(
assessment=result.assessment,
affected_regulations=result.affected_regulations,
affected_obligations=result.affected_obligations,
corrected_interpretation=result.corrected_interpretation,
risks=result.risks,
legal_basis_refs=result.legal_basis_refs,
explanation=result.explanation,
confidence=result.confidence,
)
@@ -0,0 +1,18 @@
"""Interpretation-in-Map — evaluate a customer interpretation within the map.
Thin adapter over the existing `assess_interpretation`: it judges the customer's
reading against the regulations/obligations actually present in the product's
RegulatoryMap, and flags touched unsupported domains as future_corpus_needed
instead of pseudo-evaluating them. No new legal reasoning, no RCI, no UI.
"""
from __future__ import annotations
from .adapter import interpret_in_map
from .schemas import InterpretationInMapRequest, InterpretationInMapResult
__all__ = [
"interpret_in_map",
"InterpretationInMapRequest",
"InterpretationInMapResult",
]
@@ -0,0 +1,90 @@
"""Interpretation-in-Map adapter (step 5).
Evaluates a customer interpretation WITHIN the already-built RegulatoryMap. It
reuses the existing `assess_interpretation` (no new legal engine), restricts the
affected regulations/obligations to those present in the map, and reports any
touched unsupported domain (wastewater/chemicals/...) as future_corpus_needed
rather than pseudo-evaluating it.
"""
from __future__ import annotations
from typing import Dict, List
from compliance.reasoning.enums import InterpretationVerdict
from compliance.reasoning.interpretation_engine import assess_interpretation
from compliance.regulatory_map.schemas import RegulatoryMap
from .schemas import InterpretationInMapResult
_LABEL: Dict[InterpretationVerdict, str] = {
InterpretationVerdict.PLAUSIBLE: "plausibel",
InterpretationVerdict.TOO_NARROW: "zu eng",
InterpretationVerdict.TOO_BROAD: "zu weit",
InterpretationVerdict.PARTIALLY_CORRECT: "teilweise korrekt",
InterpretationVerdict.UNSUPPORTED: "nicht belegt",
InterpretationVerdict.UNCERTAIN: "unsicher",
}
# domain -> keywords that signal the interpretation is ABOUT that (uncovered) domain.
_ENV_KEYWORDS: Dict[str, List[str]] = {
"environment_water": ["abwasser", "wastewater", "gewässer", "gewaesser", "einleitung", "abfluss"],
"chemicals": ["chemikalie", "reach", "clp", "reinigungsmittel", "biozid", "gefahrstoff", "detergenz", "lösemittel", "loesemittel"],
"environment_air": ["luft", "emission", "voc", "immission", "abluft", "verbrennung"],
"waste": ["abfall", "entsorgung", "weee", "recycling"],
"energy_resources": ["energie", "ökodesign", "oekodesign", "verbrauch"],
}
def _touches(text: str, domain: str) -> bool:
low = text.lower()
return any(kw in low for kw in _ENV_KEYWORDS.get(domain, []))
def _explain(label: str, detail: str, affected_regs: List[str], future_domains: List[str], in_scope: bool) -> str:
base = "Ihre Interpretation ist wahrscheinlich %s." % label
if detail:
base += " " + detail
if affected_regs:
base += " Betroffen in Ihrer Map: %s." % ", ".join(affected_regs)
if future_domains:
base += (
" Für %s liegt noch kein Regelkorpus vor — diese Aspekte werden nicht bewertet (future_corpus_needed)."
% ", ".join(future_domains)
)
if not in_scope and not future_domains:
base += " Diese Auslegung betrifft kein Regelwerk Ihrer aktuellen Produkt-Map."
return base
def interpret_in_map(reg_map: RegulatoryMap, interpretation: str) -> InterpretationInMapResult:
a = assess_interpretation(interpretation) # existing engine — no new reasoning
map_reg_ids = (
{v.regulation_id for v in reg_map.applicable_regulations}
| {v.regulation_id for v in reg_map.uncertain_regulations}
| {v.regulation_id for v in reg_map.excluded_regulations}
)
map_ob_ids = {o.obligation_id for v in reg_map.applicable_regulations for o in v.obligations}
uncertain_ids = {v.regulation_id for v in reg_map.uncertain_regulations}
affected_regs = [r for r in a.affected_regulations if r in map_reg_ids]
affected_obs = [o for o in a.affected_obligations if o in map_ob_ids]
related_unc = [r for r in a.affected_regulations if r in uncertain_ids]
future = [d for d in reg_map.unsupported_domains if _touches(interpretation, d.domain)]
in_scope = bool(affected_regs or affected_obs)
return InterpretationInMapResult(
raw_interpretation=interpretation,
assessment=a.assessment,
in_scope_of_map=in_scope,
affected_regulations=affected_regs,
affected_obligations=affected_obs,
related_uncertainties=related_unc,
future_corpus_domains=future,
corrected_interpretation=a.corrected_interpretation,
risks=a.risks,
legal_basis_refs=a.legal_basis_refs,
explanation=_explain(_LABEL[a.assessment], a.explanation, affected_regs, [d.domain for d in future], in_scope),
confidence=a.confidence,
)
@@ -0,0 +1,36 @@
"""Schemas for Interpretation-in-Map (step 5).
A thin adapter that evaluates a customer interpretation WITHIN the already-built
RegulatoryMap — it does not assess abstract legal questions. Application types
only; no compliance-meta-model classes (freeze v1.0 untouched).
"""
from __future__ import annotations
from typing import List
from pydantic import BaseModel, Field
from compliance.product_scope.schemas import UnsupportedDomain
from compliance.profile.canonical import CanonicalProductRegulatoryProfile
from compliance.reasoning.enums import Confidence, InterpretationVerdict
class InterpretationInMapRequest(BaseModel):
product_profile: CanonicalProductRegulatoryProfile
customer_interpretation: str
class InterpretationInMapResult(BaseModel):
raw_interpretation: str
assessment: InterpretationVerdict
in_scope_of_map: bool # True if it touches a regulation/obligation present in the map
affected_regulations: List[str] = Field(default_factory=list) # intersected with the map
affected_obligations: List[str] = Field(default_factory=list) # intersected (registry-linked)
related_uncertainties: List[str] = Field(default_factory=list) # map-uncertain regs it touches
future_corpus_domains: List[UnsupportedDomain] = Field(default_factory=list) # NOT evaluated
corrected_interpretation: str = ""
risks: List[str] = Field(default_factory=list)
legal_basis_refs: List[str] = Field(default_factory=list)
explanation: str = ""
confidence: Confidence = Confidence.MEDIUM
@@ -0,0 +1,29 @@
"""Product Regulatory Navigator — thin missing-facts layer.
Sits above the CanonicalProductRegulatoryProfile (prefilled from company-profile /
ProductWizard) and reports only which facts are still missing + prioritized
questions to collect them. It decides which facts are needed, NOT what regulation
applies — that stays with the Scope Engine (step 3). No regulation logic, no UI,
no Go, no RAG.
"""
from __future__ import annotations
from .engine import CompletenessSummary, NavigatorResult, apply_answers, navigate
from .questions import (
QUESTION_CATALOG,
AnswerType,
NavigatorQuestion,
QuestionPriority,
)
__all__ = [
"navigate",
"apply_answers",
"NavigatorResult",
"CompletenessSummary",
"NavigatorQuestion",
"AnswerType",
"QuestionPriority",
"QUESTION_CATALOG",
]
@@ -0,0 +1,116 @@
"""Product Regulatory Navigator engine — missing-facts only.
`navigate(profile)` reports which canonical fields are still unknown and the
prioritized questions to fill them. `apply_answers(profile, answers)` returns the
updated profile. It NEVER decides what applies — that is the Scope Engine (step 3).
Pure field-presence checking; no scope-engine import, no regulation evaluation.
"""
from __future__ import annotations
from typing import Any, Dict, List, Type
from pydantic import BaseModel, Field
from compliance.profile.canonical import (
CanonicalLifecyclePhase,
CanonicalProductRegulatoryProfile,
EconomicOperatorRole,
ProductComponent,
)
from .questions import QUESTION_CATALOG, NavigatorQuestion, QuestionPriority
_ENUM_FIELDS: Dict[str, Type[Any]] = {
"economic_operator_role": EconomicOperatorRole,
"lifecycle_phase": CanonicalLifecyclePhase,
}
class CompletenessSummary(BaseModel):
total_relevant: int
answered: int
missing: int
missing_by_priority: Dict[str, int] = Field(default_factory=dict)
ready_for_scope: bool # True once no P0 fact is missing
note: str = ""
class NavigatorResult(BaseModel):
missing_facts: List[str] = Field(default_factory=list) # canonical target fields
suggested_questions: List[NavigatorQuestion] = Field(default_factory=list)
completeness_summary: CompletenessSummary
def _value(profile: CanonicalProductRegulatoryProfile, dotted: str) -> Any:
if "." in dotted:
head, tail = dotted.split(".", 1)
return getattr(getattr(profile, head), tail, None)
return getattr(profile, dotted, None)
def _is_unknown(profile: CanonicalProductRegulatoryProfile, q: NavigatorQuestion) -> bool:
value = _value(profile, q.target_field)
if value is None:
return True
if isinstance(value, list) and not value:
return True
return False
def navigate(profile: CanonicalProductRegulatoryProfile) -> NavigatorResult:
missing = [q for q in QUESTION_CATALOG if _is_unknown(profile, q)]
missing.sort(key=lambda q: q.order())
by_priority: Dict[str, int] = {}
for q in missing:
by_priority[q.priority.value] = by_priority.get(q.priority.value, 0) + 1
ready = QuestionPriority.P0.value not in by_priority
total = len(QUESTION_CATALOG)
summary = CompletenessSummary(
total_relevant=total,
answered=total - len(missing),
missing=len(missing),
missing_by_priority=by_priority,
ready_for_scope=ready,
note=(
"%d von %d Fakten vorhanden; %d offen. Scope-Engine startklar: %s."
% (total - len(missing), total, len(missing), "ja" if ready else "nein (P0 fehlt)")
),
)
return NavigatorResult(
missing_facts=[q.target_field for q in missing],
suggested_questions=missing,
completeness_summary=summary,
)
def _coerce(q: NavigatorQuestion, value: Any) -> Any:
if q.target_field in _ENUM_FIELDS:
return _ENUM_FIELDS[q.target_field](value)
if q.target_field == "components":
return [c if isinstance(c, ProductComponent) else ProductComponent(**c) for c in (value or [])]
if q.answer_type.value in {"country_list", "multiselect"}:
return list(value or [])
if q.answer_type.value == "bool":
return bool(value)
return value
def apply_answers(
profile: CanonicalProductRegulatoryProfile, answers: Dict[str, Any]
) -> CanonicalProductRegulatoryProfile:
updated = profile.model_copy(deep=True)
by_id = {q.question_id: q for q in QUESTION_CATALOG}
for question_id, raw in answers.items():
q = by_id.get(question_id)
if q is None or raw is None:
continue
value = _coerce(q, raw)
if "." in q.target_field:
head, tail = q.target_field.split(".", 1)
setattr(getattr(updated, head), tail, value)
else:
setattr(updated, q.target_field, value)
return updated
@@ -0,0 +1,171 @@
"""Product Regulatory Navigator — question catalog.
The Navigator is a THIN missing-facts layer over CanonicalProductRegulatoryProfile.
It does NOT decide what applies — `regulatory_domains_unblocked` is static metadata
(which domains a fact would help the Scope Engine decide later), never an
evaluation. No regulation logic, no UI, no Go, no RAG.
`NavigatorQuestion` is an interaction type, NOT a compliance-meta-model class
(architecture freeze v1.0 untouched).
"""
from __future__ import annotations
from enum import Enum
from typing import List
from pydantic import BaseModel, Field
from compliance.profile.canonical import CanonicalLifecyclePhase, EconomicOperatorRole
class AnswerType(str, Enum):
BOOL = "bool"
ENUM = "enum"
MULTISELECT = "multiselect"
TEXT = "text"
COUNTRY_LIST = "country_list"
COMPONENT_LIST = "component_list"
class QuestionPriority(str, Enum):
P0 = "P0" # blocks scope: EU-vs-not, role, lifecycle, machine/component
P1 = "P1" # unblocks a specific domain: RED, Data Act, environment, security
P2 = "P2" # refinement: structured BOM
_PRIORITY_ORDER = {QuestionPriority.P0: 0, QuestionPriority.P1: 1, QuestionPriority.P2: 2}
class NavigatorQuestion(BaseModel):
question_id: str
target_field: str # dotted path into the canonical profile
label: str
why_needed: str
regulatory_domains_unblocked: List[str] = Field(default_factory=list)
answer_type: AnswerType
options: List[str] = Field(default_factory=list)
priority: QuestionPriority
def order(self) -> int:
return _PRIORITY_ORDER[self.priority]
_ROLE_OPTIONS = [e.value for e in EconomicOperatorRole]
_PHASE_OPTIONS = [e.value for e in CanonicalLifecyclePhase]
QUESTION_CATALOG: List[NavigatorQuestion] = [
# ── P0: block the scope decision itself ───────────────────────────
NavigatorQuestion(
question_id="markets",
target_field="markets",
label="In welche Märkte / Länder liefern Sie das Produkt?",
why_needed="Bestimmt EU- vs. Nicht-EU-Anwendbarkeit und nationale Pflichten.",
regulatory_domains_unblocked=["cyber", "machine_safety", "data", "radio", "emv", "environment"],
answer_type=AnswerType.COUNTRY_LIST,
priority=QuestionPriority.P0,
),
NavigatorQuestion(
question_id="economic_operator_role",
target_field="economic_operator_role",
label="Welche Rolle nehmen Sie ein?",
why_needed="Pflichten hängen von der Rolle ab (Hersteller/Importeur/Händler/Betreiber/Service).",
regulatory_domains_unblocked=["cyber", "machine_safety", "data"],
answer_type=AnswerType.ENUM,
options=_ROLE_OPTIONS,
priority=QuestionPriority.P0,
),
NavigatorQuestion(
question_id="lifecycle_phase",
target_field="lifecycle_phase",
label="In welcher Lebenszyklusphase betrachten Sie das Produkt?",
why_needed="Manche Pflichten greifen nur beim Inverkehrbringen oder in der Wartung.",
regulatory_domains_unblocked=["cyber", "machine_safety"],
answer_type=AnswerType.ENUM,
options=_PHASE_OPTIONS,
priority=QuestionPriority.P0,
),
NavigatorQuestion(
question_id="is_machine",
target_field="is_machine",
label="Ist das Produkt eine (vollständige) Maschine?",
why_needed="Entscheidet die Anwendbarkeit der Maschinenverordnung.",
regulatory_domains_unblocked=["machine_safety"],
answer_type=AnswerType.BOOL,
priority=QuestionPriority.P0,
),
NavigatorQuestion(
question_id="is_component",
target_field="is_component",
label="Ist das Produkt ein Bauteil / eine unvollständige Maschine?",
why_needed="Sicherheitsbauteil vs. vollständige Maschine ändert die Pflichten.",
regulatory_domains_unblocked=["machine_safety"],
answer_type=AnswerType.BOOL,
priority=QuestionPriority.P0,
),
# ── P1: unblock one specific domain ───────────────────────────────
NavigatorQuestion(
question_id="has_radio_module",
target_field="has_radio_module",
label="Enthält das Produkt ein Funkmodul (WLAN/Bluetooth/Mobilfunk)?",
why_needed="Ein Funkmodul löst die Funkanlagen-Richtlinie (RED) aus.",
regulatory_domains_unblocked=["radio"],
answer_type=AnswerType.BOOL,
priority=QuestionPriority.P1,
),
NavigatorQuestion(
question_id="generates_usage_data",
target_field="generates_usage_data",
label="Erzeugt das vernetzte Produkt nutzbare Produkt-/Nutzungsdaten?",
why_needed="Erzeugte Nutzungsdaten entscheiden über Data-Act-Pflichten.",
regulatory_domains_unblocked=["data"],
answer_type=AnswerType.BOOL,
priority=QuestionPriority.P1,
),
NavigatorQuestion(
question_id="has_security_function",
target_field="has_security_function",
label="Hat das Produkt eine dedizierte Security-Funktion (gegen böswillige Akteure)?",
why_needed="Trennt Security- von Safety-Funktion (CRA vs. MaschinenVO).",
regulatory_domains_unblocked=["cyber", "machine_safety"],
answer_type=AnswerType.BOOL,
priority=QuestionPriority.P1,
),
NavigatorQuestion(
question_id="env_wastewater",
target_field="environmental.discharges_to_wastewater",
label="Gibt das Produkt Stoffe an Wasser / Abwasser ab?",
why_needed="Abwassereinleitung löst Abwasser-/Gewässerrecht aus.",
regulatory_domains_unblocked=["environment_water"],
answer_type=AnswerType.BOOL,
priority=QuestionPriority.P1,
),
NavigatorQuestion(
question_id="env_air",
target_field="environmental.emits_to_air",
label="Entstehen Luftemissionen (VOC, Staub, Verbrennung, Aerosole)?",
why_needed="Luftemissionen lösen Immissionsschutzrecht aus.",
regulatory_domains_unblocked=["environment_air"],
answer_type=AnswerType.BOOL,
priority=QuestionPriority.P1,
),
NavigatorQuestion(
question_id="env_chemicals",
target_field="environmental.uses_cleaning_chemicals",
label="Werden Reinigungs-, Desinfektions- oder Biozidmittel verwendet/mitgeliefert?",
why_needed="Chemikalien lösen REACH/CLP/Detergenzien-/Biozidrecht aus.",
regulatory_domains_unblocked=["chemicals"],
answer_type=AnswerType.BOOL,
priority=QuestionPriority.P1,
),
# ── P2: refinement ────────────────────────────────────────────────
NavigatorQuestion(
question_id="components",
target_field="components",
label="Aus welchen wesentlichen Komponenten besteht das Produkt?",
why_needed="Eine strukturierte Stückliste verfeinert komponenten-abgeleitete Pflichten.",
regulatory_domains_unblocked=["radio", "emv", "environment_water", "chemicals"],
answer_type=AnswerType.COMPONENT_LIST,
priority=QuestionPriority.P2,
),
]
@@ -0,0 +1,26 @@
"""Product-scope orchestration (step 3).
Connects the Navigator's fact-gate to the existing reasoning `discover_scope`:
decide regulatory scope only once the minimum (P0) facts are present, otherwise
return the missing facts. Reuses discover_scope unchanged — no new scope logic.
"""
from __future__ import annotations
from .orchestrator import resolve_product_scope
from .schemas import (
ProductScopeRequest,
ProductScopeResponse,
RegulatoryScopeResult,
ScopeStatus,
UnsupportedDomain,
)
__all__ = [
"resolve_product_scope",
"ProductScopeRequest",
"ProductScopeResponse",
"RegulatoryScopeResult",
"UnsupportedDomain",
"ScopeStatus",
]
@@ -0,0 +1,77 @@
"""Product-scope orchestrator (step 3) — gate, then reuse discover_scope.
THE rule: the Scope Engine decides only once the Navigator has released the
minimum facts. If P0 facts are missing, return the missing facts/questions and do
NOT run discover_scope. Otherwise project the canonical into the reasoning profile
and run the EXISTING `discover_scope` exactly once.
No new scope rules, no new regulations, no environmental-law evaluation (those
domains are surfaced only as unsupported_domains / future_corpus_needed).
"""
from __future__ import annotations
from typing import List, Tuple
from compliance.navigator.engine import navigate
from compliance.profile.canonical import CanonicalProductRegulatoryProfile
from compliance.profile.to_reasoning import to_reasoning_profile
from compliance.reasoning.scope_engine import discover_scope
from .schemas import (
ProductScopeResponse,
RegulatoryScopeResult,
ScopeStatus,
UnsupportedDomain,
)
# environmental trigger field -> (domain, note). Transparency only — not a verdict.
_ENV_DOMAINS: List[Tuple[str, str, str]] = [
("discharges_to_wastewater", "environment_water", "Abwasser-/Gewässerrecht (z. B. AbwV, WRRL) — noch nicht im Korpus."),
("has_cooling_or_spraying_water", "environment_water", "Wasserbezogene Anforderungen — noch nicht im Korpus."),
("emits_to_air", "environment_air", "Immissionsschutz-/Luftreinhalterecht (z. B. BImSchG, IED) — noch nicht im Korpus."),
("uses_solvents", "environment_air", "Lösemittel-/VOC-Recht (z. B. 31. BImSchV) — noch nicht im Korpus."),
("uses_cleaning_chemicals", "chemicals", "Chemikalienrecht (REACH/CLP/Detergenzien/Biozide) — noch nicht im Korpus."),
("supplies_chemicals", "chemicals", "Chemikalienrecht (REACH/CLP) — noch nicht im Korpus."),
("contains_restricted_substances", "chemicals", "Stoffbeschränkungen (REACH/RoHS) — noch nicht im Korpus."),
("creates_waste", "waste", "Abfall-/Entsorgungsrecht (u. a. WEEE) — noch nicht im Korpus."),
("consumes_energy_or_water", "energy_resources", "Energie-/Ökodesign-Recht — noch nicht im Korpus."),
]
def _unsupported_domains(profile: CanonicalProductRegulatoryProfile) -> List[UnsupportedDomain]:
env = profile.environmental
seen = set()
out: List[UnsupportedDomain] = []
for field, domain, note in _ENV_DOMAINS:
if getattr(env, field) is True and domain not in seen:
seen.add(domain)
out.append(UnsupportedDomain(domain=domain, trigger=field, note=note))
return out
def resolve_product_scope(profile: CanonicalProductRegulatoryProfile) -> ProductScopeResponse:
nav = navigate(profile)
if not nav.completeness_summary.ready_for_scope:
return ProductScopeResponse(
status=ScopeStatus.NEEDS_FACTS,
completeness_summary=nav.completeness_summary,
missing_facts=nav.missing_facts,
suggested_questions=nav.suggested_questions,
)
scope = discover_scope(to_reasoning_profile(profile)) # exactly once
result = RegulatoryScopeResult(
applicable_regulations=scope.applicable_regulations,
excluded_regulations=scope.excluded_regulations,
uncertain_regulations=scope.uncertain_regulations,
unsupported_domains=_unsupported_domains(profile),
reasoning_summary=scope.reasoning_summary,
confidence=scope.confidence,
)
return ProductScopeResponse(
status=ScopeStatus.RESOLVED,
completeness_summary=nav.completeness_summary,
regulatory_scope=result,
)
@@ -0,0 +1,63 @@
"""Response schemas for the product-scope orchestrator (step 3).
These are application/API types — NOT compliance-meta-model classes (architecture
freeze v1.0 untouched). The scope verdict itself is produced by the existing
`discover_scope`; nothing here adds scope rules.
"""
from __future__ import annotations
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel, Field
from compliance.navigator.engine import CompletenessSummary
from compliance.navigator.questions import NavigatorQuestion
from compliance.profile.canonical import CanonicalProductRegulatoryProfile
from compliance.reasoning.enums import Confidence
from compliance.reasoning.schemas import (
ApplicableRegulation,
ExcludedRegulation,
UncertainRegulation,
)
class ScopeStatus(str, Enum):
NEEDS_FACTS = "needs_facts" # P0 facts missing -> ask, do not decide
RESOLVED = "resolved" # minimum facts present -> scope decided
class UnsupportedDomain(BaseModel):
"""A domain the product triggers but the corpus does not yet cover.
Surfaced for transparency (no false completeness) — NEVER a legal evaluation.
"""
domain: str
trigger: str
status: str = "future_corpus_needed"
note: str = ""
class RegulatoryScopeResult(BaseModel):
applicable_regulations: List[ApplicableRegulation] = Field(default_factory=list)
excluded_regulations: List[ExcludedRegulation] = Field(default_factory=list)
uncertain_regulations: List[UncertainRegulation] = Field(default_factory=list)
unsupported_domains: List[UnsupportedDomain] = Field(default_factory=list)
reasoning_summary: str = ""
confidence: Confidence = Confidence.MEDIUM
class ProductScopeRequest(BaseModel):
product_profile: CanonicalProductRegulatoryProfile
class ProductScopeResponse(BaseModel):
status: ScopeStatus
completeness_summary: CompletenessSummary
# case NEEDS_FACTS
missing_facts: List[str] = Field(default_factory=list)
suggested_questions: List[NavigatorQuestion] = Field(default_factory=list)
# case RESOLVED
regulatory_scope: Optional[RegulatoryScopeResult] = None
@@ -0,0 +1,38 @@
"""Product profile convergence layer.
ONE canonical product profile (`CanonicalProductRegulatoryProfile`) that the Go
gap engine and the Python reasoning engine both project from — so "SPS mit
Remote Access" means the same thing everywhere. gap.ProductProfile leads; the
reasoning ProductProfile is an adapter/DTO. Types + mappers only — no regulation
logic, no UI, no new questions.
"""
from __future__ import annotations
from .canonical import (
CanonicalLifecyclePhase,
CanonicalProductRegulatoryProfile,
CanonicalProductType,
ComponentKind,
EconomicOperatorRole,
EnvironmentalImpact,
ProductComponent,
)
from .from_company_profile import from_company_profile
from .from_product_wizard import from_product_wizard
from .to_gap import to_gap_profile
from .to_reasoning import to_reasoning_profile
__all__ = [
"CanonicalProductRegulatoryProfile",
"CanonicalProductType",
"EconomicOperatorRole",
"CanonicalLifecyclePhase",
"ComponentKind",
"ProductComponent",
"EnvironmentalImpact",
"from_product_wizard",
"from_company_profile",
"to_gap_profile",
"to_reasoning_profile",
]
@@ -0,0 +1,158 @@
"""CanonicalProductRegulatoryProfile — the single semantic product profile.
Convergence layer (spec 2026-06-26): instead of letting the Go `gap.ProductProfile`
and the Python reasoning `ProductProfile` drift, ONE canonical type is the source
of truth. The Go gap engine LEADS (it carries real engine logic), so the canonical
mirrors gap's field names and adds the Navigator gaps the audit found missing
(economic-operator role, radio module, generates_usage_data, lifecycle phase,
structured BOM, safety-vs-security split, machine-vs-component) plus a
forward-looking Environmental-Impact domain.
No regulation logic lives here — types only. Mappers live in sibling modules.
Python 3.9 compatible (no `|` unions).
"""
from __future__ import annotations
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel, Field
class CanonicalProductType(str, Enum): # mirrors gap.ProductType
SOFTWARE = "software"
HARDWARE = "hardware"
IOT = "iot"
SAAS = "saas"
EXCHANGE = "exchange"
MEDICAL_DEVICE = "medical_device"
MACHINERY = "machinery"
OTHER = "other"
class EconomicOperatorRole(str, Enum): # CE/CRA role — gap.ProductProfile has none
MANUFACTURER = "manufacturer"
IMPORTER = "importer"
DISTRIBUTOR = "distributor"
INTEGRATOR = "integrator"
OPERATOR = "operator"
SERVICE_PROVIDER = "service_provider"
class CanonicalLifecyclePhase(str, Enum):
DEVELOPMENT = "development"
PLACING_ON_MARKET = "placing_on_market"
OPERATION = "operation"
MAINTENANCE = "maintenance"
UPDATE = "update"
END_OF_LIFE = "end_of_life"
class ComponentKind(str, Enum):
MOTOR = "motor"
PUMP = "pump"
HEATING = "heating"
COOLING = "cooling"
CONTROLLER = "controller"
PLC = "plc"
HMI = "hmi"
SENSOR = "sensor"
ACTUATOR = "actuator"
CAMERA = "camera"
NETWORK_INTERFACE = "network_interface"
RADIO_MODULE = "radio_module"
CHEMICAL_DOSING = "chemical_dosing"
WATER_INLET = "water_inlet"
WASTEWATER_OUTLET = "wastewater_outlet"
BATTERY = "battery"
OTHER = "other"
class ProductComponent(BaseModel):
"""One structured BOM node — these nodes are what later trigger domains."""
name: str
kind: ComponentKind = ComponentKind.OTHER
notes: Optional[str] = None
class EnvironmentalImpact(BaseModel):
"""Forward-looking Umweltmedien-Trigger (own Navigator domain).
No regulation logic consumes these yet — profile fields only, so the model
is not blind to wastewater/air/chemicals/waste questions when that domain
is wired later (AbwV/WRRL/REACH/CLP/IED/BImSchG ...).
"""
discharges_to_wastewater: Optional[bool] = None
uses_cleaning_chemicals: Optional[bool] = None
supplies_chemicals: Optional[bool] = None
emits_to_air: Optional[bool] = None
uses_solvents: Optional[bool] = None
creates_waste: Optional[bool] = None
contains_restricted_substances: Optional[bool] = None
consumes_energy_or_water: Optional[bool] = None
has_cooling_or_spraying_water: Optional[bool] = None
class CanonicalProductRegulatoryProfile(BaseModel):
# --- identity ---
name: str = ""
description: str = ""
product_type: Optional[CanonicalProductType] = None
product_profile_id: Optional[str] = None
tenant_id: Optional[str] = None
iace_project_id: Optional[str] = None
# --- gap-native lists ---
technologies: List[str] = Field(default_factory=list)
data_processing: List[str] = Field(default_factory=list)
markets: List[str] = Field(default_factory=list) # real list — never hardcoded ['EU']
existing_certifications: List[str] = Field(default_factory=list)
applied_norms: List[str] = Field(default_factory=list)
# --- gap-native product / IST-state booleans (tri-state: None = unknown) ---
connected_to_internet: Optional[bool] = None
has_software_updates: Optional[bool] = None
uses_ai: Optional[bool] = None
processes_personal_data: Optional[bool] = None
is_critical_infra_supplier: Optional[bool] = None
has_risk_assessment: Optional[bool] = None
has_technical_file: Optional[bool] = None
has_operating_manual: Optional[bool] = None
has_sbom: Optional[bool] = None
has_vuln_management: Optional[bool] = None
has_update_mechanism: Optional[bool] = None
has_incident_response: Optional[bool] = None
has_supply_chain_mgmt: Optional[bool] = None
ce_marking_since: Optional[str] = None
product_age: Optional[str] = None
# --- NEW Navigator-gap fields (audit 2026-06-26) ---
economic_operator_role: Optional[EconomicOperatorRole] = None
has_radio_module: Optional[bool] = None
generates_usage_data: Optional[bool] = None
lifecycle_phase: Optional[CanonicalLifecyclePhase] = None
components: List[ProductComponent] = Field(default_factory=list)
has_safety_function: Optional[bool] = None
safety_function_description: Optional[str] = None
has_security_function: Optional[bool] = None # safety vs security split
has_remote_access: Optional[bool] = None
has_embedded_software: Optional[bool] = None
is_machine: Optional[bool] = None
is_component: Optional[bool] = None
is_spare_part: Optional[bool] = None
# --- company / market context (NIS2 + scope; from company-profile) ---
b2b_or_b2c: Optional[str] = None
sector_industry: Optional[str] = None
company_size: Optional[str] = None
primary_jurisdiction: Optional[str] = None
# --- AI context (classification stays delegated to ai-act/ucca) ---
ai_integration_type: List[str] = Field(default_factory=list)
human_oversight_level: Optional[str] = None
# --- forward-looking environmental domain ---
environmental: EnvironmentalImpact = Field(default_factory=EnvironmentalImpact)
@@ -0,0 +1,59 @@
"""company-profile -> CanonicalProductRegulatoryProfile (prefill, acceptance #2).
Pulls master data (industry, business model, size, markets) and the conditional
`machine_builder` block (camelCase JSONB keys, defined frontend-side) so the user
re-answers nothing. The machineBuilder block is the richest product/safety/
connectivity source — note it is industry-gated in the UI, so a prefill may find
it empty; that is fine (fields stay None = unknown).
"""
from __future__ import annotations
from typing import Any, Dict, List
from .canonical import CanonicalProductRegulatoryProfile
_EU_MEMBER_HINTS = {"DE", "AT", "FR", "IT", "NL", "LU", "LI", "EU", "EWR", "EEA", "DACH"}
def _markets(p: Dict[str, Any], mb: Dict[str, Any]) -> List[str]:
out: List[str] = []
for source in (p.get("target_markets"), mb.get("exportMarkets"), [p.get("primary_jurisdiction")], [p.get("headquarters_country")]):
for m in source or []:
if m and m not in out:
out.append(m)
return out
def _is_machine(mb: Dict[str, Any]) -> Any:
types = mb.get("productTypes")
if types:
return True
return None
def from_company_profile(profile: Dict[str, Any]) -> CanonicalProductRegulatoryProfile:
p = profile
mb = p.get("machine_builder") or {}
contains_ai = mb.get("containsAI")
uses_ai = contains_ai if contains_ai is not None else p.get("uses_ai")
return CanonicalProductRegulatoryProfile(
description=mb.get("productDescription") or "",
sector_industry=p.get("industry") or None,
b2b_or_b2c=p.get("business_model") or None,
company_size=p.get("company_size") or None,
primary_jurisdiction=p.get("primary_jurisdiction") or None,
markets=_markets(p, mb),
uses_ai=uses_ai,
ai_integration_type=list(mb.get("aiIntegrationType") or []),
human_oversight_level=mb.get("humanOversightLevel") or None,
has_embedded_software=mb.get("containsFirmware"),
has_safety_function=mb.get("hasSafetyFunction"),
safety_function_description=mb.get("safetyFunctionDescription") or None,
has_remote_access=mb.get("hasRemoteAccess"),
connected_to_internet=mb.get("isNetworked"),
has_software_updates=mb.get("hasOTAUpdates"),
has_risk_assessment=mb.get("hasRiskAssessment"),
is_machine=_is_machine(mb),
is_critical_infra_supplier=mb.get("criticalSectorClients"),
)
@@ -0,0 +1,50 @@
"""ProductWizard payload -> CanonicalProductRegulatoryProfile (lossless).
The gap-analysis ProductWizard POSTs exactly the gap.ProductProfile JSON shape
(see admin-compliance/.../ProductWizard.tsx handleSubmit). This mapper copies
every gap field verbatim so that `to_gap_profile(from_product_wizard(p))`
reproduces the gap subset of `p` byte-for-byte (acceptance #1). New Navigator
fields the wizard does not ask stay None.
"""
from __future__ import annotations
from typing import Any, Dict, Optional
from .canonical import CanonicalProductRegulatoryProfile, CanonicalProductType
def _as_product_type(value: Any) -> Optional[CanonicalProductType]:
try:
return CanonicalProductType(value)
except ValueError:
return None
def from_product_wizard(payload: Dict[str, Any]) -> CanonicalProductRegulatoryProfile:
g = payload.get
return CanonicalProductRegulatoryProfile(
name=g("name", ""),
description=g("description", ""),
product_type=_as_product_type(g("product_type")),
technologies=list(g("technologies") or []),
data_processing=list(g("data_processing") or []),
markets=list(g("markets") or []),
existing_certifications=list(g("existing_certifications") or []),
applied_norms=list(g("applied_norms") or []),
connected_to_internet=g("connected_to_internet"),
has_software_updates=g("has_software_updates"),
uses_ai=g("uses_ai"),
processes_personal_data=g("processes_personal_data"),
is_critical_infra_supplier=g("is_critical_infra_supplier"),
has_risk_assessment=g("has_risk_assessment"),
has_technical_file=g("has_technical_file"),
has_operating_manual=g("has_operating_manual"),
has_sbom=g("has_sbom"),
has_vuln_management=g("has_vuln_management"),
has_update_mechanism=g("has_update_mechanism"),
has_incident_response=g("has_incident_response"),
has_supply_chain_mgmt=g("has_supply_chain_mgmt"),
ce_marking_since=g("ce_marking_since"),
product_age=g("product_age"),
)
@@ -0,0 +1,41 @@
"""CanonicalProductRegulatoryProfile -> gap.ProductProfile JSON shape.
Emits exactly the keys the Go gap engine already consumes (gap/models.go json
tags), so the gap engine runs UNCHANGED — the canonical is a superset and gap is
its lossless projection. Canonical-only fields (role/radio/components/...) are
intentionally not emitted here; they reach the reasoning side via to_reasoning.
"""
from __future__ import annotations
from typing import Any, Dict
from .canonical import CanonicalProductRegulatoryProfile
def to_gap_profile(c: CanonicalProductRegulatoryProfile) -> Dict[str, Any]:
return {
"name": c.name,
"description": c.description,
"product_type": c.product_type.value if c.product_type else "",
"technologies": list(c.technologies),
"data_processing": list(c.data_processing),
"markets": list(c.markets),
"existing_certifications": list(c.existing_certifications),
"applied_norms": list(c.applied_norms),
"connected_to_internet": bool(c.connected_to_internet),
"has_software_updates": bool(c.has_software_updates),
"uses_ai": bool(c.uses_ai),
"processes_personal_data": bool(c.processes_personal_data),
"is_critical_infra_supplier": bool(c.is_critical_infra_supplier),
"has_risk_assessment": bool(c.has_risk_assessment),
"has_technical_file": bool(c.has_technical_file),
"has_operating_manual": bool(c.has_operating_manual),
"has_sbom": bool(c.has_sbom),
"has_vuln_management": bool(c.has_vuln_management),
"has_update_mechanism": bool(c.has_update_mechanism),
"has_incident_response": bool(c.has_incident_response),
"has_supply_chain_mgmt": bool(c.has_supply_chain_mgmt),
"ce_marking_since": c.ce_marking_since if c.ce_marking_since is not None else "",
"product_age": c.product_age if c.product_age is not None else "",
}
@@ -0,0 +1,88 @@
"""CanonicalProductRegulatoryProfile -> reasoning ProductProfile (adapter/DTO).
The reasoning engine stays the consumer, never the source of truth (spec): the
canonical leads, this projects it into the Python reasoning ProductProfile so the
Reasoning engine and the Go gap engine run off ONE semantic profile (acceptance
#10). AI classification is NOT done here — only `uses_ai` is forwarded; risk
classification stays delegated to ai-act/ucca (acceptance #3).
This is the ONLY one-way coupling profile -> reasoning; reasoning never imports
profile, so the reasoning layer stays hermetic.
"""
from __future__ import annotations
from typing import List, Optional
from compliance.reasoning.enums import ManufacturerRole, MarketModel, ProductLifecyclePhase
from compliance.reasoning.schemas import ProductProfile
from .canonical import CanonicalProductRegulatoryProfile, CanonicalProductType
_SOFTWARE_TYPES = {CanonicalProductType.SOFTWARE, CanonicalProductType.SAAS, CanonicalProductType.IOT}
_SOFTWARE_TECH = {"ai", "api", "database", "encryption", "ota_updates", "cloud", "blockchain"}
_EU_HINTS = {"DE", "AT", "FR", "IT", "NL", "LU", "LI", "EU", "EWR", "EEA", "DACH"}
_B2X = {"B2B": MarketModel.B2B, "B2C": MarketModel.B2C, "B2B_B2C": MarketModel.BOTH, "B2B2C": MarketModel.BOTH}
def _or_none(*values: Optional[bool]) -> Optional[bool]:
"""True if any value is truthy; None if all are None/absent; else False."""
if any(v is True for v in values):
return True
if all(v is None for v in values):
return None
return False
def _has_software(c: CanonicalProductRegulatoryProfile) -> Optional[bool]:
type_sig = True if c.product_type in _SOFTWARE_TYPES else None
tech_sig = True if (set(c.technologies) & _SOFTWARE_TECH) else None
return _or_none(c.has_embedded_software, c.has_software_updates, c.uses_ai, type_sig, tech_sig)
def _eu_market(markets: List[str]) -> Optional[bool]:
if not markets:
return None
return True if (set(markets) & _EU_HINTS) else False
def _has_radio(c: CanonicalProductRegulatoryProfile) -> Optional[bool]:
if c.has_radio_module is not None:
return c.has_radio_module
if any(comp.kind.value == "radio_module" for comp in c.components):
return True
return None
def to_reasoning_profile(c: CanonicalProductRegulatoryProfile) -> ProductProfile:
role = ManufacturerRole(c.economic_operator_role.value) if c.economic_operator_role else None
phase = ProductLifecyclePhase(c.lifecycle_phase.value) if c.lifecycle_phase else None
b2x = _B2X.get(c.b2b_or_b2c) if c.b2b_or_b2c else None
is_machine = c.is_machine if c.is_machine is not None else (
True if c.product_type == CanonicalProductType.MACHINERY else None
)
generates_data = c.generates_usage_data if c.generates_usage_data is not None else (
True if "telemetry" in c.data_processing else None
)
return ProductProfile(
product_name=c.name or "Produkt",
product_profile_id=c.product_profile_id,
manufacturer_role=role,
product_type=[c.product_type.value] if c.product_type else [],
has_software=_has_software(c),
has_embedded_software=c.has_embedded_software,
has_remote_access=c.has_remote_access,
has_cloud_connection=True if "cloud" in c.technologies else None,
has_ai_functionality=c.uses_ai,
has_radio_module=_has_radio(c),
has_safety_function=c.has_safety_function,
generates_usage_data=generates_data,
is_machine=is_machine,
is_component=c.is_component,
is_spare_part=c.is_spare_part,
eu_market=_eu_market(c.markets),
b2b_or_b2c=b2x,
lifecycle_phase=phase,
company_size=c.company_size,
sector=c.sector_industry,
)
@@ -0,0 +1,27 @@
"""Regulatory Reasoning Engine.
A deterministic reasoning layer ON TOP of the Legal Knowledge Graph (obligation
registry) and the Compliance Execution Graph (control mapping / evidence). It
answers, for a concrete product: which regulations apply, which obligations
follow, whether the customer's implementation covers them, and whether a
customer interpretation is legally sound.
No new RAG, no new controls, no DB schema changes — scope & reasoning metamodel
only (spec §14).
"""
from __future__ import annotations
from .claim_normalizer import normalize_claim
from .implementation_engine import reason_implementation_claim
from .interpretation_engine import assess_interpretation
from .obligation_engine import derive_obligations
from .scope_engine import discover_scope
__all__ = [
"discover_scope",
"derive_obligations",
"normalize_claim",
"reason_implementation_claim",
"assess_interpretation",
]
@@ -0,0 +1,45 @@
"""Customer implementation claim normaliser (spec §4.6).
Turns a free-text statement ("Wir haben einen Update-Prozess.") into structured
capabilities + related topics + weakness qualifiers. Deterministic substring
matching — the claim_id is a stable hash so the same statement always maps to
the same id (no randomness, replay-safe).
"""
from __future__ import annotations
import hashlib
from typing import List, Optional
from .schemas import CustomerImplementationClaim
from .taxonomy_claims import match_capabilities, match_qualifiers, topics_for
def _claim_id(raw_statement: str) -> str:
digest = hashlib.sha1(raw_statement.strip().lower().encode("utf-8")).hexdigest()
return "claim_%s" % digest[:10]
def _normalized(capabilities: List[str], qualifiers: List[str]) -> str:
if not capabilities:
return "Keine bekannte Compliance-Fähigkeit aus der Aussage ableitbar."
text = "Fähigkeiten: " + ", ".join(capabilities)
if qualifiers:
text += " | Einschränkungen: " + ", ".join(qualifiers)
return text
def normalize_claim(
raw_statement: str, claim_id: Optional[str] = None, evidence_refs: Optional[List[str]] = None
) -> CustomerImplementationClaim:
capabilities = match_capabilities(raw_statement)
qualifiers = match_qualifiers(raw_statement)
return CustomerImplementationClaim(
claim_id=claim_id or _claim_id(raw_statement),
raw_statement=raw_statement,
normalized_claim=_normalized(capabilities, qualifiers),
claimed_capability=capabilities,
related_topics=topics_for(capabilities),
qualifiers=qualifiers,
evidence_refs=evidence_refs or [],
)
@@ -0,0 +1,92 @@
"""Enumerations for the Regulatory Reasoning Engine.
Kept dependency-free and Python 3.9 compatible (str-Enums, no `|` unions).
The reasoning layer sits ON TOP of the Legal Knowledge Graph (obligation
registry) and the Compliance Execution Graph (control mapping / evidence).
See memory `project_compliance_graph.md` for the cross-session contract.
"""
from __future__ import annotations
from enum import Enum
class ManufacturerRole(str, Enum):
MANUFACTURER = "manufacturer"
IMPORTER = "importer"
DISTRIBUTOR = "distributor"
INTEGRATOR = "integrator"
OPERATOR = "operator"
SERVICE_PROVIDER = "service_provider"
class ProductLifecyclePhase(str, Enum):
DEVELOPMENT = "development"
PLACING_ON_MARKET = "placing_on_market"
OPERATION = "operation"
MAINTENANCE = "maintenance"
UPDATE = "update"
END_OF_LIFE = "end_of_life"
class MarketModel(str, Enum):
B2B = "b2b"
B2C = "b2c"
BOTH = "both"
class ApplicabilityStatus(str, Enum):
APPLICABLE = "applicable"
PARTIALLY_APPLICABLE = "partially_applicable"
UNCERTAIN = "uncertain"
NOT_APPLICABLE = "not_applicable"
class Confidence(str, Enum):
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class AuthorityLevel(str, Enum):
"""How binding a statement is — answers MUST visibly separate these."""
LEGAL_TEXT = "legal_text"
RECITAL = "recital"
GUIDANCE = "guidance"
HARMONIZED_STANDARD = "harmonized_standard"
TECHNICAL_STANDARD = "technical_standard"
BEST_PRACTICE = "best_practice"
INTERNAL_INTERPRETATION = "internal_interpretation"
class OverlapType(str, Enum):
IDENTICAL = "identical"
SIMILAR = "similar"
COMPLEMENTARY = "complementary"
CONFLICTING = "conflicting"
DIFFERENT_SCOPE = "different_scope"
class ClaimCoverage(str, Enum):
"""How a customer's *claim* relates to an obligation — Welt 1 (reasoning).
This is NOT a conformity verdict. It judges only the customer's statement,
never whether the obligation is actually met. The real compliance verdict
(erfüllt/offen/unklar from verified evidence) is `ComplianceStatus`, owned by
the Compliance Execution Graph — the two must never be conflated.
"""
POTENTIALLY_ADDRESSES = "potentially_addresses"
PARTIALLY_ADDRESSES = "partially_addresses"
DOES_NOT_ADDRESS = "does_not_address"
INSUFFICIENT_INFORMATION = "insufficient_information"
class InterpretationVerdict(str, Enum):
PLAUSIBLE = "plausible"
TOO_NARROW = "too_narrow"
TOO_BROAD = "too_broad"
PARTIALLY_CORRECT = "partially_correct"
UNSUPPORTED = "unsupported"
UNCERTAIN = "uncertain"
@@ -0,0 +1,158 @@
"""Implementation reasoning (spec Modus 3) — Welt 1 only.
Maps a free-text claim ("Wir haben SBOMs und machen Updates, wenn Kunden Fehler
melden.") onto the product's applicable obligations and reports, per obligation,
whether the *claim* potentially/partially/does-not address it — plus the
evidence that WOULD be needed to prove real implementation.
This is NOT a conformity verdict. It judges the customer's statement, never
whether the obligation is met. The real verdict (ComplianceStatus: erfüllt/
offen/unklar from verified evidence) lives in the Compliance Execution Graph.
The four reasoning layers: claim -> interpretation (capabilities/topics on the
claim) -> potential obligation coverage (`claim_coverage`) -> evidence required.
"""
from __future__ import annotations
from typing import Dict, List
from .claim_normalizer import normalize_claim
from .enums import ClaimCoverage, Confidence
from .obligation_engine import derive_obligations
from .schemas import (
ClaimObligationMapping,
CustomerImplementationClaim,
ImplementationReasoningResponse,
ProductProfile,
)
from .taxonomy_claims import topics_for
DISCLAIMER = (
"Diese Auswertung interpretiert ausschließlich die Kundenaussage (ClaimCoverage, Welt 1). "
"Sie ist KEINE Konformitätsaussage — der tatsächliche Compliance-Status (ComplianceStatus, "
"Welt 2) ergibt sich erst aus geprüften Nachweisen im Compliance Execution Graph."
)
# Typical sub-elements a capability still misses when only partially claimed.
STANDARD_GAPS: Dict[str, List[str]] = {
"software_bill_of_materials": [
"Vulnerability-Monitoring der Komponenten",
"Bewertung betroffener Komponenten",
"Lieferantenprozess",
],
"secure_updates": [
"aktive Schwachstellenüberwachung",
"Patch-Bewertung",
"Fristen und Verantwortlichkeiten",
"Nachweis der Updatefähigkeit",
],
"vulnerability_management": [
"definierter Vulnerability-Handling-Prozess",
"Priorisierung und Fristen",
],
"authentication": ["MFA für privilegierte Zugänge", "keine Standard-Zugangsdaten"],
"security_logging": ["Schutz der Logs vor Manipulation", "Monitoring/Alerting"],
"software_integrity": ["Signierung der Updates", "Verifikation der Update-Signatur"],
"secure_by_default": ["Härtung der Auslieferungskonfiguration", "Minimierung der Angriffsfläche"],
"secure_communication": ["verschlüsselte Übertragung", "Integritätsschutz der Verbindung"],
"risk_assessment": ["dokumentierte Risikobewertung", "Aufnahme in die technische Doku"],
"technical_documentation": ["vollständige technische Unterlagen", "Aktualisierung über den Lebenszyklus"],
}
def _missing_for(capabilities: List[str]) -> List[str]:
out: List[str] = []
for cap in capabilities:
for gap in STANDARD_GAPS.get(cap, []):
if gap not in out:
out.append(gap)
return out
def _coverage(required: List[str], claimed: List[str], qualifiers: List[str]) -> ClaimCoverage:
if not required:
return ClaimCoverage.INSUFFICIENT_INFORMATION
req, have = set(required), set(claimed)
hit = req & have
if not hit:
return ClaimCoverage.DOES_NOT_ADDRESS
if "absent" in qualifiers or "planned" in qualifiers:
return ClaimCoverage.DOES_NOT_ADDRESS
if "reactive" in qualifiers and hit & {"secure_updates", "vulnerability_management"}:
return ClaimCoverage.PARTIALLY_ADDRESSES
if req <= have:
return ClaimCoverage.POTENTIALLY_ADDRESSES
return ClaimCoverage.PARTIALLY_ADDRESSES
def reason_implementation_claim(
profile: ProductProfile, customer_claim: str
) -> ImplementationReasoningResponse:
claim = normalize_claim(customer_claim)
obligations = derive_obligations(profile).applicable_obligations
claimed = claim.claimed_capability
claim_topics = set(claim.related_topics) | set(claimed)
mappings: List[ClaimObligationMapping] = []
missing_evidence: List[str] = []
for ob in obligations:
from .rules_obligations import obligation_rule
rule = obligation_rule(ob.obligation_id)
required_caps = rule.required_capabilities if rule else []
ob_topics = set(topics_for(required_caps)) | set(required_caps)
directly_claimed = bool(set(required_caps) & set(claimed))
related = bool(ob_topics & claim_topics)
if not directly_claimed and not related:
continue # unrelated to the claim -> don't reason about it
coverage = _coverage(required_caps, claimed, claim.qualifiers)
missing = [] if coverage == ClaimCoverage.POTENTIALLY_ADDRESSES else _missing_for(required_caps)
if coverage != ClaimCoverage.POTENTIALLY_ADDRESSES:
for ev in ob.required_evidence:
if ev not in missing_evidence:
missing_evidence.append(ev)
mappings.append(
ClaimObligationMapping(
claim_id=claim.claim_id,
obligation_id=ob.obligation_id,
claim_coverage=coverage,
missing_elements=missing,
required_evidence=ob.required_evidence,
explanation=_explain(coverage, ob.title, claim.qualifiers),
confidence=Confidence.MEDIUM,
)
)
return ImplementationReasoningResponse(
claim=claim,
mappings=mappings,
missing_evidence=missing_evidence,
summary=_summary(claim, mappings),
disclaimer=DISCLAIMER,
)
def _explain(coverage: ClaimCoverage, title: str, qualifiers: List[str]) -> str:
if coverage == ClaimCoverage.POTENTIALLY_ADDRESSES:
return "Die Aussage adressiert die Pflicht '%s' direkt — Nachweise erforderlich für eine Bewertung der Umsetzung." % title
if coverage == ClaimCoverage.PARTIALLY_ADDRESSES:
extra = " Der beschriebene Prozess wirkt reaktiv." if "reactive" in qualifiers else ""
return "Die Aussage adressiert die Pflicht '%s' nur teilweise.%s" % (title, extra)
if coverage == ClaimCoverage.DOES_NOT_ADDRESS:
return "Die Aussage adressiert die Pflicht '%s' nicht." % title
return "Zur Pflicht '%s' liegen zu wenige Angaben für eine Einordnung vor." % title
def _summary(claim: CustomerImplementationClaim, mappings: List[ClaimObligationMapping]) -> str:
if not claim.claimed_capability:
return "Die Aussage ist zu unspezifisch — bitte konkretisieren, was umgesetzt wurde."
full = sum(1 for m in mappings if m.claim_coverage == ClaimCoverage.POTENTIALLY_ADDRESSES)
partial = sum(1 for m in mappings if m.claim_coverage == ClaimCoverage.PARTIALLY_ADDRESSES)
none = sum(1 for m in mappings if m.claim_coverage == ClaimCoverage.DOES_NOT_ADDRESS)
return (
"Die beschriebene Maßnahme adressiert wahrscheinlich %d Pflicht(en) direkt und %d "
"teilweise; %d werden durch die Aussage nicht berührt. Für eine Bewertung der tatsächlichen "
"Umsetzung sind Nachweise erforderlich. Dies ist keine Konformitätsaussage." % (full, partial, none)
)
@@ -0,0 +1,65 @@
"""Interpretation review engine (spec Modus 4).
Evaluates whether a customer's legal interpretation is plausible, too narrow,
too broad, etc. Matches the interpretation against a curated pattern library;
no match -> `uncertain` plus a request for the missing context (never invent a
verdict, spec §6.3).
"""
from __future__ import annotations
import hashlib
from typing import Optional
from .enums import Confidence, InterpretationVerdict
from .schemas import InterpretationAssessment, ProductProfile
from .taxonomy_interpretations import INTERPRETATION_PATTERNS, InterpretationPattern
def _interpretation_id(raw: str) -> str:
digest = hashlib.sha1(raw.strip().lower().encode("utf-8")).hexdigest()
return "interp_%s" % digest[:10]
def _best_match(text: str) -> Optional[InterpretationPattern]:
low = text.lower()
best: Optional[InterpretationPattern] = None
best_score = 0
for pattern in INTERPRETATION_PATTERNS:
score = sum(1 for t in pattern.triggers if t in low)
if score > best_score:
best, best_score = pattern, score
return best
def assess_interpretation(
raw_interpretation: str, profile: Optional[ProductProfile] = None
) -> InterpretationAssessment:
interp_id = _interpretation_id(raw_interpretation)
pattern = _best_match(raw_interpretation)
if pattern is None:
return InterpretationAssessment(
interpretation_id=interp_id,
raw_interpretation=raw_interpretation,
assessment=InterpretationVerdict.UNCERTAIN,
corrected_interpretation=(
"Diese Auslegung lässt sich ohne weitere Angaben nicht bewerten. Bitte Produkt, "
"Rolle, Marktzugang und die konkret betroffene Pflicht benennen."
),
explanation="Kein bekanntes Auslegungsmuster erkannt — bewusst keine Scheinsicherheit.",
confidence=Confidence.LOW,
)
return InterpretationAssessment(
interpretation_id=interp_id,
raw_interpretation=raw_interpretation,
affected_regulations=pattern.affected_regulations,
affected_obligations=pattern.affected_obligations,
assessment=pattern.verdict,
risks=pattern.risks,
corrected_interpretation=pattern.corrected_interpretation,
legal_basis_refs=pattern.legal_basis_refs,
explanation=pattern.explanation,
confidence=pattern.confidence,
)
@@ -0,0 +1,116 @@
"""Applicable-obligation engine (spec Modus 2).
Maps a product profile (optionally a precomputed scope) to the concrete legal
obligations, the overlaps between them, and which evidence types satisfy more
than one obligation at once (the core USP, spec §16).
"""
from __future__ import annotations
from typing import Dict, List, Optional
from .predicates import evaluate, true_leaves
from .rules_obligations import ALL_OBLIGATIONS
from .rules_overlaps import OVERLAP_GROUPS
from .rules_regulations import FIELD_LABELS
from .rules_types import ObligationRule
from .schemas import (
ApplicableObligation,
ObligationOverlap,
ObligationsResponse,
ProductProfile,
RegulatoryScope,
)
from .scope_engine import discover_scope
def _applicable_regulation_ids(profile: ProductProfile, scope: Optional[RegulatoryScope]) -> List[str]:
if scope is None:
scope = discover_scope(profile)
return [r.regulation_id for r in scope.applicable_regulations]
def _applies_because(rule: ObligationRule, profile: ProductProfile) -> List[str]:
labels: List[str] = []
for leaf in true_leaves(rule.applies_if, profile):
label = FIELD_LABELS.get(leaf[0])
if label and label not in labels:
labels.append(label)
if not labels:
labels.append("%s ist für dieses Produkt anwendbar." % rule.source_regulation)
return labels
def _role_ok(rule: ObligationRule, profile: ProductProfile) -> bool:
role = profile.manufacturer_role
if role is None:
return True # unknown role -> do not exclude
return role.value in rule.applies_to_role
def derive_obligations(
profile: ProductProfile, scope: Optional[RegulatoryScope] = None
) -> ObligationsResponse:
active_regs = set(_applicable_regulation_ids(profile, scope))
response = ObligationsResponse()
applied_ids: List[str] = []
for rule in ALL_OBLIGATIONS:
if rule.source_regulation not in active_regs:
continue
if rule.applies_unless is not None and evaluate(rule.applies_unless, profile) is True:
continue
verdict = evaluate(rule.applies_if, profile)
if verdict is not True or not _role_ok(rule, profile):
if verdict is False:
response.excluded_obligations.append(rule.obligation_id)
continue
applied_ids.append(rule.obligation_id)
response.applicable_obligations.append(
ApplicableObligation(
obligation_id=rule.obligation_id,
title=rule.title,
source_regulation=rule.source_regulation,
legal_basis_refs=rule.legal_basis_refs,
obligation_text=rule.obligation_text,
authority_level=rule.authority_level,
applies_because=_applies_because(rule, profile),
applies_to_role=rule.applies_to_role,
lifecycle_phase=rule.lifecycle_phase,
overlap_group_id=rule.overlap_group_id,
required_evidence=rule.required_evidence,
confidence=rule.base_confidence,
registry_anchor=rule.registry_anchor,
proposed=rule.proposed,
)
)
response.overlaps = _overlaps(applied_ids)
response.evidence_for_multiple = _evidence_for_multiple(response.applicable_obligations)
return response
def _overlaps(applied_ids: List[str]) -> List[ObligationOverlap]:
applied = set(applied_ids)
out: List[ObligationOverlap] = []
for group in OVERLAP_GROUPS:
present = [m for m in group.members if m in applied]
if len(present) >= 2:
out.append(
ObligationOverlap(
overlap_group_id=group.overlap_group_id,
obligations=present,
overlap_type=group.overlap_type,
canonical_obligation_id=group.canonical_obligation_id,
explanation=group.explanation,
)
)
return out
def _evidence_for_multiple(obligations: List[ApplicableObligation]) -> Dict[str, List[str]]:
by_evidence: Dict[str, List[str]] = {}
for ob in obligations:
for ev in ob.required_evidence:
by_evidence.setdefault(ev, []).append(ob.obligation_id)
return {ev: ids for ev, ids in by_evidence.items() if len(ids) > 1}
@@ -0,0 +1,100 @@
"""Safe, tri-state condition evaluator for applicability rules.
Conditions are plain data (no `eval`): a *leaf* is a 3-tuple
``(field, op, value)``; a *composite* is ``{"all": [...]}`` or
``{"any": [...]}``. Evaluation is tri-state — ``True`` / ``False`` /
``None`` (unknown) — so a missing product fact yields *uncertain*, never a
false negative.
"""
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union
Leaf = Tuple[str, str, Any]
Condition = Union[Leaf, Dict[str, Any]]
def _attr(profile: Any, field: str) -> Any:
value = getattr(profile, field, None)
if isinstance(value, Enum):
return value.value
return value
def _eval_leaf(leaf: Leaf, profile: Any) -> Optional[bool]:
field, op, expected = leaf
actual = _attr(profile, field)
if op == "not_none":
return actual is not None
if op == "is_none":
return actual is None
if op == "contains_any":
# list-valued field (e.g. product_type); empty list = known-empty.
items = actual or []
hay = " ".join(str(x).lower() for x in items)
return any(str(k).lower() in hay for k in expected)
if actual is None:
return None # unknown fact -> unknown result
if op == "eq":
return bool(actual == expected)
if op == "ne":
return bool(actual != expected)
if op == "truthy":
return bool(actual)
if op == "falsy":
return not bool(actual)
if op == "in":
return bool(actual in expected)
if op == "not_in":
return bool(actual not in expected)
if op == "date_after":
return bool(actual > expected)
raise ValueError("unknown predicate op: %r" % (op,))
def evaluate(condition: Optional[Condition], profile: Any) -> Optional[bool]:
"""Return True/False/None(unknown) for a condition tree."""
if condition is None:
return True
if isinstance(condition, tuple):
return _eval_leaf(condition, profile)
if "all" in condition:
results = [evaluate(c, profile) for c in condition["all"]]
if any(r is False for r in results):
return False
if any(r is None for r in results):
return None
return True
if "any" in condition:
results = [evaluate(c, profile) for c in condition["any"]]
if any(r is True for r in results):
return True
if any(r is None for r in results):
return None
return False
raise ValueError("malformed condition: %r" % (condition,))
def true_leaves(condition: Optional[Condition], profile: Any) -> List[Leaf]:
"""Collect the leaf conditions that evaluated True (for trigger_facts)."""
if condition is None:
return []
if isinstance(condition, tuple):
return [condition] if _eval_leaf(condition, profile) is True else []
members = condition.get("all") or condition.get("any") or []
out: List[Leaf] = []
for c in members:
out.extend(true_leaves(c, profile))
return out
def unknown_fields(fields: List[str], profile: Any) -> List[str]:
"""Subset of `fields` whose value on the profile is None (unknown)."""
return [f for f in fields if _attr(profile, f) is None]
@@ -0,0 +1,23 @@
"""Aggregated obligation scope rules + lookup helpers."""
from __future__ import annotations
from typing import Dict, List, Optional
from .rules_obligations_cra import CRA_OBLIGATIONS
from .rules_obligations_machine_data import DATA_ACT_OBLIGATIONS, MACHINE_OBLIGATIONS
from .rules_types import ObligationRule
ALL_OBLIGATIONS: List[ObligationRule] = (
CRA_OBLIGATIONS + MACHINE_OBLIGATIONS + DATA_ACT_OBLIGATIONS
)
_BY_ID: Dict[str, ObligationRule] = {o.obligation_id: o for o in ALL_OBLIGATIONS}
def obligation_rule(obligation_id: str) -> Optional[ObligationRule]:
return _BY_ID.get(obligation_id)
def obligations_for_regulation(regulation_id: str) -> List[ObligationRule]:
return [o for o in ALL_OBLIGATIONS if o.source_regulation == regulation_id]
@@ -0,0 +1,271 @@
"""CRA obligation scope rules.
`obligation_id`s in the six CRA-P1 families (sbom/vuln/authentication/logging/
remote_access/updates) are RE-USED verbatim from the Legal-KG registry
(`obligations/obligation_join_keys.json`) — never re-minted (control_uuid trap,
memory `project_compliance_graph.md`). Cross-cutting CRA *process* obligations
(risk assessment, technical documentation, CE, instructions, secure-by-design
umbrella) are not yet in the registry and are flagged `proposed=True`.
"""
from __future__ import annotations
from typing import List
from .enums import AuthorityLevel, Confidence
from .rules_types import ObligationRule
_HAS_SW = ("has_software", "eq", True)
_EU = ("eu_market", "eq", True)
_REMOTE_OR_CLOUD = {"any": [("has_remote_access", "eq", True), ("has_cloud_connection", "eq", True)]}
_LM = AuthorityLevel.LEGAL_TEXT
CRA_OBLIGATIONS: List[ObligationRule] = [
ObligationRule(
obligation_id="sbom_creation",
title="Software Bill of Materials erstellen",
source_regulation="CRA",
obligation_text="Eine SBOM erstellen, die mindestens die obersten Abhängigkeiten des Produkts dokumentiert.",
legal_basis_refs=["CRA Annex I Part II (1)"],
authority_level=_LM,
family="sbom",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["software_bill_of_materials"],
required_evidence=["sbom", "repo_scan"],
lifecycle_phase=["development", "placing_on_market", "maintenance"],
registry_anchor=True,
),
ObligationRule(
obligation_id="provide_security_updates",
title="Sicherheitsupdates bereitstellen",
source_regulation="CRA",
obligation_text="Sicherheitsrelevante Updates zeitnah und über den Supportzeitraum bereitstellen.",
legal_basis_refs=["CRA Annex I (2)(c)", "CRA Art. 13"],
authority_level=_LM,
family="updates",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["secure_updates"],
required_evidence=["policy", "ticket", "test_report"],
lifecycle_phase=["maintenance", "update"],
overlap_group_id="SECURITY_UPDATES",
registry_anchor=True,
),
ObligationRule(
obligation_id="support_period_maintenance",
title="Supportzeitraum definieren und einhalten",
source_regulation="CRA",
obligation_text="Einen angemessenen Supportzeitraum festlegen, in dem Schwachstellen behandelt werden.",
legal_basis_refs=["CRA Art. 13(8)"],
authority_level=_LM,
family="updates",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["secure_updates"],
required_evidence=["policy"],
lifecycle_phase=["placing_on_market", "maintenance", "update"],
registry_anchor=True,
),
ObligationRule(
obligation_id="signed_update_integrity",
title="Integrität von Updates sicherstellen",
source_regulation="CRA",
obligation_text="Updates signieren und ihre Integrität bei der Verteilung verifizieren.",
legal_basis_refs=["CRA Annex I (1)(3)(f)"],
authority_level=_LM,
family="updates",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["software_integrity"],
required_evidence=["config_export", "test_report"],
lifecycle_phase=["development", "maintenance", "update"],
overlap_group_id="SECURITY_UPDATES",
registry_anchor=True,
),
ObligationRule(
obligation_id="vuln_handling_process",
title="Schwachstellenbehandlungs-Prozess",
source_regulation="CRA",
obligation_text="Einen dokumentierten Prozess zur Identifikation, Bewertung und Behebung von Schwachstellen betreiben.",
legal_basis_refs=["CRA Art. 13(8)", "CRA Annex VII"],
authority_level=_LM,
family="vuln",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["vulnerability_management"],
required_evidence=["policy", "ticket"],
lifecycle_phase=["development", "operation", "maintenance"],
overlap_group_id="VULNERABILITY_HANDLING",
registry_anchor=True,
),
ObligationRule(
obligation_id="coordinated_vulnerability_disclosure",
title="Coordinated Vulnerability Disclosure",
source_regulation="CRA",
obligation_text="Eine Richtlinie zur koordinierten Offenlegung von Schwachstellen bereitstellen.",
legal_basis_refs=["CRA Annex I Part II (5)"],
authority_level=_LM,
family="vuln",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["coordinated_disclosure"],
required_evidence=["policy"],
lifecycle_phase=["operation", "maintenance"],
overlap_group_id="VULNERABILITY_HANDLING",
registry_anchor=True,
),
ObligationRule(
obligation_id="exploited_vuln_reporting_authorities",
title="Meldung aktiv ausgenutzter Schwachstellen / Vorfälle",
source_regulation="CRA",
obligation_text="Aktiv ausgenutzte Schwachstellen und schwerwiegende Vorfälle an die zuständigen Behörden melden.",
legal_basis_refs=["CRA Art. 14", "CRA Art. 16"],
authority_level=_LM,
family="vuln",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["incident_reporting"],
required_evidence=["policy", "ticket"],
lifecycle_phase=["operation", "maintenance"],
registry_anchor=True,
),
ObligationRule(
obligation_id="user_authentication_required",
title="Authentifizierung vorsehen",
source_regulation="CRA",
obligation_text="Den Zugang über einen geeigneten Authentifizierungsmechanismus schützen.",
legal_basis_refs=["CRA Annex I (2)(d)"],
authority_level=_LM,
family="authentication",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["authentication"],
required_evidence=["config_export", "pentest"],
lifecycle_phase=["development", "operation"],
registry_anchor=True,
),
ObligationRule(
obligation_id="no_default_credentials",
title="Keine unveränderlichen Standard-Zugangsdaten",
source_regulation="CRA",
obligation_text="Sichere Standardkonfiguration; keine fest hinterlegten oder unveränderlichen Standard-Passwörter.",
legal_basis_refs=["CRA Annex I (2)(a)", "CRA Annex I (2)(b)"],
authority_level=_LM,
family="authentication",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["secure_by_default"],
required_evidence=["config_export", "test_report"],
lifecycle_phase=["development", "placing_on_market"],
registry_anchor=True,
),
ObligationRule(
obligation_id="event_logging_security_events",
title="Sicherheitsrelevante Ereignisse protokollieren",
source_regulation="CRA",
obligation_text="Sicherheitsrelevante Ereignisse und Zugriffe aufzeichnen, um Vorfälle nachvollziehen zu können.",
legal_basis_refs=["CRA Annex I Part I (2)(k)"],
authority_level=_LM,
family="logging",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["security_logging"],
required_evidence=["config_export", "audit_log"],
lifecycle_phase=["operation", "maintenance"],
registry_anchor=True,
),
ObligationRule(
obligation_id="remote_access_attack_surface_min",
title="Angriffsfläche minimieren",
source_regulation="CRA",
obligation_text="Die Angriffsfläche begrenzen, insbesondere exponierte Remote-/Cloud-Schnittstellen.",
legal_basis_refs=["CRA Annex I (1)(2)(a)"],
authority_level=_LM,
family="remote_access",
applies_if={"all": [_REMOTE_OR_CLOUD, _EU]},
required_capabilities=["secure_by_default"],
required_evidence=["config_export", "repo_scan", "pentest"],
lifecycle_phase=["development", "operation"],
registry_anchor=True,
),
ObligationRule(
obligation_id="remote_access_confidentiality_integrity",
title="Vertraulichkeit/Integrität der Fernverbindung",
source_regulation="CRA",
obligation_text="Daten bei Fernzugriff/Cloud-Anbindung verschlüsselt und integritätsgeschützt übertragen.",
legal_basis_refs=["CRA Annex I (1)(2)(b)", "CRA Annex I (1)(2)(c)"],
authority_level=_LM,
family="remote_access",
applies_if={"all": [_REMOTE_OR_CLOUD, _EU]},
required_capabilities=["secure_communication"],
required_evidence=["config_export", "pentest"],
lifecycle_phase=["operation"],
registry_anchor=True,
),
# --- Cross-cutting CRA process obligations (not yet in registry) ---------
ObligationRule(
obligation_id="cra_secure_by_design",
title="Security by Design",
source_regulation="CRA",
obligation_text="Das Produkt so entwerfen, entwickeln und herstellen, dass ein angemessenes Cybersicherheitsniveau gewährleistet ist.",
legal_basis_refs=["CRA Annex I Part I (1)"],
authority_level=_LM,
family="cra_process",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["secure_by_default", "risk_assessment"],
required_evidence=["policy", "test_report"],
lifecycle_phase=["development", "placing_on_market"],
proposed=True,
),
ObligationRule(
obligation_id="cra_risk_assessment",
title="Cybersicherheits-Risikobewertung",
source_regulation="CRA",
obligation_text="Eine Cybersicherheits-Risikobewertung durchführen und dokumentieren; in die technische Dokumentation aufnehmen.",
legal_basis_refs=["CRA Art. 13(2)", "CRA Annex I Part I (1)"],
authority_level=_LM,
family="cra_process",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["risk_assessment"],
required_evidence=["policy"],
lifecycle_phase=["development", "placing_on_market"],
overlap_group_id="RISK_ASSESSMENT",
proposed=True,
),
ObligationRule(
obligation_id="cra_technical_documentation",
title="Technische Dokumentation",
source_regulation="CRA",
obligation_text="Technische Dokumentation erstellen und aktuell halten, die Konformität mit den Anforderungen belegt.",
legal_basis_refs=["CRA Art. 31", "CRA Annex VII"],
authority_level=_LM,
family="cra_process",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["technical_documentation"],
required_evidence=["policy"],
lifecycle_phase=["placing_on_market", "maintenance"],
overlap_group_id="TECHNICAL_DOCUMENTATION",
proposed=True,
),
ObligationRule(
obligation_id="cra_ce_conformity_assessment",
title="Konformitätsbewertung / CE-Kennzeichnung",
source_regulation="CRA",
obligation_text="Vor dem Inverkehrbringen das passende Konformitätsbewertungsverfahren durchlaufen und CE kennzeichnen.",
legal_basis_refs=["CRA Art. 32", "CRA Art. 28"],
authority_level=_LM,
family="cra_process",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["conformity_assessment"],
required_evidence=["test_report", "policy"],
lifecycle_phase=["placing_on_market"],
overlap_group_id="CE_CONFORMITY",
proposed=True,
),
ObligationRule(
obligation_id="cra_instructions_for_use",
title="Informationen und Anweisungen für Nutzer",
source_regulation="CRA",
obligation_text="Nutzern verständliche Sicherheitsinformationen und -anweisungen bereitstellen (z. B. zu Updates und Support-Ende).",
legal_basis_refs=["CRA Annex II"],
authority_level=_LM,
family="cra_process",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["technical_documentation"],
required_evidence=["policy"],
lifecycle_phase=["placing_on_market"],
overlap_group_id="INSTRUCTIONS_FOR_USE",
proposed=True,
),
]
@@ -0,0 +1,139 @@
"""MaschinenVO and Data Act obligation scope rules.
These regulations are NOT yet in the Legal-KG registry (which currently covers
the six CRA-P1 families). Every obligation here is therefore `proposed=True`:
the reasoning layer proposes the snake_case id, the Obligation Registry session
remains the only authority that may canonicalise it (re-link, never re-mint).
"""
from __future__ import annotations
from typing import List
from .enums import AuthorityLevel, Confidence
from .rules_types import ObligationRule
_EU = ("eu_market", "eq", True)
_IS_MACHINE = ("is_machine", "eq", True)
_LM = AuthorityLevel.LEGAL_TEXT
MACHINE_OBLIGATIONS: List[ObligationRule] = [
ObligationRule(
obligation_id="machine_risk_assessment",
title="Maschinen-Risikobeurteilung",
source_regulation="MaschinenVO",
obligation_text="Eine Risikobeurteilung der Maschine durchführen, um Gefährdungen zu ermitteln und zu mindern.",
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.1.1)", "EN ISO 12100"],
authority_level=_LM,
family="machine_safety",
applies_if={"all": [_IS_MACHINE, _EU]},
required_capabilities=["risk_assessment"],
required_evidence=["policy"],
lifecycle_phase=["development", "placing_on_market"],
overlap_group_id="RISK_ASSESSMENT",
proposed=True,
),
ObligationRule(
obligation_id="machine_safety_control_systems",
title="Sichere Steuerungssysteme",
source_regulation="MaschinenVO",
obligation_text="Sicherheitsbezogene Teile der Steuerung so auslegen, dass Ausfälle nicht zu gefährlichen Zuständen führen.",
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.2.1)", "EN ISO 13849-1"],
authority_level=_LM,
family="machine_safety",
applies_if={"all": [_IS_MACHINE, ("has_safety_function", "eq", True), _EU]},
required_capabilities=["functional_safety"],
required_evidence=["test_report", "policy"],
lifecycle_phase=["development", "placing_on_market"],
proposed=True,
),
ObligationRule(
obligation_id="machine_protection_against_corruption",
title="Schutz gegen Korrumpierung sicherheitsrelevanter Funktionen",
source_regulation="MaschinenVO",
obligation_text="Sicherstellen, dass eine (auch beabsichtigte) Korrumpierung der Software/Verbindung keine gefährliche Situation auslöst.",
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.1.9)"],
authority_level=_LM,
family="machine_safety",
applies_if={
"all": [
_IS_MACHINE,
("has_safety_function", "eq", True),
{"any": [("has_remote_access", "eq", True), ("has_software", "eq", True)]},
_EU,
]
},
required_capabilities=["software_integrity", "secure_by_default"],
required_evidence=["test_report", "config_export"],
lifecycle_phase=["development", "operation", "maintenance"],
overlap_group_id="VULNERABILITY_HANDLING",
proposed=True,
),
ObligationRule(
obligation_id="machine_instructions_for_use",
title="Betriebsanleitung",
source_regulation="MaschinenVO",
obligation_text="Eine vollständige Betriebsanleitung mit Sicherheitshinweisen bereitstellen.",
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.7.4)"],
authority_level=_LM,
family="machine_safety",
applies_if={"all": [_IS_MACHINE, _EU]},
required_capabilities=["technical_documentation"],
required_evidence=["policy"],
lifecycle_phase=["placing_on_market"],
overlap_group_id="INSTRUCTIONS_FOR_USE",
proposed=True,
),
ObligationRule(
obligation_id="machine_ce_conformity",
title="Konformitätsbewertung / CE (Maschine)",
source_regulation="MaschinenVO",
obligation_text="Das passende Konformitätsbewertungsverfahren der MaschinenVO durchlaufen und CE kennzeichnen.",
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Art. 25", "Anhang IV"],
authority_level=_LM,
family="machine_safety",
applies_if={"all": [_IS_MACHINE, _EU]},
required_capabilities=["conformity_assessment"],
required_evidence=["test_report", "policy"],
lifecycle_phase=["placing_on_market"],
overlap_group_id="CE_CONFORMITY",
proposed=True,
),
]
DATA_ACT_OBLIGATIONS: List[ObligationRule] = [
ObligationRule(
obligation_id="data_act_data_access_by_design",
title="Datenzugang by design",
source_regulation="DataAct",
obligation_text="Vernetzte Produkte so gestalten, dass die erzeugten Produktdaten standardmäßig zugänglich sind.",
legal_basis_refs=["Data Act (EU) 2023/2854 Art. 3"],
authority_level=_LM,
family="data_act",
applies_if={
"all": [
("generates_usage_data", "eq", True),
{"any": [("has_cloud_connection", "eq", True), ("has_remote_access", "eq", True)]},
_EU,
]
},
required_capabilities=["data_access_provision"],
required_evidence=["config_export", "policy"],
lifecycle_phase=["development", "placing_on_market"],
proposed=True,
),
ObligationRule(
obligation_id="data_act_user_data_access",
title="Datenzugang für Nutzer",
source_regulation="DataAct",
obligation_text="Nutzern Zugang zu den von ihnen erzeugten Daten gewähren und Weitergabe an Dritte ermöglichen.",
legal_basis_refs=["Data Act (EU) 2023/2854 Art. 4", "Art. 5"],
authority_level=_LM,
family="data_act",
applies_if={"all": [("generates_usage_data", "eq", True), _EU]},
required_capabilities=["data_access_provision"],
required_evidence=["policy"],
lifecycle_phase=["operation"],
proposed=True,
),
]
@@ -0,0 +1,91 @@
"""Obligation overlap groups (spec §4.5 / Modus 2).
Overlaps are emitted only for the members that are actually applicable to the
product. `canonical_obligation_id` points at the strongest / most specific
obligation in the group (preferring a registry-anchored CRA id).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import List
from .enums import OverlapType
@dataclass(frozen=True)
class OverlapGroup:
overlap_group_id: str
members: List[str]
overlap_type: OverlapType
canonical_obligation_id: str
explanation: str
OVERLAP_GROUPS: List[OverlapGroup] = [
OverlapGroup(
overlap_group_id="VULNERABILITY_HANDLING",
members=[
"vuln_handling_process",
"coordinated_vulnerability_disclosure",
"machine_protection_against_corruption",
],
overlap_type=OverlapType.COMPLEMENTARY,
canonical_obligation_id="vuln_handling_process",
explanation=(
"CRA adressiert die Schwachstellenbehandlung des Produkts. Die MaschinenVO wird "
"komplementär relevant, sobald eine Cyber-Schwachstelle eine Sicherheitsfunktion "
"beeinflussen kann (Anhang III 1.1.9). Nicht identisch, aber gemeinsam zu erfüllen."
),
),
OverlapGroup(
overlap_group_id="SECURITY_UPDATES",
members=["provide_security_updates", "signed_update_integrity"],
overlap_type=OverlapType.COMPLEMENTARY,
canonical_obligation_id="provide_security_updates",
explanation=(
"Updates bereitstellen und ihre Integrität sichern sind zwei Seiten desselben "
"Update-Prozesses; ein Nachweis (Update-Policy, Release Notes) deckt teils beide ab."
),
),
OverlapGroup(
overlap_group_id="RISK_ASSESSMENT",
members=["cra_risk_assessment", "machine_risk_assessment"],
overlap_type=OverlapType.DIFFERENT_SCOPE,
canonical_obligation_id="cra_risk_assessment",
explanation=(
"Zwei getrennte Risikobetrachtungen: CRA = Cybersicherheits-Risiko, MaschinenVO = "
"Sicherheits-/Gefährdungsbeurteilung. Methodisch verwandt, inhaltlich unterschiedlich."
),
),
OverlapGroup(
overlap_group_id="TECHNICAL_DOCUMENTATION",
members=["cra_technical_documentation", "machine_risk_assessment"],
overlap_type=OverlapType.SIMILAR,
canonical_obligation_id="cra_technical_documentation",
explanation=(
"Beide Regime verlangen eine technische Dokumentation; Teile (Risikobetrachtung, "
"Konstruktionsunterlagen) lassen sich in einem konsolidierten technischen Dossier führen."
),
),
OverlapGroup(
overlap_group_id="CE_CONFORMITY",
members=["cra_ce_conformity_assessment", "machine_ce_conformity"],
overlap_type=OverlapType.COMPLEMENTARY,
canonical_obligation_id="machine_ce_conformity",
explanation=(
"Ein Produkt kann zwei CE-Regime gleichzeitig erfüllen müssen (MaschinenVO + CRA). "
"Eine gemeinsame CE-Kennzeichnung, aber getrennte Konformitätsbewertungen."
),
),
OverlapGroup(
overlap_group_id="INSTRUCTIONS_FOR_USE",
members=["cra_instructions_for_use", "machine_instructions_for_use"],
overlap_type=OverlapType.SIMILAR,
canonical_obligation_id="machine_instructions_for_use",
explanation=(
"Betriebsanleitung (MaschinenVO) und Sicherheitsinformationen (CRA) überschneiden sich; "
"ein integriertes Anleitungsdokument kann beide Pflichten bedienen."
),
),
]
@@ -0,0 +1,160 @@
"""Regulation-level applicability trigger rules (scope discovery, spec Modus 1).
Each rule is pure data consumed by `scope_engine`. Triggers reference
`ProductProfile` fields through the safe predicate evaluator. `required_facts`
that are unknown turn the verdict *uncertain* and surface `fact_prompts`.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from .enums import Confidence
from .predicates import Condition
# Positive, human-readable label per profile fact (for trigger_facts output).
FIELD_LABELS: Dict[str, str] = {
"has_software": "Produkt enthält Software / digitale Elemente",
"has_embedded_software": "Produkt enthält eingebettete Software",
"has_remote_access": "Produkt besitzt Fernzugriff / Fernwartung",
"has_cloud_connection": "Produkt ist mit einer Cloud verbunden",
"has_radio_module": "Produkt enthält ein Funkmodul",
"has_safety_function": "Produkt erfüllt eine Sicherheitsfunktion",
"generates_usage_data": "Vernetztes Produkt erzeugt nutzbare Produktdaten",
"is_machine": "Produkt ist eine Maschine",
"is_component": "Produkt ist ein (Sicherheits-)Bauteil",
"eu_market": "Produkt wird auf dem EU-Markt bereitgestellt",
"is_essential_or_important_entity": "Unternehmen ist wesentliche/wichtige Einrichtung",
"manufacturer_role": "Wirtschaftsakteur-Rolle (Hersteller/Importeur/Händler)",
}
@dataclass(frozen=True)
class RegulationRule:
regulation_id: str
name: str
trigger: Condition
required_facts: List[str]
fact_prompts: Dict[str, str]
legal_basis_refs: List[str]
summary: str
confidence_when_applicable: Confidence = Confidence.HIGH
exclusion: Optional[Condition] = None
# Status is downgraded to PARTIALLY_APPLICABLE / MEDIUM when the trigger
# fires only via inference rather than a directly stated fact.
inferred: bool = False
excludable_roles: List[str] = field(default_factory=list)
_ECONOMIC_ROLES = ["manufacturer", "importer", "distributor"]
REGULATION_RULES: List[RegulationRule] = [
RegulationRule(
regulation_id="CRA",
name="Cyber Resilience Act (EU) 2024/2847",
trigger={
"all": [
{"any": [("has_software", "eq", True), ("has_embedded_software", "eq", True)]},
("eu_market", "eq", True),
]
},
required_facts=["has_software", "eu_market", "manufacturer_role"],
fact_prompts={
"has_software": "Enthält das Produkt Software / digitale Elemente?",
"eu_market": "Wird das Produkt auf dem EU-Markt bereitgestellt oder in Verkehr gebracht?",
"manufacturer_role": "Welche Rolle nehmen Sie ein (Hersteller / Importeur / Händler)?",
},
legal_basis_refs=["CRA Art. 2(1)", "CRA Art. 3(1)"],
summary="Produkte mit digitalen Elementen, die auf dem EU-Markt bereitgestellt werden.",
confidence_when_applicable=Confidence.HIGH,
excludable_roles=["operator"],
),
RegulationRule(
regulation_id="MaschinenVO",
name="Maschinenverordnung (EU) 2023/1230",
trigger={
"any": [
("is_machine", "eq", True),
{"all": [("is_component", "eq", True), ("has_safety_function", "eq", True)]},
]
},
required_facts=["is_machine", "eu_market"],
fact_prompts={
"is_machine": "Ist das Produkt eine Maschine oder ein Sicherheitsbauteil?",
"has_safety_function": "Erfüllt das Bauteil eine Sicherheitsfunktion?",
},
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Art. 2", "Anhang III"],
summary="Maschinen oder Sicherheitsbauteile, ggf. mit sicherheitsrelevanter Steuerung.",
confidence_when_applicable=Confidence.MEDIUM,
),
RegulationRule(
regulation_id="RED",
name="Radio Equipment Directive 2014/53/EU",
trigger=("has_radio_module", "eq", True),
required_facts=["has_radio_module"],
fact_prompts={
"has_radio_module": "Besitzt das Produkt ein Funkmodul (WLAN, Bluetooth, Mobilfunk)?",
},
legal_basis_refs=["RED 2014/53/EU Art. 1", "Art. 3(3)(d-f)"],
summary="Funkanlagen; Art. 3(3) deckt zusätzlich Cybersecurity-Anforderungen ab.",
confidence_when_applicable=Confidence.HIGH,
),
RegulationRule(
regulation_id="EMV",
name="EMV-Richtlinie 2014/30/EU",
trigger={
"any": [
("has_software", "eq", True),
("has_embedded_software", "eq", True),
("has_radio_module", "eq", True),
]
},
required_facts=[],
fact_prompts={
"is_electrical": "Ist das Produkt ein elektrisches / elektronisches Betriebsmittel?",
},
legal_basis_refs=["EMV-RL 2014/30/EU Art. 2"],
summary="Elektrische/elektronische Betriebsmittel (hier aus den digitalen Elementen abgeleitet).",
confidence_when_applicable=Confidence.MEDIUM,
inferred=True,
),
RegulationRule(
regulation_id="DataAct",
name="Data Act (EU) 2023/2854",
trigger={
"all": [
{"any": [("has_cloud_connection", "eq", True), ("has_remote_access", "eq", True)]},
("generates_usage_data", "eq", True),
]
},
required_facts=["generates_usage_data"],
fact_prompts={
"generates_usage_data": "Erzeugt das vernetzte Produkt nutzbare Produkt-/Nutzungsdaten?",
},
legal_basis_refs=["Data Act (EU) 2023/2854 Art. 2(5)", "Art. 3-5"],
summary="Vernetzte Produkte, die Nutzungsdaten erzeugen und zugänglich machen.",
confidence_when_applicable=Confidence.HIGH,
),
RegulationRule(
regulation_id="NIS2",
name="NIS2-Richtlinie (EU) 2022/2555",
trigger=("is_essential_or_important_entity", "eq", True),
required_facts=["company_size", "sector", "is_essential_or_important_entity"],
fact_prompts={
"company_size": "Unternehmensgröße (Mitarbeiterzahl / Umsatz)?",
"sector": "In welchem Sektor ist das Unternehmen tätig (Anhang I/II)?",
"is_essential_or_important_entity": "Fällt das Unternehmen als wesentliche/wichtige Einrichtung unter NIS2?",
},
legal_basis_refs=["NIS2-RL (EU) 2022/2555 Art. 2", "Art. 3"],
summary="Adressiert die ORGANISATION (Größe/Sektor/Rolle), nicht das Produkt.",
confidence_when_applicable=Confidence.MEDIUM,
),
]
def regulation_rule(regulation_id: str) -> Optional[RegulationRule]:
for rule in REGULATION_RULES:
if rule.regulation_id == regulation_id:
return rule
return None
@@ -0,0 +1,58 @@
"""Shared types for obligation scope rules.
`required_evidence` MUST draw from the framework-AGNOSTIC evidence catalog
owned by the Compliance Execution Graph (memory `project_compliance_graph.md`,
User-Direktive 2026-06-25). Do not invent framework-specific evidence types.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Optional
from .enums import AuthorityLevel, Confidence
from .predicates import Condition
# Framework-agnostic shared evidence catalog (the only allowed tokens).
EVIDENCE_CATALOG = frozenset(
{
"config_export",
"test_report",
"repo_scan",
"sbom",
"policy",
"audit_log",
"pentest",
"ticket",
}
)
@dataclass(frozen=True)
class ObligationRule:
obligation_id: str
title: str
source_regulation: str
obligation_text: str
legal_basis_refs: List[str]
authority_level: AuthorityLevel
family: str
applies_if: Condition
required_capabilities: List[str]
required_evidence: List[str]
base_confidence: Confidence = Confidence.HIGH
applies_unless: Optional[Condition] = None
lifecycle_phase: List[str] = field(default_factory=list)
applies_to_role: List[str] = field(default_factory=lambda: ["manufacturer", "importer"])
overlap_group_id: Optional[str] = None
# True => obligation_id is owned by the Legal-KG registry (re-link, never re-mint).
registry_anchor: bool = False
# True => Machine/Data-Act obligation the registry has not canonicalised yet.
proposed: bool = False
def __post_init__(self) -> None:
bad = [e for e in self.required_evidence if e not in EVIDENCE_CATALOG]
if bad:
raise ValueError(
"obligation %s uses non-catalog evidence %r" % (self.obligation_id, bad)
)
@@ -0,0 +1,226 @@
"""Pydantic domain objects for the Regulatory Reasoning Engine.
Trigger facts that drive scope are tri-state (`Optional[bool] = None`): `None`
means "fact unknown" and produces an *uncertain* verdict plus a concrete
missing-fact prompt — never silent false security (spec §6.3).
"""
from __future__ import annotations
from datetime import date
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
from .enums import (
ApplicabilityStatus,
AuthorityLevel,
ClaimCoverage,
Confidence,
InterpretationVerdict,
ManufacturerRole,
MarketModel,
OverlapType,
ProductLifecyclePhase,
)
# ---------------------------------------------------------------------------
# Input
# ---------------------------------------------------------------------------
class ProductProfile(BaseModel):
"""The customer's product / system. Tri-state booleans => unknown facts."""
product_name: str
product_profile_id: Optional[str] = None
manufacturer_role: Optional[ManufacturerRole] = None
product_type: List[str] = Field(default_factory=list)
has_software: Optional[bool] = None
has_embedded_software: Optional[bool] = None
has_remote_access: Optional[bool] = None
has_cloud_connection: Optional[bool] = None
has_ai_functionality: Optional[bool] = None
has_radio_module: Optional[bool] = None
has_safety_function: Optional[bool] = None
generates_usage_data: Optional[bool] = None
is_machine: Optional[bool] = None
is_component: Optional[bool] = None
is_spare_part: Optional[bool] = None
placed_on_market_after: Optional[date] = None
intended_use: Optional[str] = None
eu_market: Optional[bool] = None
b2b_or_b2c: Optional[MarketModel] = None
lifecycle_phase: Optional[ProductLifecyclePhase] = None
# Organisation context — only needed for NIS2 (not a product fact).
company_size: Optional[str] = None
sector: Optional[str] = None
is_essential_or_important_entity: Optional[bool] = None
# ---------------------------------------------------------------------------
# Scope
# ---------------------------------------------------------------------------
class ApplicableRegulation(BaseModel):
regulation_id: str
name: str
applicability_status: ApplicabilityStatus
trigger_facts: List[str] = Field(default_factory=list)
legal_basis_refs: List[str] = Field(default_factory=list)
confidence: Confidence
explanation: str
class ExcludedRegulation(BaseModel):
regulation_id: str
name: str
reason: str
class UncertainRegulation(BaseModel):
regulation_id: str
name: str
missing_facts: List[str] = Field(default_factory=list)
explanation: str
class RegulatoryScope(BaseModel):
product_profile_id: Optional[str] = None
applicable_regulations: List[ApplicableRegulation] = Field(default_factory=list)
excluded_regulations: List[ExcludedRegulation] = Field(default_factory=list)
uncertain_regulations: List[UncertainRegulation] = Field(default_factory=list)
missing_facts: List[str] = Field(default_factory=list)
confidence: Confidence = Confidence.MEDIUM
reasoning_summary: str = ""
# ---------------------------------------------------------------------------
# Obligations
# ---------------------------------------------------------------------------
class ApplicableObligation(BaseModel):
obligation_id: str
title: str
source_regulation: str
legal_basis_refs: List[str] = Field(default_factory=list)
obligation_text: str
authority_level: AuthorityLevel
applies_because: List[str] = Field(default_factory=list)
applies_to_role: List[str] = Field(default_factory=list)
lifecycle_phase: List[str] = Field(default_factory=list)
overlap_group_id: Optional[str] = None
required_evidence: List[str] = Field(default_factory=list)
confidence: Confidence
# True only when obligation_id is owned by the Legal-KG registry (CRA P1).
registry_anchor: bool = False
# Machine/Data-Act obligations the registry has not canonicalised yet.
proposed: bool = False
class ObligationOverlap(BaseModel):
overlap_group_id: str
obligations: List[str] = Field(default_factory=list)
overlap_type: OverlapType
canonical_obligation_id: str
explanation: str
# ---------------------------------------------------------------------------
# Customer claims & assessments
# ---------------------------------------------------------------------------
class CustomerImplementationClaim(BaseModel):
claim_id: str
raw_statement: str
normalized_claim: str = ""
claimed_capability: List[str] = Field(default_factory=list)
related_topics: List[str] = Field(default_factory=list)
qualifiers: List[str] = Field(default_factory=list)
evidence_refs: List[str] = Field(default_factory=list)
class ClaimObligationMapping(BaseModel):
"""One row of Welt-1 reasoning: how a customer claim relates to an obligation.
Layers (spec / architect): claim -> interpretation (on the claim object) ->
*potential* obligation coverage (`claim_coverage`) -> evidence required.
Carries NO compliance verdict.
"""
claim_id: str
obligation_id: str
claim_coverage: ClaimCoverage
missing_elements: List[str] = Field(default_factory=list)
required_evidence: List[str] = Field(default_factory=list)
explanation: str
confidence: Confidence
class InterpretationAssessment(BaseModel):
interpretation_id: str
raw_interpretation: str
affected_regulations: List[str] = Field(default_factory=list)
affected_obligations: List[str] = Field(default_factory=list)
assessment: InterpretationVerdict
risks: List[str] = Field(default_factory=list)
corrected_interpretation: str = ""
legal_basis_refs: List[str] = Field(default_factory=list)
explanation: str
confidence: Confidence
# ---------------------------------------------------------------------------
# API request / response envelopes
# ---------------------------------------------------------------------------
class ScopeRequest(BaseModel):
product_profile: ProductProfile
class ScopeResponse(BaseModel):
regulatory_scope: RegulatoryScope
missing_facts: List[str] = Field(default_factory=list)
confidence: Confidence
class ObligationsRequest(BaseModel):
product_profile: ProductProfile
regulatory_scope: Optional[RegulatoryScope] = None
class ObligationsResponse(BaseModel):
applicable_obligations: List[ApplicableObligation] = Field(default_factory=list)
overlaps: List[ObligationOverlap] = Field(default_factory=list)
excluded_obligations: List[str] = Field(default_factory=list)
evidence_for_multiple: Dict[str, List[str]] = Field(default_factory=dict)
class ImplementationReasoningRequest(BaseModel):
product_profile: ProductProfile
customer_claim: str
class ImplementationReasoningResponse(BaseModel):
claim: CustomerImplementationClaim
mappings: List[ClaimObligationMapping] = Field(default_factory=list)
missing_evidence: List[str] = Field(default_factory=list)
summary: str = ""
# Makes the Welt-1 boundary explicit: this is advisory claim-mapping, not a
# conformity verdict (that is ComplianceStatus in the Execution Graph).
disclaimer: str = ""
class InterpretationRequest(BaseModel):
product_profile: Optional[ProductProfile] = None
customer_interpretation: str
class InterpretationResponse(BaseModel):
assessment: InterpretationVerdict
affected_regulations: List[str] = Field(default_factory=list)
affected_obligations: List[str] = Field(default_factory=list)
corrected_interpretation: str = ""
risks: List[str] = Field(default_factory=list)
legal_basis_refs: List[str] = Field(default_factory=list)
explanation: str = ""
confidence: Confidence = Confidence.MEDIUM
@@ -0,0 +1,136 @@
"""Scope discovery engine (spec Modus 1).
Answers "which regulations apply to my product?" — and, crucially, never says
"X applies" without the triggers, and never hides a missing fact behind a false
verdict. Pure rule evaluation, deterministic.
"""
from __future__ import annotations
from typing import List, Optional
from .enums import ApplicabilityStatus, Confidence
from .predicates import Condition, evaluate, true_leaves, unknown_fields
from .rules_regulations import REGULATION_RULES, FIELD_LABELS, RegulationRule
from .schemas import (
ApplicableRegulation,
ExcludedRegulation,
ProductProfile,
RegulatoryScope,
UncertainRegulation,
)
_DOWNGRADE = {Confidence.HIGH: Confidence.MEDIUM, Confidence.MEDIUM: Confidence.LOW, Confidence.LOW: Confidence.LOW}
def _fields_in(condition: Optional[Condition]) -> List[str]:
if condition is None:
return []
if isinstance(condition, tuple):
return [condition[0]]
out: List[str] = []
for c in condition.get("all") or condition.get("any") or []:
out.extend(_fields_in(c))
return out
def _trigger_facts(rule: RegulationRule, profile: ProductProfile) -> List[str]:
labels: List[str] = []
for leaf in true_leaves(rule.trigger, profile):
label = FIELD_LABELS.get(leaf[0])
if label and label not in labels:
labels.append(label)
return labels
def _missing_prompts(rule: RegulationRule, profile: ProductProfile) -> List[str]:
fields = list(dict.fromkeys(rule.required_facts + _fields_in(rule.trigger)))
unknown = unknown_fields(fields, profile)
prompts: List[str] = []
for f in unknown:
prompt = rule.fact_prompts.get(f)
if prompt and prompt not in prompts:
prompts.append(prompt)
return prompts
def discover_scope(profile: ProductProfile) -> RegulatoryScope:
scope = RegulatoryScope(product_profile_id=profile.product_profile_id)
for rule in REGULATION_RULES:
role_value = profile.manufacturer_role.value if profile.manufacturer_role is not None else None
role_excluded = role_value is not None and role_value in rule.excludable_roles
trig = evaluate(rule.trigger, profile)
missing = _missing_prompts(rule, profile)
if role_excluded:
scope.excluded_regulations.append(
ExcludedRegulation(
regulation_id=rule.regulation_id,
name=rule.name,
reason="Rolle '%s' ist von dieser Regulierung nicht unmittelbar adressiert." % role_value,
)
)
continue
if trig is True:
conf = Confidence.MEDIUM if rule.inferred else rule.confidence_when_applicable
status = (
ApplicabilityStatus.PARTIALLY_APPLICABLE if rule.inferred else ApplicabilityStatus.APPLICABLE
)
unresolved = unknown_fields(rule.required_facts, profile)
if unresolved:
conf = _DOWNGRADE[conf]
for f in unresolved:
prompt = rule.fact_prompts.get(f)
if prompt and prompt not in scope.missing_facts:
scope.missing_facts.append(prompt)
scope.applicable_regulations.append(
ApplicableRegulation(
regulation_id=rule.regulation_id,
name=rule.name,
applicability_status=status,
trigger_facts=_trigger_facts(rule, profile),
legal_basis_refs=rule.legal_basis_refs,
confidence=conf,
explanation=rule.summary,
)
)
elif trig is None:
scope.uncertain_regulations.append(
UncertainRegulation(
regulation_id=rule.regulation_id,
name=rule.name,
missing_facts=missing,
explanation=rule.summary,
)
)
for m in missing:
if m not in scope.missing_facts:
scope.missing_facts.append(m)
else: # trig is False -> definitively excluded by a known fact
scope.excluded_regulations.append(
ExcludedRegulation(
regulation_id=rule.regulation_id,
name=rule.name,
reason="Auslösende Voraussetzungen sind anhand der bekannten Fakten nicht erfüllt.",
)
)
scope.confidence = _overall_confidence(scope)
scope.reasoning_summary = _summary(scope)
return scope
def _overall_confidence(scope: RegulatoryScope) -> Confidence:
if scope.applicable_regulations and not scope.uncertain_regulations and not scope.missing_facts:
return Confidence.HIGH
if scope.applicable_regulations:
return Confidence.MEDIUM
return Confidence.LOW
def _summary(scope: RegulatoryScope) -> str:
applicable = ", ".join(r.regulation_id for r in scope.applicable_regulations) or ""
uncertain = ", ".join(r.regulation_id for r in scope.uncertain_regulations) or ""
return "Wahrscheinlich anwendbar: %s. Unsicher (fehlende Fakten): %s." % (applicable, uncertain)
@@ -0,0 +1,104 @@
"""Deterministic taxonomy for normalising free-text customer claims.
Capability names echo the planned Obligation -> Capability layer of the
Compliance Execution Graph (memory `project_compliance_graph.md`), so the
reasoning layer's claim capabilities line up with the registry's capabilities.
Matching is lowercase substring matching — deterministic, no LLM, no RAG.
"""
from __future__ import annotations
from typing import Dict, List
# capability -> trigger substrings (German + English), matched lowercase.
CAPABILITY_KEYWORDS: Dict[str, List[str]] = {
"software_bill_of_materials": [
"sbom", "stückliste", "stueckliste", "bill of materials", "komponentenliste",
],
"secure_updates": ["update", "patch", "aktualisier", "release", "rollout"],
"software_integrity": ["signier", "signatur", "signed", "integrität", "integritaet", "hash"],
"vulnerability_management": [
"schwachstelle", "vulnerab", "cve", "schwachstellenmanagement", "vuln",
],
"coordinated_disclosure": [
"disclosure", "offenlegung", "security.txt", "responsible disclosure",
],
"incident_reporting": [
"incident", "vorfall", "behörde", "behoerde", "csirt", "meldepflicht", "an die behörde",
],
"authentication": [
"authentifizier", "login", "passwort", "password", "mfa", "2fa", "anmeldung",
],
"secure_by_default": [
"härtung", "haertung", "hardening", "default", "standardkonfig",
"sichere konfiguration", "angriffsfläche", "angriffsflaeche",
],
"security_logging": ["logging", "log ", "logs", "protokoll", "audit-trail", "ereignisprotokoll"],
"secure_communication": ["verschlüssel", "verschluessel", "encryption", "tls", "vpn", "ssl"],
"risk_assessment": [
"risikoanalyse", "risikobeurteil", "risk assessment", "gefährdungsbeurteil",
"gefaehrdungsbeurteil", "bedrohungsanalyse", "threat model",
],
"technical_documentation": [
"dokumentation", "technische unterlagen", "betriebsanleitung", "handbuch", "documentation",
],
"conformity_assessment": ["konformität", "konformitaet", "conformity", "baumuster", "ce-kenn"],
"functional_safety": [
"performance level", "sil ", "iso 13849", "funktionale sicherheit", "safety control",
],
"data_access_provision": [
"datenzugang", "data access", "datenportabilität", "datenexport", "data export",
],
}
# capability -> broader compliance topics it touches (spec related_topics).
CAPABILITY_TOPICS: Dict[str, List[str]] = {
"software_bill_of_materials": ["component_transparency", "supply_chain", "vulnerability_management"],
"secure_updates": ["secure_updates", "vulnerability_remediation", "release_management"],
"software_integrity": ["secure_updates", "supply_chain", "tamper_protection"],
"vulnerability_management": ["vulnerability_handling", "monitoring", "patch_management"],
"coordinated_disclosure": ["vulnerability_handling", "transparency"],
"incident_reporting": ["incident_handling", "authority_notification"],
"authentication": ["access_control", "identity"],
"secure_by_default": ["hardening", "attack_surface", "configuration"],
"security_logging": ["monitoring", "forensics", "incident_handling"],
"secure_communication": ["confidentiality", "integrity", "remote_access"],
"risk_assessment": ["risk_management", "secure_by_design"],
"technical_documentation": ["documentation", "conformity"],
"conformity_assessment": ["conformity", "ce_marking"],
"functional_safety": ["machine_safety", "control_systems"],
"data_access_provision": ["data_sharing", "portability"],
}
# qualifier -> substrings that signal a weak/incomplete implementation.
QUALIFIER_KEYWORDS: Dict[str, List[str]] = {
"reactive": [
"wenn kunden", "wenn ein kunde", "nach meldung", "auf anfrage", "auf nachfrage",
"nur wenn", "reaktiv", "wenn fehler", "when customers", "on request", "when reported",
"ad hoc", "ad-hoc", "bei bedarf",
],
"manual": ["manuell", "von hand", "manual", "händisch", "haendisch"],
"planned": [
"geplant", "in planung", "wollen wir", "planen wir", "noch nicht", "zukünftig", "künftig",
],
"absent": ["haben wir nicht", "gibt es nicht", "nicht vorhanden", "keinen prozess", "keine"],
}
def match_capabilities(text: str) -> List[str]:
low = text.lower()
return [cap for cap, kws in CAPABILITY_KEYWORDS.items() if any(k in low for k in kws)]
def match_qualifiers(text: str) -> List[str]:
low = text.lower()
return [q for q, kws in QUALIFIER_KEYWORDS.items() if any(k in low for k in kws)]
def topics_for(capabilities: List[str]) -> List[str]:
out: List[str] = []
for cap in capabilities:
for t in CAPABILITY_TOPICS.get(cap, []):
if t not in out:
out.append(t)
return out
@@ -0,0 +1,159 @@
"""Known customer interpretation patterns (spec Modus 4).
Deterministic: a customer interpretation is matched by lowercase substring
triggers against a curated library of common misconceptions. No match ->
the engine returns `uncertain` and asks for the missing context (no false
security, spec §6.3).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List
from .enums import Confidence, InterpretationVerdict
@dataclass(frozen=True)
class InterpretationPattern:
pattern_id: str
triggers: List[str]
verdict: InterpretationVerdict
corrected_interpretation: str
explanation: str
affected_regulations: List[str] = field(default_factory=list)
affected_obligations: List[str] = field(default_factory=list)
risks: List[str] = field(default_factory=list)
legal_basis_refs: List[str] = field(default_factory=list)
confidence: Confidence = Confidence.MEDIUM
INTERPRETATION_PATTERNS: List[InterpretationPattern] = [
InterpretationPattern(
pattern_id="cra_only_new_products",
triggers=[
"nur für neue", "nur fuer neue", "nur neu entwickelt", "nur neuentwicklung",
"nur bei neuentwicklung", "only new product", "gilt nur für neue produkte",
],
verdict=InterpretationVerdict.TOO_NARROW,
corrected_interpretation=(
"CRA-Pflichten knüpfen primär an Produkt, Rolle, Marktzugang, Bereitstellung und "
"Übergangsfristen an, nicht nur an Neuentwicklung. Ein fertig entwickeltes "
"Katalogprodukt kann betroffen sein, wenn es nach dem maßgeblichen Zeitpunkt weiter "
"auf dem EU-Markt bereitgestellt wird."
),
explanation=(
"Die relevante Frage ist nicht nur, ob das Produkt neu entwickelt wurde, sondern ob es "
"nach dem Anwendungszeitpunkt weiterhin bereitgestellt oder in Verkehr gebracht wird."
),
affected_regulations=["CRA"],
risks=["Katalog-/Bestandsprodukt fällt trotz abgeschlossener Entwicklung unter den CRA."],
legal_basis_refs=["CRA Art. 2", "CRA Art. 69 (Übergangsbestimmungen)"],
confidence=Confidence.HIGH,
),
InterpretationPattern(
pattern_id="cra_b2b_exempt",
triggers=[
"gilt nicht für b2b", "nur für verbraucher", "nur b2c", "nicht im b2b",
"only consumer", "b2b ist ausgenommen",
],
verdict=InterpretationVerdict.TOO_NARROW,
corrected_interpretation=(
"Der CRA gilt produkt- und marktbezogen, unabhängig von B2B oder B2C. Eine generelle "
"B2B-Ausnahme existiert nicht; Industrieprodukte mit digitalen Elementen sind erfasst."
),
explanation="Der Anwendungsbereich knüpft an 'Produkte mit digitalen Elementen' an, nicht an die Kundengruppe.",
affected_regulations=["CRA"],
risks=["Industrielle B2B-Steuerungen werden fälschlich als ausgenommen behandelt."],
legal_basis_refs=["CRA Art. 2", "CRA Art. 3(1)"],
confidence=Confidence.HIGH,
),
InterpretationPattern(
pattern_id="sbom_is_enough",
triggers=[
"sbom reicht", "mit sbom sind wir", "sbom genügt", "sbom genuegt", "nur eine sbom",
"sbom allein",
],
verdict=InterpretationVerdict.TOO_NARROW,
corrected_interpretation=(
"Eine SBOM erfüllt nur einen Teil der Komponenten-Transparenz. Schwachstellen-"
"überwachung, Update-/Patch-Prozess und technische Dokumentation bleiben eigenständige Pflichten."
),
explanation="SBOM ist Voraussetzung, ersetzt aber nicht Vulnerability-Handling und Updates.",
affected_regulations=["CRA"],
affected_obligations=["sbom_creation", "vuln_handling_process", "provide_security_updates"],
risks=["Falsche Annahme vollständiger Erfüllung trotz fehlendem Vulnerability-Prozess."],
legal_basis_refs=["CRA Annex I Part II (1)", "CRA Annex I Part II (2)"],
confidence=Confidence.HIGH,
),
InterpretationPattern(
pattern_id="open_source_exempt",
triggers=[
"open source ist ausgenommen", "open-source ist ausgenommen", "oss ist ausgenommen",
"freie software ist ausgenommen", "open source fällt nicht",
],
verdict=InterpretationVerdict.PARTIALLY_CORRECT,
corrected_interpretation=(
"Nur nicht-kommerziell bereitgestellte Open-Source-Software ist ausgenommen. Sobald OSS "
"kommerziell in ein Produkt integriert und auf dem Markt bereitgestellt wird, greift der CRA."
),
explanation="Die Ausnahme zielt auf nicht-kommerzielle OSS-Bereitstellung, nicht auf kommerzielle Produktintegration.",
affected_regulations=["CRA"],
risks=["Kommerziell integrierte OSS-Komponenten werden fälschlich als ausgenommen behandelt."],
legal_basis_refs=["CRA Art. 2", "CRA Erwägungsgründe (Open-Source-Stewards)"],
confidence=Confidence.MEDIUM,
),
InterpretationPattern(
pattern_id="reactive_updates_ok",
triggers=[
"updates nur wenn", "reaktive updates reichen", "wenn kunden melden reicht",
"updates wenn fehler gemeldet",
],
verdict=InterpretationVerdict.TOO_NARROW,
corrected_interpretation=(
"Der CRA verlangt aktive Schwachstellenüberwachung und zeitnahe Sicherheitsupdates über "
"den Supportzeitraum, nicht nur reaktive Updates nach Kundenmeldung."
),
explanation="Ein rein reaktiver Updateprozess erfüllt die Pflicht zur aktiven Schwachstellenbehandlung nicht.",
affected_regulations=["CRA"],
affected_obligations=["provide_security_updates", "vuln_handling_process"],
risks=["Verzögerte Reaktion auf öffentlich bekannte Schwachstellen; Pflichtverletzung."],
legal_basis_refs=["CRA Annex I Part II (1)", "CRA Annex I (2)(c)"],
confidence=Confidence.HIGH,
),
InterpretationPattern(
pattern_id="machinery_covers_cyber",
triggers=[
"maschinenrichtlinie deckt cyber", "maschinenvo deckt alles", "ce der maschine reicht",
"ce maschine reicht für cyber", "maschinen-ce reicht",
],
verdict=InterpretationVerdict.PARTIALLY_CORRECT,
corrected_interpretation=(
"Die MaschinenVO deckt die sicherheitsrelevante Korrumpierung ab (Anhang III 1.1.9), "
"ersetzt aber nicht die produktbezogenen CRA-Security-Pflichten. Beide Regime gelten parallel."
),
explanation="Maschinen-CE und CRA überschneiden sich nur dort, wo Cyber eine Sicherheitsfunktion betrifft.",
affected_regulations=["CRA", "MaschinenVO"],
affected_obligations=["machine_protection_against_corruption", "vuln_handling_process"],
risks=["CRA-Pflichten werden übersehen, weil die Maschine bereits CE-gekennzeichnet ist."],
legal_basis_refs=["MaschinenVO Anhang III (1.1.9)", "CRA Art. 13"],
confidence=Confidence.MEDIUM,
),
InterpretationPattern(
pattern_id="no_radio_no_cyber",
triggers=[
"ohne funkmodul kein cyber", "kein funk also kein cra", "ohne funk keine security",
"ohne funkmodul keine cyber",
],
verdict=InterpretationVerdict.TOO_NARROW,
corrected_interpretation=(
"Der CRA knüpft an digitale Elemente an, nicht an ein Funkmodul. Ohne Funk entfällt die "
"RED, der CRA bleibt jedoch anwendbar, sobald Software vorhanden ist."
),
explanation="Funkmodul ist nur für die RED relevant; die CRA-Anwendbarkeit folgt aus der Software.",
affected_regulations=["CRA", "RED"],
risks=["CRA wird fälschlich verneint, weil kein Funkmodul vorhanden ist."],
legal_basis_refs=["CRA Art. 3(1)", "RED 2014/53/EU Art. 1"],
confidence=Confidence.HIGH,
),
]
@@ -0,0 +1,31 @@
"""Regulatory Map — customer-readable read-model over the engine's scope output.
Composes scope + registry-linked obligations + overlaps into one map:
product -> trigger facts -> applicable / uncertain / excluded regulations ->
obligations -> overlaps -> unsupported domains -> executive summary. Explains the
engine's state, never extends it. No new logic, no UI, no RAG, no percentage.
"""
from __future__ import annotations
from .renderer import render_regulatory_map
from .schemas import (
ApplicableRegulationView,
ExcludedRegulationView,
ObligationRef,
OverlapView,
RegulatoryMap,
RegulatoryMapRequest,
UncertainRegulationView,
)
__all__ = [
"render_regulatory_map",
"RegulatoryMap",
"RegulatoryMapRequest",
"ApplicableRegulationView",
"UncertainRegulationView",
"ExcludedRegulationView",
"OverlapView",
"ObligationRef",
]
@@ -0,0 +1,169 @@
"""Regulatory Map renderer (step 4) — pure composition, no new logic.
It explains the engine's state, it does not extend it: every statement comes
from `resolve_product_scope` (scope verdict) or `derive_obligations` (registry-
linked obligations + overlaps). No legal decisions here; obligations are shown
ONLY where a registry id is linkable (registry_anchor); the executive summary
carries counts but NO percentage.
"""
from __future__ import annotations
from typing import Dict, List
from compliance.navigator.engine import navigate
from compliance.product_scope.orchestrator import resolve_product_scope
from compliance.product_scope.schemas import RegulatoryScopeResult, ScopeStatus
from compliance.profile.canonical import CanonicalProductRegulatoryProfile
from compliance.profile.to_reasoning import to_reasoning_profile
from compliance.reasoning.obligation_engine import derive_obligations
from .schemas import (
ApplicableRegulationView,
ExcludedRegulationView,
ObligationRef,
OverlapView,
RegulatoryMap,
UncertainRegulationView,
)
_DOMAIN_BY_REG = {
"CRA": "cyber",
"MaschinenVO": "machine_safety",
"RED": "radio",
"DataAct": "data",
"EMV": "emv",
"NIS2": None,
}
def _product_summary(c: CanonicalProductRegulatoryProfile) -> str:
bits: List[str] = [c.name or "Produkt"]
if c.product_type:
bits.append("(%s)" % c.product_type.value)
sig: List[str] = []
if c.is_machine:
sig.append("Maschine")
if c.has_remote_access or c.connected_to_internet or "cloud" in c.technologies:
sig.append("vernetzt")
if c.has_embedded_software:
sig.append("Firmware")
if c.economic_operator_role:
sig.append("Rolle: %s" % c.economic_operator_role.value)
if c.markets:
sig.append("Märkte: %s" % ", ".join(c.markets))
if sig:
bits.append("" + "; ".join(sig))
return " ".join(bits)
def render_regulatory_map(profile: CanonicalProductRegulatoryProfile) -> RegulatoryMap:
scope_resp = resolve_product_scope(profile)
summary = _product_summary(profile)
if scope_resp.status == ScopeStatus.NEEDS_FACTS:
return RegulatoryMap(
scope_resolved=False,
product_summary=summary,
executive_summary=(
"Regulatorischer Scope noch nicht bestimmbar — zuerst Mindestfakten klären: "
+ "; ".join(scope_resp.missing_facts[:6])
+ "."
),
)
scope = scope_resp.regulatory_scope
assert scope is not None
obligations = derive_obligations(to_reasoning_profile(profile))
nav_questions = navigate(profile).suggested_questions
linked_ids = {o.obligation_id for o in obligations.applicable_obligations if o.registry_anchor}
by_reg: Dict[str, List[ObligationRef]] = {}
shared_ev: Dict[str, List[str]] = {}
for o in obligations.applicable_obligations:
if not o.registry_anchor:
continue
by_reg.setdefault(o.source_regulation, []).append(
ObligationRef(
obligation_id=o.obligation_id,
title=o.title,
legal_basis_refs=o.legal_basis_refs,
authority_level=o.authority_level,
)
)
for ev in o.required_evidence:
shared_ev.setdefault(ev, []).append(o.obligation_id)
applicable_views = []
for r in scope.applicable_regulations:
obs = by_reg.get(r.regulation_id, [])
applicable_views.append(
ApplicableRegulationView(
regulation_id=r.regulation_id,
name=r.name,
why_applicable=r.explanation,
triggered_by=r.trigger_facts,
obligations=obs,
obligations_note="" if obs else "Pflichten für dieses Regelwerk sind noch nicht registry-verlinkt.",
confidence=r.confidence,
)
)
uncertain_views = []
for u in scope.uncertain_regulations:
domain = _DOMAIN_BY_REG.get(u.regulation_id)
qrefs = [q.question_id for q in nav_questions if domain and domain in q.regulatory_domains_unblocked]
uncertain_views.append(
UncertainRegulationView(
regulation_id=u.regulation_id, name=u.name, missing_facts=u.missing_facts, question_refs=qrefs
)
)
overlap_views = []
for ov in obligations.overlaps:
members = [m for m in ov.obligations if m in linked_ids]
if len(members) >= 2:
overlap_views.append(
OverlapView(overlap_group_id=ov.overlap_group_id, shared_obligations=members, explanation=ov.explanation)
)
trigger_facts: List[str] = []
for v in applicable_views:
for t in v.triggered_by:
if t not in trigger_facts:
trigger_facts.append(t)
return RegulatoryMap(
scope_resolved=True,
product_summary=summary,
trigger_facts=trigger_facts,
applicable_regulations=applicable_views,
uncertain_regulations=uncertain_views,
excluded_regulations=[
ExcludedRegulationView(regulation_id=e.regulation_id, name=e.name, exclusion_reason=e.reason)
for e in scope.excluded_regulations
],
unsupported_domains=scope.unsupported_domains,
overlaps=overlap_views,
shared_evidence={ev: ids for ev, ids in shared_ev.items() if len(ids) > 1},
executive_summary=_executive_summary(summary, applicable_views, uncertain_views, scope, len(linked_ids)),
)
def _executive_summary(
summary: str,
applicable: List[ApplicableRegulationView],
uncertain: List[UncertainRegulationView],
scope: RegulatoryScopeResult,
n_obligations: int,
) -> str:
appl = ", ".join(v.regulation_id for v in applicable) or ""
unc = ", ".join(v.regulation_id for v in uncertain) or "keine"
exc = ", ".join(e.regulation_id for e in scope.excluded_regulations) or "keine"
uns = ", ".join(d.domain for d in scope.unsupported_domains) or "keine"
return (
"Für %s gelten nach derzeitigem Stand wahrscheinlich: %s. Unsicher (fehlende Fakten): %s. "
"Ausgeschlossen: %s. Nicht abgedeckt (Regelkorpus fehlt): %s. Ermittelt: %d registry-verlinkte "
"Pflichten. Es wurden keine weiteren Regelwerke im aktuellen Korpus identifiziert."
% (summary, appl, unc, exc, uns, n_obligations)
)
@@ -0,0 +1,70 @@
"""Read-model for the Regulatory Map (step 4).
A customer-readable view that COMPOSES what the engine already computed (scope +
obligations + overlaps). It adds no scope/obligation logic. All fields are
application-level presentation types — NOT compliance-meta-model classes
(architecture freeze v1.0 untouched).
"""
from __future__ import annotations
from typing import Dict, List
from pydantic import BaseModel, Field
from compliance.product_scope.schemas import UnsupportedDomain
from compliance.profile.canonical import CanonicalProductRegulatoryProfile
from compliance.reasoning.enums import AuthorityLevel, Confidence
class RegulatoryMapRequest(BaseModel):
product_profile: CanonicalProductRegulatoryProfile
class ObligationRef(BaseModel):
obligation_id: str
title: str
legal_basis_refs: List[str] = Field(default_factory=list)
authority_level: AuthorityLevel
class ApplicableRegulationView(BaseModel):
regulation_id: str
name: str
why_applicable: str
triggered_by: List[str] = Field(default_factory=list)
obligations: List[ObligationRef] = Field(default_factory=list)
obligations_note: str = "" # set when obligations are not yet registry-linkable
confidence: Confidence
class UncertainRegulationView(BaseModel):
regulation_id: str
name: str
missing_facts: List[str] = Field(default_factory=list)
question_refs: List[str] = Field(default_factory=list)
class ExcludedRegulationView(BaseModel):
regulation_id: str
name: str
exclusion_reason: str
class OverlapView(BaseModel):
overlap_group_id: str
shared_obligations: List[str] = Field(default_factory=list)
explanation: str = ""
class RegulatoryMap(BaseModel):
scope_resolved: bool
product_summary: str
trigger_facts: List[str] = Field(default_factory=list)
applicable_regulations: List[ApplicableRegulationView] = Field(default_factory=list)
uncertain_regulations: List[UncertainRegulationView] = Field(default_factory=list)
excluded_regulations: List[ExcludedRegulationView] = Field(default_factory=list)
unsupported_domains: List[UnsupportedDomain] = Field(default_factory=list)
overlaps: List[OverlapView] = Field(default_factory=list)
shared_evidence: Dict[str, List[str]] = Field(default_factory=dict)
executive_summary: str = ""
@@ -0,0 +1,141 @@
"""Tests for Interpretation-in-Map (step 5).
Acceptance: a customer interpretation is judged against the existing map, using
only assess_interpretation; affected regulations/obligations are referenced from
the map; unsupported domains (wastewater/chemicals) are flagged
future_corpus_needed, not pseudo-evaluated; output is customer-readable.
"""
from __future__ import annotations
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.interpretation_map import interpret_in_map
from compliance.profile.canonical import (
CanonicalLifecyclePhase,
CanonicalProductRegulatoryProfile,
CanonicalProductType,
EconomicOperatorRole,
EnvironmentalImpact,
)
from compliance.reasoning.enums import InterpretationVerdict
from compliance.reasoning.interpretation_engine import assess_interpretation
from compliance.regulatory_map import render_regulatory_map
def ready_profile(**ov) -> CanonicalProductRegulatoryProfile:
base = dict(
name="Industriespülmaschine",
product_type=CanonicalProductType.MACHINERY,
markets=["EU", "DE"],
economic_operator_role=EconomicOperatorRole.MANUFACTURER,
lifecycle_phase=CanonicalLifecyclePhase.PLACING_ON_MARKET,
is_machine=True,
is_component=False,
has_software_updates=True,
has_embedded_software=True,
has_remote_access=True,
technologies=["cloud", "ota_updates"],
)
base.update(ov)
return CanonicalProductRegulatoryProfile(**base)
def _map(**ov):
return render_regulatory_map(ready_profile(**ov))
# 1 + 2. evaluated against the map, using ONLY assess_interpretation.
def test_uses_assess_interpretation_verdict():
text = "Wir glauben, der CRA gilt nur für neue Produkte."
result = interpret_in_map(_map(), text)
assert result.assessment == assess_interpretation(text).assessment == InterpretationVerdict.TOO_NARROW
assert "CRA" in result.affected_regulations # CRA is in the map
assert result.in_scope_of_map is True
# 3. the six verdict values pass through unchanged.
def test_verdict_values():
m = _map()
assert interpret_in_map(m, "CRA gilt nur für neue Produkte.").assessment == InterpretationVerdict.TOO_NARROW
assert interpret_in_map(m, "Open Source ist ausgenommen, also betrifft uns der CRA nicht.").assessment == InterpretationVerdict.PARTIALLY_CORRECT
assert interpret_in_map(m, "Der Mond beeinflusst unsere Updatezyklen.").assessment == InterpretationVerdict.UNCERTAIN
# 4. affected regulations/obligations are referenced FROM the map.
def test_affected_refs_from_map():
m = _map()
result = interpret_in_map(m, "Eine SBOM reicht, dann sind wir fertig.")
map_ob_ids = {o.obligation_id for v in m.applicable_regulations for o in v.obligations}
map_reg_ids = {v.regulation_id for v in m.applicable_regulations} | {v.regulation_id for v in m.uncertain_regulations}
assert "sbom_creation" in result.affected_obligations
assert set(result.affected_obligations) <= map_ob_ids
assert set(result.affected_regulations) <= map_reg_ids
# 5. environmental aspects are NOT pseudo-evaluated.
def test_environmental_not_pseudo_evaluated():
m = _map(environmental=EnvironmentalImpact(discharges_to_wastewater=True))
result = interpret_in_map(m, "Beim Abwasser sind wir nicht betroffen, das spielt für uns keine Rolle.")
domains = {d.domain for d in result.future_corpus_domains}
assert "environment_water" in domains
assert "future_corpus_needed" in result.explanation
# 6. output is customer-readable.
def test_customer_readable():
result = interpret_in_map(_map(), "Der CRA gilt nur für neue Produkte.")
assert "zu eng" in result.explanation
assert result.explanation.startswith("Ihre Interpretation ist wahrscheinlich")
# affected refs never leave the map (no abstract legal questions).
def test_affected_regs_never_outside_map():
m = _map()
map_reg_ids = (
{v.regulation_id for v in m.applicable_regulations}
| {v.regulation_id for v in m.uncertain_regulations}
| {v.regulation_id for v in m.excluded_regulations}
)
for text in ["CRA gilt nur für neue Produkte.", "Ohne Funkmodul keine Cyber-Pflichten.", "SBOM reicht."]:
result = interpret_in_map(m, text)
assert set(result.affected_regulations) <= map_reg_ids
# endpoint smoke.
@pytest.fixture(scope="module")
def client():
from compliance.api.reasoning_routes import router
app = FastAPI()
app.include_router(router)
return TestClient(app)
def test_endpoint_interpretation_in_map(client):
r = client.post(
"/reasoning/interpretation-in-map",
json={
"product_profile": {
"name": "M",
"product_type": "machinery",
"markets": ["EU"],
"economic_operator_role": "manufacturer",
"lifecycle_phase": "placing_on_market",
"is_machine": True,
"is_component": False,
"has_software_updates": True,
"has_embedded_software": True,
"has_remote_access": True,
"technologies": ["cloud"],
},
"customer_interpretation": "Der CRA gilt nur für neue Produkte.",
},
)
assert r.status_code == 200
body = r.json()
assert body["assessment"] == "too_narrow"
assert "CRA" in body["affected_regulations"]
assert "zu eng" in body["explanation"]
+127
View File
@@ -0,0 +1,127 @@
"""Tests for the Product Regulatory Navigator (missing-facts layer).
Acceptance: a well-filled company-profile yields <= 10 questions; known facts are
not re-asked; environmental questions are trigger-only (no law evaluation); the
Navigator decides which facts are missing, NOT what applies.
"""
from __future__ import annotations
from compliance.navigator import NavigatorResult, apply_answers, navigate
from compliance.navigator.questions import QUESTION_CATALOG, QuestionPriority
from compliance.profile import from_company_profile
from compliance.profile.canonical import CanonicalProductRegulatoryProfile, EconomicOperatorRole
COMPANY = {
"industry": "Maschinenbau",
"business_model": "B2B",
"company_size": "medium",
"target_markets": ["DE", "EU"],
"primary_jurisdiction": "DE",
"machine_builder": {
"productTypes": ["special_machine"],
"containsFirmware": True,
"hasSafetyFunction": True,
"isNetworked": True,
"hasRemoteAccess": True,
"hasOTAUpdates": True,
"hasRiskAssessment": True,
},
}
def _empty() -> CanonicalProductRegulatoryProfile:
return CanonicalProductRegulatoryProfile(name="X")
# 1. well-filled company-profile -> at most 10 questions.
def test_filled_company_profile_at_most_10_questions():
result = navigate(from_company_profile(COMPANY))
assert len(result.suggested_questions) <= 10
# 2. known facts (markets, is_machine) are not re-asked; true gaps still are.
def test_known_facts_not_reasked():
result = navigate(from_company_profile(COMPANY))
assert "markets" not in result.missing_facts
assert "is_machine" not in result.missing_facts
# genuine gaps the company-profile cannot provide are still surfaced
assert "economic_operator_role" in result.missing_facts
assert "has_radio_module" in result.missing_facts
# 3. environmental questions are trigger-only — no environmental-law evaluation.
def test_environmental_questions_are_triggers_only():
result = navigate(_empty())
env = [q for q in result.suggested_questions if q.target_field.startswith("environmental.")]
assert len(env) >= 3
assert all(q.answer_type.value == "bool" for q in env)
# 4. the Navigator decides only missing facts, never what applies.
def test_navigator_decides_only_missing_facts():
assert set(NavigatorResult.model_fields.keys()) == {
"missing_facts",
"suggested_questions",
"completeness_summary",
}
# no question carries a verdict — only metadata about what it would unblock
for q in QUESTION_CATALOG:
assert q.regulatory_domains_unblocked # metadata, not a decision
assert hasattr(q, "answer_type")
# 5. apply_answers updates the profile; answered facts drop out of missing.
def test_apply_answers_updates_profile():
profile = from_company_profile(COMPANY)
updated = apply_answers(
profile,
{
"economic_operator_role": "manufacturer",
"markets": ["DE", "US"],
"has_radio_module": True,
"env_wastewater": True,
},
)
assert updated.economic_operator_role == EconomicOperatorRole.MANUFACTURER
assert updated.markets == ["DE", "US"]
assert updated.has_radio_module is True
assert updated.environmental.discharges_to_wastewater is True
after = navigate(updated)
assert "economic_operator_role" not in after.missing_facts
assert "has_radio_module" not in after.missing_facts
assert "environmental.discharges_to_wastewater" not in after.missing_facts
# 6. questions are ordered P0 -> P1 -> P2.
def test_priority_ordering():
questions = navigate(_empty()).suggested_questions
orders = [q.order() for q in questions]
assert orders == sorted(orders)
assert questions[0].priority == QuestionPriority.P0
# 7. ready_for_scope flips once all P0 facts are answered.
def test_ready_for_scope_after_p0():
profile = _empty()
assert navigate(profile).completeness_summary.ready_for_scope is False
answered = apply_answers(
profile,
{
"markets": ["DE"],
"economic_operator_role": "manufacturer",
"lifecycle_phase": "placing_on_market",
"is_machine": True,
"is_component": False,
},
)
summary = navigate(answered).completeness_summary
assert summary.ready_for_scope is True
# 8. empty profile asks the full (bounded) catalog.
def test_empty_profile_bounded_catalog():
result = navigate(_empty())
assert len(result.suggested_questions) == len(QUESTION_CATALOG)
assert result.completeness_summary.total_relevant == len(QUESTION_CATALOG)
@@ -0,0 +1,149 @@
"""Tests for the product-scope orchestrator (step 3).
Acceptance: missing P0 facts -> discover_scope NOT run; ready -> run exactly once;
response separates applicable/excluded/uncertain; environmental triggers appear
only as unsupported_domain (future_corpus_needed), never as a legal evaluation.
"""
from __future__ import annotations
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
import compliance.product_scope.orchestrator as orch
from compliance.product_scope import ScopeStatus, resolve_product_scope
from compliance.profile.canonical import (
CanonicalLifecyclePhase,
CanonicalProductRegulatoryProfile,
CanonicalProductType,
EconomicOperatorRole,
EnvironmentalImpact,
)
_KNOWN_REGS = {"CRA", "MaschinenVO", "RED", "EMV", "DataAct", "NIS2"}
def ready_profile(**ov) -> CanonicalProductRegulatoryProfile:
base = dict(
name="Industriespülmaschine",
product_type=CanonicalProductType.MACHINERY,
markets=["EU", "DE"],
economic_operator_role=EconomicOperatorRole.MANUFACTURER,
lifecycle_phase=CanonicalLifecyclePhase.PLACING_ON_MARKET,
is_machine=True,
is_component=False,
has_software_updates=True,
has_embedded_software=True,
has_remote_access=True,
has_safety_function=True,
technologies=["cloud", "ota_updates"],
)
base.update(ov)
return CanonicalProductRegulatoryProfile(**base)
def _spy(monkeypatch):
calls = {"n": 0}
real = orch.discover_scope
def counting(profile):
calls["n"] += 1
return real(profile)
monkeypatch.setattr(orch, "discover_scope", counting)
return calls
# 1. missing P0 facts -> discover_scope is NOT executed.
def test_needs_facts_does_not_run_scope(monkeypatch):
calls = _spy(monkeypatch)
resp = resolve_product_scope(CanonicalProductRegulatoryProfile(name="X"))
assert resp.status == ScopeStatus.NEEDS_FACTS
assert resp.regulatory_scope is None
assert resp.missing_facts
assert calls["n"] == 0
# 2. ready_for_scope -> discover_scope runs exactly once.
def test_ready_runs_scope_once(monkeypatch):
calls = _spy(monkeypatch)
resp = resolve_product_scope(ready_profile())
assert resp.status == ScopeStatus.RESOLVED
assert resp.regulatory_scope is not None
assert calls["n"] == 1
applicable = {r.regulation_id for r in resp.regulatory_scope.applicable_regulations}
assert "CRA" in applicable and "MaschinenVO" in applicable
# 3. the response separates the regulation categories.
def test_response_separates_categories():
scope = resolve_product_scope(ready_profile()).regulatory_scope
assert scope is not None
# all three buckets exist and only carry known regulation ids
for bucket in (scope.applicable_regulations, scope.excluded_regulations, scope.uncertain_regulations):
for r in bucket:
assert r.regulation_id in _KNOWN_REGS
assert scope.uncertain_regulations # e.g. RED/DataAct/NIS2 with unknown facts
# 4. environmental triggers surface ONLY as unsupported_domain, never as law.
def test_environmental_only_unsupported_domain():
profile = ready_profile(
environmental=EnvironmentalImpact(discharges_to_wastewater=True, uses_cleaning_chemicals=True)
)
scope = resolve_product_scope(profile).regulatory_scope
assert scope is not None
domains = {d.domain for d in scope.unsupported_domains}
assert "environment_water" in domains and "chemicals" in domains
assert all(d.status == "future_corpus_needed" for d in scope.unsupported_domains)
# no environmental "regulation" leaked into the scope verdict
all_regs = (
scope.applicable_regulations + scope.excluded_regulations + scope.uncertain_regulations
)
assert all(r.regulation_id in _KNOWN_REGS for r in all_regs)
# 5. endpoint smoke — both cases.
@pytest.fixture(scope="module")
def client():
from compliance.api.reasoning_routes import router
app = FastAPI()
app.include_router(router)
return TestClient(app)
def test_endpoint_needs_facts(client):
r = client.post("/reasoning/product-scope", json={"product_profile": {"name": "X"}})
assert r.status_code == 200
body = r.json()
assert body["status"] == "needs_facts"
assert body["regulatory_scope"] is None
assert body["missing_facts"]
def test_endpoint_resolved(client):
r = client.post(
"/reasoning/product-scope",
json={
"product_profile": {
"name": "M",
"product_type": "machinery",
"markets": ["EU"],
"economic_operator_role": "manufacturer",
"lifecycle_phase": "placing_on_market",
"is_machine": True,
"is_component": False,
"has_software_updates": True,
"has_embedded_software": True,
"has_remote_access": True,
"technologies": ["cloud"],
}
},
)
assert r.status_code == 200
body = r.json()
assert body["status"] == "resolved"
applicable = {x["regulation_id"] for x in body["regulatory_scope"]["applicable_regulations"]}
assert "CRA" in applicable and "MaschinenVO" in applicable
@@ -0,0 +1,188 @@
"""Tests for the Product Profile convergence layer.
Covers the 10 acceptance criteria of the CanonicalProductRegulatoryProfile spec:
lossless ProductWizard mapping, company-profile prefill, AI stays delegated,
markets no longer hardcoded, and the new Navigator fields (role/radio/usage-data/
lifecycle/BOM) plus one-semantic-profile across reasoning + gap.
"""
from __future__ import annotations
from compliance.profile import (
CanonicalLifecyclePhase,
CanonicalProductRegulatoryProfile,
CanonicalProductType,
ComponentKind,
EconomicOperatorRole,
ProductComponent,
from_company_profile,
from_product_wizard,
to_gap_profile,
to_reasoning_profile,
)
from compliance.reasoning import discover_scope
from compliance.reasoning.enums import ManufacturerRole, ProductLifecyclePhase
# A realistic ProductWizard payload — exactly the gap.ProductProfile JSON shape.
WIZARD = {
"name": "Industriespülmaschine",
"description": "vernetzte Spülmaschine",
"product_type": "machinery",
"technologies": ["cloud", "ota_updates", "sensor", "actuator"],
"data_processing": ["telemetry"],
"markets": ["EU"],
"connected_to_internet": True,
"has_software_updates": True,
"uses_ai": False,
"processes_personal_data": False,
"is_critical_infra_supplier": False,
"existing_certifications": ["CE"],
"applied_norms": ["ISO12100"],
"has_risk_assessment": True,
"has_technical_file": True,
"has_operating_manual": True,
"has_sbom": False,
"has_vuln_management": False,
"has_update_mechanism": True,
"has_incident_response": False,
"has_supply_chain_mgmt": False,
"ce_marking_since": "",
"product_age": "5",
}
COMPANY = {
"company_name": "ACME Maschinen GmbH",
"industry": "Maschinenbau",
"business_model": "B2B",
"company_size": "medium",
"target_markets": ["DE", "EU"],
"primary_jurisdiction": "DE",
"headquarters_country": "DE",
"uses_ai": False,
"is_data_controller": True,
"machine_builder": {
"productDescription": "Industriespülmaschine",
"productTypes": ["special_machine"],
"containsSoftware": True,
"containsFirmware": True,
"containsAI": False,
"hasSafetyFunction": True,
"safetyFunctionDescription": "Türverriegelung",
"isNetworked": True,
"hasRemoteAccess": True,
"hasOTAUpdates": True,
"hasRiskAssessment": True,
"criticalSectorClients": False,
},
}
# 1. ProductWizard data maps losslessly into the canonical and back to gap shape.
def test_product_wizard_lossless_roundtrip():
canonical = from_product_wizard(WIZARD)
assert to_gap_profile(canonical) == WIZARD
# 2. company-profile can prefill the canonical profile.
def test_company_profile_prefill():
c = from_company_profile(COMPANY)
assert c.sector_industry == "Maschinenbau"
assert c.b2b_or_b2c == "B2B"
assert c.company_size == "medium"
assert "DE" in c.markets and "EU" in c.markets
assert c.has_safety_function is True
assert c.has_remote_access is True
assert c.has_embedded_software is True
assert c.is_machine is True
assert c.description == "Industriespülmaschine"
# 3. AI-Act/ucca stays delegated — only uses_ai is forwarded, no risk classification.
def test_ai_classification_stays_delegated():
c = CanonicalProductRegulatoryProfile(name="X", uses_ai=True)
rp = to_reasoning_profile(c)
assert rp.has_ai_functionality is True
assert not hasattr(rp, "ai_risk_category") # no AI classification produced here
# 4. markets are a real list, never hardcoded ['EU'].
def test_markets_not_hardcoded_eu():
assert CanonicalProductRegulatoryProfile(name="X").markets == []
c = from_product_wizard({**WIZARD, "markets": ["US", "JP", "CA"]})
assert c.markets == ["US", "JP", "CA"]
assert to_gap_profile(c)["markets"] == ["US", "JP", "CA"]
assert to_reasoning_profile(c).eu_market is False # non-EU markets -> not EU
# 5. economic-operator role exists and maps to the reasoning role.
def test_economic_operator_role_exists():
c = CanonicalProductRegulatoryProfile(name="X", economic_operator_role=EconomicOperatorRole.IMPORTER)
assert to_reasoning_profile(c).manufacturer_role == ManufacturerRole.IMPORTER
# 6. radio_module exists (direct + inferred from a BOM component).
def test_radio_module_exists():
assert to_reasoning_profile(CanonicalProductRegulatoryProfile(name="X", has_radio_module=True)).has_radio_module is True
c = CanonicalProductRegulatoryProfile(name="X", components=[ProductComponent(name="WLAN", kind=ComponentKind.RADIO_MODULE)])
assert to_reasoning_profile(c).has_radio_module is True
# 7. generates_usage_data exists (direct + inferred from telemetry).
def test_generates_usage_data_exists():
c = CanonicalProductRegulatoryProfile(name="X", generates_usage_data=True)
assert to_reasoning_profile(c).generates_usage_data is True
inferred = from_product_wizard(WIZARD) # data_processing has telemetry
assert to_reasoning_profile(inferred).generates_usage_data is True
# 8. lifecycle_phase exists and maps.
def test_lifecycle_phase_exists():
c = CanonicalProductRegulatoryProfile(name="X", lifecycle_phase=CanonicalLifecyclePhase.MAINTENANCE)
assert to_reasoning_profile(c).lifecycle_phase == ProductLifecyclePhase.MAINTENANCE
# 9. BOM components are structured.
def test_bom_components_structured():
c = CanonicalProductRegulatoryProfile(
name="Spülmaschine",
components=[
ProductComponent(name="Umwälzpumpe", kind=ComponentKind.PUMP),
ProductComponent(name="Heizung", kind=ComponentKind.HEATING),
ProductComponent(name="SPS", kind=ComponentKind.PLC),
ProductComponent(name="Abwasserablauf", kind=ComponentKind.WASTEWATER_OUTLET),
],
)
kinds = {comp.kind for comp in c.components}
assert ComponentKind.PLC in kinds and ComponentKind.WASTEWATER_OUTLET in kinds
# 10. reasoning engine + gap engine run off ONE semantic profile.
def test_one_semantic_profile_reasoning_and_gap():
canonical = CanonicalProductRegulatoryProfile(
name="Industriespülmaschine",
product_type=CanonicalProductType.MACHINERY,
economic_operator_role=EconomicOperatorRole.MANUFACTURER,
markets=["EU", "DE"],
is_machine=True,
has_safety_function=True,
has_remote_access=True,
has_software_updates=True,
has_embedded_software=True,
technologies=["cloud", "ota_updates"],
)
gap = to_gap_profile(canonical)
rp = to_reasoning_profile(canonical)
# same facts, two projections
assert gap["markets"] == ["EU", "DE"]
assert rp.eu_market is True
assert rp.has_remote_access is True
assert rp.has_cloud_connection is True
assert rp.is_machine is True
assert rp.manufacturer_role == ManufacturerRole.MANUFACTURER
# the projected reasoning profile actually drives the reasoning engine
scope = discover_scope(rp)
applicable = {r.regulation_id for r in scope.applicable_regulations}
assert "CRA" in applicable
assert "MaschinenVO" in applicable
@@ -0,0 +1,282 @@
"""Tests for the Regulatory Reasoning Engine.
Covers the five typical machine-builder scenarios and the ten acceptance
questions from the build spec (§15). Engine tests are pure (no DB); the
endpoint smoke tests mount only the reasoning router.
"""
from __future__ import annotations
from datetime import date
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.reasoning import (
assess_interpretation,
derive_obligations,
discover_scope,
normalize_claim,
reason_implementation_claim,
)
from compliance.reasoning.enums import (
ApplicabilityStatus,
ClaimCoverage,
InterpretationVerdict,
)
from compliance.reasoning.schemas import ProductProfile
from compliance.reasoning.enums import ManufacturerRole
# ---------------------------------------------------------------------------
# Fixtures / builders
# ---------------------------------------------------------------------------
def sps_profile(**overrides) -> ProductProfile:
base = dict(
product_name="SPS mit HMI",
product_type=["SPS", "HMI", "Schaltschrank"],
has_software=True,
has_remote_access=True,
has_cloud_connection=True,
eu_market=True,
manufacturer_role=ManufacturerRole.MANUFACTURER,
)
base.update(overrides)
return ProductProfile(**base)
def _reg_ids(scope, attr):
return [getattr(r, "regulation_id") for r in getattr(scope, attr)]
# ---------------------------------------------------------------------------
# 1. Gilt CRA für eine SPS mit Fernwartung?
# ---------------------------------------------------------------------------
def test_cra_applies_to_sps_with_remote_access():
scope = discover_scope(sps_profile())
cra = [r for r in scope.applicable_regulations if r.regulation_id == "CRA"]
assert cra and cra[0].applicability_status == ApplicabilityStatus.APPLICABLE
assert cra[0].confidence.value == "high"
assert any("digitale Elemente" in f or "Fernzugriff" in f for f in cra[0].trigger_facts) or cra[0].trigger_facts
# ---------------------------------------------------------------------------
# 2. Katalogprodukt 2027 weiter verkauft -> CRA gilt; "nur neue Produkte" zu eng
# ---------------------------------------------------------------------------
def test_cra_applies_to_finished_catalog_product():
profile = sps_profile(placed_on_market_after=date(2027, 1, 1), lifecycle_phase="placing_on_market")
scope = discover_scope(profile)
assert "CRA" in _reg_ids(scope, "applicable_regulations")
def test_interpretation_only_new_products_is_too_narrow():
result = assess_interpretation("Wir glauben, der CRA gilt nur für neue Produkte.")
assert result.assessment == InterpretationVerdict.TOO_NARROW
assert "CRA" in result.affected_regulations
assert result.corrected_interpretation
assert result.legal_basis_refs
# ---------------------------------------------------------------------------
# 3. Reicht eine SBOM allein? -> nein, nur teilweise
# ---------------------------------------------------------------------------
def test_sbom_alone_is_not_enough():
resp = reason_implementation_claim(sps_profile(), "Wir haben SBOMs.")
sbom = [m for m in resp.mappings if m.obligation_id == "sbom_creation"]
assert sbom and sbom[0].claim_coverage == ClaimCoverage.POTENTIALLY_ADDRESSES
# but other obligations are surfaced as gaps -> claim does not address everything
assert any(m.claim_coverage != ClaimCoverage.POTENTIALLY_ADDRESSES for m in resp.mappings)
assert "Nachweise" in resp.summary
# ---------------------------------------------------------------------------
# 4. Ist ein reaktiver Updateprozess ausreichend? -> nur teilweise
# ---------------------------------------------------------------------------
def test_reactive_update_process_is_partial():
resp = reason_implementation_claim(
sps_profile(), "Wir machen Updates, wenn Kunden Fehler melden."
)
upd = [m for m in resp.mappings if m.obligation_id == "provide_security_updates"]
assert upd and upd[0].claim_coverage == ClaimCoverage.PARTIALLY_ADDRESSES
assert "reactive" in resp.claim.qualifiers
assert any("Schwachstellenüberwachung" in e for e in upd[0].missing_elements)
# ---------------------------------------------------------------------------
# 5. Wann überschneiden sich CRA und MaschinenVO?
# ---------------------------------------------------------------------------
def test_cra_and_machinery_overlap_on_cyber_safety():
profile = sps_profile(is_machine=True, has_safety_function=True)
resp = derive_obligations(profile)
ids = [o.obligation_id for o in resp.applicable_obligations]
assert "machine_protection_against_corruption" in ids
assert "vuln_handling_process" in ids
vuln_overlap = [o for o in resp.overlaps if o.overlap_group_id == "VULNERABILITY_HANDLING"]
assert vuln_overlap
assert "machine_protection_against_corruption" in vuln_overlap[0].obligations
# ---------------------------------------------------------------------------
# 6. Wann ist Data Act zusätzlich relevant?
# ---------------------------------------------------------------------------
def test_data_act_relevant_when_product_generates_data():
scope = discover_scope(sps_profile(generates_usage_data=True))
assert "DataAct" in _reg_ids(scope, "applicable_regulations")
obs = derive_obligations(sps_profile(generates_usage_data=True))
assert any(o.source_regulation == "DataAct" for o in obs.applicable_obligations)
def test_data_act_uncertain_when_data_unknown():
scope = discover_scope(sps_profile()) # generates_usage_data=None
assert "DataAct" in _reg_ids(scope, "uncertain_regulations")
# ---------------------------------------------------------------------------
# 7. Welche Pflichten gelten nicht ohne Funkmodul?
# ---------------------------------------------------------------------------
def test_no_radio_module_excludes_red():
scope = discover_scope(sps_profile(has_radio_module=False))
assert "RED" in _reg_ids(scope, "excluded_regulations")
assert "RED" not in _reg_ids(scope, "applicable_regulations")
def test_radio_unknown_makes_red_uncertain():
scope = discover_scope(sps_profile()) # has_radio_module=None
assert "RED" in _reg_ids(scope, "uncertain_regulations")
# ---------------------------------------------------------------------------
# 8. Welche Fakten fehlen für eine NIS2-Bewertung?
# ---------------------------------------------------------------------------
def test_nis2_missing_facts():
scope = discover_scope(sps_profile())
nis2 = [r for r in scope.uncertain_regulations if r.regulation_id == "NIS2"]
assert nis2
joined = " ".join(nis2[0].missing_facts).lower()
assert "unternehmensgröße" in joined and "sektor" in joined
# ---------------------------------------------------------------------------
# 9. Welche Nachweise decken mehrere Pflichten gleichzeitig? (USP)
# ---------------------------------------------------------------------------
def test_evidence_covers_multiple_obligations():
resp = derive_obligations(sps_profile())
multi = resp.evidence_for_multiple
assert multi # at least one evidence type spans >1 obligation
assert all(len(ids) > 1 for ids in multi.values())
assert "policy" in multi # the CRA process docs share a policy evidence
# ---------------------------------------------------------------------------
# 10. Auslegungen: zu eng / zu weit / plausibel / unbekannt
# ---------------------------------------------------------------------------
def test_interpretation_unknown_returns_uncertain():
result = assess_interpretation("Der Mond beeinflusst unsere Updatezyklen.")
assert result.assessment == InterpretationVerdict.UNCERTAIN
assert result.corrected_interpretation
def test_interpretation_open_source_partially_correct():
result = assess_interpretation("Open Source ist ausgenommen, also betrifft uns der CRA nicht.")
assert result.assessment == InterpretationVerdict.PARTIALLY_CORRECT
# ---------------------------------------------------------------------------
# Registry-alignment + contract guards
# ---------------------------------------------------------------------------
def test_cra_obligations_reuse_registry_ids_not_minted():
resp = derive_obligations(sps_profile())
anchored = [o for o in resp.applicable_obligations if o.registry_anchor]
assert "sbom_creation" in [o.obligation_id for o in anchored]
assert "provide_security_updates" in [o.obligation_id for o in anchored]
# machine obligations are proposed, never claimed as registry-owned
machine = [o for o in resp.applicable_obligations if o.source_regulation == "MaschinenVO"]
assert all(o.proposed and not o.registry_anchor for o in machine)
def test_required_evidence_only_uses_shared_catalog():
from compliance.reasoning.rules_types import EVIDENCE_CATALOG
from compliance.reasoning.rules_obligations import ALL_OBLIGATIONS
for rule in ALL_OBLIGATIONS:
assert set(rule.required_evidence) <= EVIDENCE_CATALOG
def test_claim_normalizer_is_deterministic():
a = normalize_claim("Wir haben einen Update-Prozess.")
b = normalize_claim("Wir haben einen Update-Prozess.")
assert a.claim_id == b.claim_id
assert "secure_updates" in a.claimed_capability
def test_unspecific_claim_asks_for_detail():
resp = reason_implementation_claim(sps_profile(), "Wir sind sicher aufgestellt.")
assert resp.mappings == [] or all(
m.claim_coverage == ClaimCoverage.INSUFFICIENT_INFORMATION for m in resp.mappings
)
assert "unspezifisch" in resp.summary.lower()
def test_claim_reasoning_carries_no_compliance_verdict():
"""Welt-1 boundary: claim mapping must never read as a conformity verdict."""
resp = reason_implementation_claim(
sps_profile(), "Wir haben SBOMs und einen Update-Prozess."
)
# claim-relative vocabulary only
for m in resp.mappings:
assert m.claim_coverage in set(ClaimCoverage)
# no compliance wording leaks into summary or explanations
assert "erfüllt" not in resp.summary
assert all("erfüllt" not in m.explanation for m in resp.mappings)
# explicit disclaimer separating ClaimCoverage (Welt 1) from ComplianceStatus (Welt 2)
assert resp.disclaimer
assert "ComplianceStatus" in resp.disclaimer and "Nachweis" in resp.disclaimer
# ---------------------------------------------------------------------------
# Endpoint smoke tests
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def client():
from compliance.api.reasoning_routes import router
app = FastAPI()
app.include_router(router)
return TestClient(app)
def test_endpoint_scope(client):
r = client.post("/reasoning/scope", json={"product_profile": {"product_name": "X", "has_software": True, "eu_market": True, "manufacturer_role": "manufacturer"}})
assert r.status_code == 200
body = r.json()
assert "CRA" in [x["regulation_id"] for x in body["regulatory_scope"]["applicable_regulations"]]
def test_endpoint_obligations(client):
r = client.post(
"/reasoning/obligations",
json={"product_profile": {"product_name": "X", "has_software": True, "has_remote_access": True, "eu_market": True, "manufacturer_role": "manufacturer"}},
)
assert r.status_code == 200
assert r.json()["applicable_obligations"]
def test_endpoint_implementation(client):
r = client.post(
"/reasoning/implementation-reasoning",
json={"product_profile": {"product_name": "X", "has_software": True, "eu_market": True, "manufacturer_role": "manufacturer"}, "customer_claim": "Wir haben SBOMs."},
)
assert r.status_code == 200
body = r.json()
assert body["mappings"]
assert body["disclaimer"]
def test_endpoint_interpretation(client):
r = client.post(
"/reasoning/interpretation-assessment",
json={"customer_interpretation": "CRA gilt nur für neue Produkte."},
)
assert r.status_code == 200
assert r.json()["assessment"] == "too_narrow"
@@ -0,0 +1,159 @@
"""Tests for the Regulatory Map renderer (step 4).
Acceptance: the renderer makes no own legal decisions (it composes the scope +
registry-linked obligations); CRA/MaschVO/EMV are separate; RED/DataAct/NIS2 are
uncertain; environmental is unsupported (not applicable); obligations appear only
when registry-linkable; the executive summary has no percentage.
"""
from __future__ import annotations
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.product_scope import resolve_product_scope
from compliance.profile.canonical import (
CanonicalLifecyclePhase,
CanonicalProductRegulatoryProfile,
CanonicalProductType,
EconomicOperatorRole,
EnvironmentalImpact,
)
from compliance.regulatory_map import render_regulatory_map
_PROPOSED_IDS = {
"machine_risk_assessment", "machine_safety_control_systems", "machine_protection_against_corruption",
"machine_instructions_for_use", "machine_ce_conformity", "data_act_data_access_by_design",
"data_act_user_data_access", "cra_secure_by_design", "cra_risk_assessment",
"cra_technical_documentation", "cra_ce_conformity_assessment", "cra_instructions_for_use",
}
def ready_profile(**ov) -> CanonicalProductRegulatoryProfile:
base = dict(
name="Industriespülmaschine",
product_type=CanonicalProductType.MACHINERY,
markets=["EU", "DE"],
economic_operator_role=EconomicOperatorRole.MANUFACTURER,
lifecycle_phase=CanonicalLifecyclePhase.PLACING_ON_MARKET,
is_machine=True,
is_component=False,
has_software_updates=True,
has_embedded_software=True,
has_remote_access=True,
technologies=["cloud", "ota_updates"],
)
base.update(ov)
return CanonicalProductRegulatoryProfile(**base)
# 1. renderer makes no own decisions — it mirrors the scope verdict exactly.
def test_no_own_legal_decisions():
p = ready_profile()
m = render_regulatory_map(p)
scope = resolve_product_scope(p).regulatory_scope
assert {v.regulation_id for v in m.applicable_regulations} == {
r.regulation_id for r in scope.applicable_regulations
}
assert {v.regulation_id for v in m.uncertain_regulations} == {
r.regulation_id for r in scope.uncertain_regulations
}
# 2/3/5. CRA/MaschVO/EMV separate applicable; RED/DataAct/NIS2 uncertain.
def test_regulation_separation():
m = render_regulatory_map(ready_profile())
applicable = {v.regulation_id for v in m.applicable_regulations}
uncertain = {v.regulation_id for v in m.uncertain_regulations}
assert {"CRA", "MaschinenVO", "EMV"} <= applicable
assert {"RED", "DataAct", "NIS2"} <= uncertain
# 4. environmental triggers surface as unsupported_domain, never applicable.
def test_environmental_unsupported_not_applicable():
p = ready_profile(environmental=EnvironmentalImpact(discharges_to_wastewater=True, uses_cleaning_chemicals=True))
m = render_regulatory_map(p)
domains = {d.domain for d in m.unsupported_domains}
assert "environment_water" in domains and "chemicals" in domains
assert all(v.regulation_id in {"CRA", "MaschinenVO", "RED", "DataAct", "EMV", "NIS2"} for v in m.applicable_regulations)
# 6. obligations are shown only when a registry id is linkable.
def test_obligations_only_registry_linkable():
m = render_regulatory_map(ready_profile())
shown = {o.obligation_id for v in m.applicable_regulations for o in v.obligations}
assert shown # CRA registry obligations are shown
assert "sbom_creation" in shown
assert not (shown & _PROPOSED_IDS) # no proposed (non-registry) obligation leaks in
# MaschinenVO is applicable but its obligations are proposed -> empty + note
machvo = next(v for v in m.applicable_regulations if v.regulation_id == "MaschinenVO")
assert machvo.obligations == []
assert machvo.obligations_note
# 7. executive summary contains no percentage.
def test_executive_summary_no_percent():
m = render_regulatory_map(ready_profile())
assert "%" not in m.executive_summary
assert "prozent" not in m.executive_summary.lower()
# 8. output is customer-readable and structured.
def test_customer_readable():
m = render_regulatory_map(ready_profile())
assert m.product_summary
assert "wahrscheinlich" in m.executive_summary
assert "Unsicher" in m.executive_summary
assert m.trigger_facts
# needs-facts profile -> map says scope not yet resolved.
def test_needs_facts_map():
m = render_regulatory_map(CanonicalProductRegulatoryProfile(name="X"))
assert m.scope_resolved is False
assert "Mindestfakten" in m.executive_summary
assert m.applicable_regulations == []
# uncertain RED links to the radio navigator question.
def test_uncertain_links_to_navigator_question():
m = render_regulatory_map(ready_profile())
red = next(v for v in m.uncertain_regulations if v.regulation_id == "RED")
assert "has_radio_module" in red.question_refs
# endpoint smoke.
@pytest.fixture(scope="module")
def client():
from compliance.api.reasoning_routes import router
app = FastAPI()
app.include_router(router)
return TestClient(app)
def test_endpoint_regulatory_map(client):
r = client.post(
"/reasoning/regulatory-map",
json={
"product_profile": {
"name": "M",
"product_type": "machinery",
"markets": ["EU"],
"economic_operator_role": "manufacturer",
"lifecycle_phase": "placing_on_market",
"is_machine": True,
"is_component": False,
"has_software_updates": True,
"has_embedded_software": True,
"has_remote_access": True,
"technologies": ["cloud"],
}
},
)
assert r.status_code == 200
body = r.json()
assert body["scope_resolved"] is True
assert {v["regulation_id"] for v in body["applicable_regulations"]} >= {"CRA", "MaschinenVO"}
assert "%" not in body["executive_summary"]