1607c89459
Deterministic reasoning layer ON TOP of the Legal Knowledge Graph (obligation
registry) and the Compliance Execution Graph (control mapping/evidence). Answers
which regulations apply to a concrete product, which obligations follow, whether
the customer's implementation covers them, and whether a customer interpretation
is too narrow/broad/plausible.
- ProductProfile with tri-state facts (Optional[bool]=None => uncertain, never
false security); safe predicate evaluator (no eval).
- 6 regulation triggers (CRA/MaschinenVO/RED/EMV/DataAct/NIS2) with missing-fact
prompts; 24 obligation scope rules.
- CRA obligation_ids RE-USED verbatim from the registry (93 ids) — never re-minted
(control_uuid trap); Machine/Data-Act flagged proposed=True.
- required_evidence constrained to the framework-agnostic shared evidence catalog;
capabilities echo the planned Obligation->Capability layer.
- Overlap groups (CRA<->MaschinenVO cyber-safety) + evidence-for-multiple (USP).
- 4 endpoints POST /reasoning/{scope,obligations,implementation-assessment,
interpretation-assessment}; thin handlers, registered in api/__init__.py.
- 22 tests (5 machine-builder scenarios + 10 acceptance questions). No DB
migration, no RAG, no new controls.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
137 lines
5.2 KiB
Python
137 lines
5.2 KiB
Python
"""Scope discovery engine (spec Modus 1).

Answers "which regulations apply to my product?" — and, crucially, never reports
"X applies" without listing the facts that triggered it, and never hides a
missing fact behind a false verdict. Pure rule evaluation, deterministic.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import List, Optional
|
|
|
|
from .enums import ApplicabilityStatus, Confidence
|
|
from .predicates import Condition, evaluate, true_leaves, unknown_fields
|
|
from .rules_regulations import REGULATION_RULES, FIELD_LABELS, RegulationRule
|
|
from .schemas import (
|
|
ApplicableRegulation,
|
|
ExcludedRegulation,
|
|
ProductProfile,
|
|
RegulatoryScope,
|
|
UncertainRegulation,
|
|
)
|
|
|
|
# One-step confidence demotion table: HIGH -> MEDIUM -> LOW, with LOW as the floor.
_DOWNGRADE = {
    Confidence.HIGH: Confidence.MEDIUM,
    Confidence.MEDIUM: Confidence.LOW,
    Confidence.LOW: Confidence.LOW,
}
|
|
|
|
|
|
def _fields_in(condition: Optional[Condition]) -> List[str]:
|
|
if condition is None:
|
|
return []
|
|
if isinstance(condition, tuple):
|
|
return [condition[0]]
|
|
out: List[str] = []
|
|
for c in condition.get("all") or condition.get("any") or []:
|
|
out.extend(_fields_in(c))
|
|
return out
|
|
|
|
|
|
def _trigger_facts(rule: RegulationRule, profile: ProductProfile) -> List[str]:
    """Return the display labels for the trigger leaves that ``true_leaves`` reports.

    Each leaf's first element is looked up in ``FIELD_LABELS``. Leaves without
    a (non-empty) label are skipped, and every label appears at most once, in
    first-seen order.
    """
    seen: List[str] = []
    for leaf in true_leaves(rule.trigger, profile):
        text = FIELD_LABELS.get(leaf[0])
        if not text or text in seen:
            continue
        seen.append(text)
    return seen
|
|
|
|
|
|
def _missing_prompts(rule: RegulationRule, profile: ProductProfile) -> List[str]:
    """Return the prompts for the rule's facts that ``unknown_fields`` reports as unknown.

    The candidate fields are the rule's ``required_facts`` followed by every
    field referenced in its trigger, de-duplicated in that order. Fields
    without a (non-empty) entry in ``rule.fact_prompts`` are skipped, and each
    prompt is returned at most once.
    """
    # dict.fromkeys keeps the first occurrence of each field and its position.
    candidate_fields = [*dict.fromkeys(rule.required_facts + _fields_in(rule.trigger))]

    result: List[str] = []
    for name in unknown_fields(candidate_fields, profile):
        text = rule.fact_prompts.get(name)
        if text and text not in result:
            result.append(text)
    return result
|
|
|
|
|
|
def _extend_unique(target: List[str], items: List[str]) -> None:
    """Append each entry of *items* to *target* unless it is already there, keeping order."""
    for item in items:
        if item not in target:
            target.append(item)


def discover_scope(profile: ProductProfile) -> RegulatoryScope:
    """Sort every rule in ``REGULATION_RULES`` into applicable / uncertain / excluded.

    For each rule, in order:

    * If the profile's manufacturer role is set and its value is listed in the
      rule's ``excludable_roles``, the regulation is excluded with a
      role-specific reason and the trigger is not evaluated.
    * If the trigger evaluates to ``True``, the regulation is recorded as
      applicable. Rules flagged ``inferred`` are reported as
      ``PARTIALLY_APPLICABLE`` with ``MEDIUM`` confidence; all others as
      ``APPLICABLE`` with the rule's ``confidence_when_applicable``. If any of
      the rule's ``required_facts`` is still unknown, the confidence is lowered
      one step and the matching prompts are added to the scope's
      ``missing_facts``.
    * If the trigger evaluates to ``None``, the regulation is recorded as
      uncertain together with the prompts for its unknown facts, which are also
      added to the scope's ``missing_facts``.
    * Any other trigger result excludes the regulation.

    Args:
        profile: The product profile whose facts are evaluated.

    Returns:
        The populated ``RegulatoryScope``, including the overall confidence and
        the reasoning summary derived from the collected lists.
    """
    scope = RegulatoryScope(product_profile_id=profile.product_profile_id)

    # The role does not depend on the rule, so resolve it once for all rules.
    role = profile.manufacturer_role
    role_value = role.value if role is not None else None

    for rule in REGULATION_RULES:
        # Role exclusion takes precedence; skip trigger evaluation entirely.
        if role_value is not None and role_value in rule.excludable_roles:
            scope.excluded_regulations.append(
                ExcludedRegulation(
                    regulation_id=rule.regulation_id,
                    name=rule.name,
                    reason="Rolle '%s' ist von dieser Regulierung nicht unmittelbar adressiert." % role_value,
                )
            )
            continue

        trig = evaluate(rule.trigger, profile)

        if trig is True:
            if rule.inferred:
                conf = Confidence.MEDIUM
                status = ApplicabilityStatus.PARTIALLY_APPLICABLE
            else:
                conf = rule.confidence_when_applicable
                status = ApplicabilityStatus.APPLICABLE

            # The trigger fired, but open required facts still weaken the verdict.
            unresolved = unknown_fields(rule.required_facts, profile)
            if unresolved:
                conf = _DOWNGRADE[conf]
                prompts = [rule.fact_prompts.get(f) for f in unresolved]
                _extend_unique(scope.missing_facts, [p for p in prompts if p])

            scope.applicable_regulations.append(
                ApplicableRegulation(
                    regulation_id=rule.regulation_id,
                    name=rule.name,
                    applicability_status=status,
                    trigger_facts=_trigger_facts(rule, profile),
                    legal_basis_refs=rule.legal_basis_refs,
                    confidence=conf,
                    explanation=rule.summary,
                )
            )
        elif trig is None:
            # Only the uncertain branch reports prompts, so build them here.
            missing = _missing_prompts(rule, profile)
            scope.uncertain_regulations.append(
                UncertainRegulation(
                    regulation_id=rule.regulation_id,
                    name=rule.name,
                    missing_facts=missing,
                    explanation=rule.summary,
                )
            )
            _extend_unique(scope.missing_facts, missing)
        else:  # trig is False -> definitively excluded by a known fact
            scope.excluded_regulations.append(
                ExcludedRegulation(
                    regulation_id=rule.regulation_id,
                    name=rule.name,
                    reason="Auslösende Voraussetzungen sind anhand der bekannten Fakten nicht erfüllt.",
                )
            )

    scope.confidence = _overall_confidence(scope)
    scope.reasoning_summary = _summary(scope)
    return scope
|
|
|
|
|
|
def _overall_confidence(scope: RegulatoryScope) -> Confidence:
    """Derive the scope-wide confidence from the collected result lists.

    * ``LOW`` when no regulation was found applicable.
    * ``MEDIUM`` when something applies but uncertain regulations or missing
      facts remain.
    * ``HIGH`` when something applies and nothing is uncertain or missing.
    """
    if not scope.applicable_regulations:
        return Confidence.LOW
    if scope.uncertain_regulations or scope.missing_facts:
        return Confidence.MEDIUM
    return Confidence.HIGH
|
|
|
|
|
|
def _summary(scope: RegulatoryScope) -> str:
|
|
applicable = ", ".join(r.regulation_id for r in scope.applicable_regulations) or "—"
|
|
uncertain = ", ".join(r.regulation_id for r in scope.uncertain_regulations) or "—"
|
|
return "Wahrscheinlich anwendbar: %s. Unsicher (fehlende Fakten): %s." % (applicable, uncertain)
|