1607c89459
Deterministic reasoning layer ON TOP of the Legal Knowledge Graph (obligation
registry) and the Compliance Execution Graph (control mapping/evidence). Answers
which regulations apply to a concrete product, which obligations follow, whether
the customer's implementation covers them, and whether a customer interpretation
is too narrow/broad/plausible.
- ProductProfile with tri-state facts (Optional[bool]=None => uncertain, never a
false sense of security); safe predicate evaluator (no eval).
- 6 regulation triggers (CRA/MaschinenVO/RED/EMV/DataAct/NIS2) with missing-fact
prompts; 24 obligation scope rules.
- CRA obligation_ids RE-USED verbatim from the registry (93 ids) — never re-minted
(control_uuid trap); Machine/Data-Act flagged proposed=True.
- required_evidence constrained to the framework-agnostic shared evidence catalog;
capabilities echo the planned Obligation->Capability layer.
- Overlap groups (CRA<->MaschinenVO cyber-safety) + evidence-for-multiple (USP).
- 4 endpoints POST /reasoning/{scope,obligations,implementation-assessment,
interpretation-assessment}; thin handlers, registered in api/__init__.py.
- 22 tests (5 machine-builder scenarios + 10 acceptance questions). No DB
migration, no RAG, no new controls.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
117 lines
4.3 KiB
Python
117 lines
4.3 KiB
Python
"""Applicable-obligation engine (spec Modus 2).
|
|
|
|
Maps a product profile (optionally a precomputed scope) to the concrete legal
|
|
obligations, the overlaps between them, and which evidence types satisfy more
|
|
than one obligation at once (the core USP, spec §16).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Dict, List, Optional
|
|
|
|
from .predicates import evaluate, true_leaves
|
|
from .rules_obligations import ALL_OBLIGATIONS
|
|
from .rules_overlaps import OVERLAP_GROUPS
|
|
from .rules_regulations import FIELD_LABELS
|
|
from .rules_types import ObligationRule
|
|
from .schemas import (
|
|
ApplicableObligation,
|
|
ObligationOverlap,
|
|
ObligationsResponse,
|
|
ProductProfile,
|
|
RegulatoryScope,
|
|
)
|
|
from .scope_engine import discover_scope
|
|
|
|
|
|
def _applicable_regulation_ids(profile: ProductProfile, scope: Optional[RegulatoryScope]) -> List[str]:
    """Return the ids of all regulations the scope lists as applicable.

    Args:
        profile: Product profile; only consulted when ``scope`` is ``None``.
        scope: Precomputed regulatory scope, or ``None`` to have one derived
            from ``profile`` via ``discover_scope``.

    Returns:
        The ``regulation_id`` of every entry in ``applicable_regulations``,
        in the order the scope provides them.
    """
    effective_scope = discover_scope(profile) if scope is None else scope
    return [entry.regulation_id for entry in effective_scope.applicable_regulations]
|
|
|
|
|
|
def _applies_because(rule: ObligationRule, profile: ProductProfile) -> List[str]:
    """Build the list of human-readable reasons why ``rule`` applies.

    Every leaf yielded by ``true_leaves`` for the rule's ``applies_if``
    predicate is looked up in ``FIELD_LABELS`` using the leaf's first element.
    Leaves without a (non-empty) label are ignored and each label is reported
    only once, in first-seen order.

    Returns:
        The collected labels, or a single generic sentence naming the rule's
        source regulation when no label could be collected.
    """
    reasons: List[str] = []
    for leaf in true_leaves(rule.applies_if, profile):
        text = FIELD_LABELS.get(leaf[0])
        # Skip unlabeled fields and labels that were already recorded.
        if not text or text in reasons:
            continue
        reasons.append(text)
    if reasons:
        return reasons
    # Fallback so the caller never receives an empty justification.
    return ["%s ist für dieses Produkt anwendbar." % rule.source_regulation]
|
|
|
|
|
|
def _role_ok(rule: ObligationRule, profile: ProductProfile) -> bool:
    """Tell whether the profile's manufacturer role is covered by ``rule``.

    Returns:
        ``True`` when the profile declares no role (an unknown role must not
        exclude the rule) or when the role's ``value`` is contained in
        ``rule.applies_to_role``; ``False`` otherwise.
    """
    declared_role = profile.manufacturer_role
    return declared_role is None or declared_role.value in rule.applies_to_role
|
|
|
|
|
|
def derive_obligations(
    profile: ProductProfile, scope: Optional[RegulatoryScope] = None
) -> ObligationsResponse:
    """Derive the obligations that apply to a product profile.

    Walks ``ALL_OBLIGATIONS`` in order and keeps a rule only when all of the
    following hold:

    * its ``source_regulation`` is among the applicable regulations of the
      scope (derived from ``profile`` when ``scope`` is ``None``),
    * its ``applies_unless`` predicate, if present, does not evaluate to
      ``True``,
    * its ``applies_if`` predicate evaluates to exactly ``True``,
    * the profile's manufacturer role passes ``_role_ok``.

    Only rules whose ``applies_if`` evaluates to exactly ``False`` are recorded
    in ``excluded_obligations``; every other rejected rule is skipped without
    being recorded.

    Args:
        profile: The product profile to reason about.
        scope: Optional precomputed regulatory scope.

    Returns:
        A response holding the applicable obligations, the excluded obligation
        ids, the overlap groups among the applied obligations and the evidence
        types required by more than one applied obligation.
    """
    in_scope = set(_applicable_regulation_ids(profile, scope))
    response = ObligationsResponse()
    matched_ids: List[str] = []

    for rule in ALL_OBLIGATIONS:
        if rule.source_regulation not in in_scope:
            continue

        carve_out = rule.applies_unless
        if carve_out is not None and evaluate(carve_out, profile) is True:
            continue

        outcome = evaluate(rule.applies_if, profile)
        if outcome is False:
            # Definitely not applicable: report it as excluded.
            response.excluded_obligations.append(rule.obligation_id)
            continue
        if outcome is not True:
            # Any other non-True verdict: neither applied nor excluded.
            continue
        if not _role_ok(rule, profile):
            continue

        matched_ids.append(rule.obligation_id)
        response.applicable_obligations.append(_build_applicable_obligation(rule, profile))

    response.overlaps = _overlaps(matched_ids)
    response.evidence_for_multiple = _evidence_for_multiple(response.applicable_obligations)
    return response


def _build_applicable_obligation(rule: ObligationRule, profile: ProductProfile) -> ApplicableObligation:
    """Copy the fields of ``rule`` into an ``ApplicableObligation`` for ``profile``."""
    return ApplicableObligation(
        obligation_id=rule.obligation_id,
        title=rule.title,
        source_regulation=rule.source_regulation,
        legal_basis_refs=rule.legal_basis_refs,
        obligation_text=rule.obligation_text,
        authority_level=rule.authority_level,
        applies_because=_applies_because(rule, profile),
        applies_to_role=rule.applies_to_role,
        lifecycle_phase=rule.lifecycle_phase,
        overlap_group_id=rule.overlap_group_id,
        required_evidence=rule.required_evidence,
        confidence=rule.base_confidence,
        registry_anchor=rule.registry_anchor,
        proposed=rule.proposed,
    )
|
|
|
|
|
|
def _overlaps(applied_ids: List[str]) -> List[ObligationOverlap]:
    """Report every overlap group that has at least two applied members.

    Args:
        applied_ids: Obligation ids that were found applicable.

    Returns:
        One ``ObligationOverlap`` per entry of ``OVERLAP_GROUPS`` with two or
        more members among ``applied_ids``. The ``obligations`` field lists
        those members in the group's own member order.
    """
    applied_lookup = frozenset(applied_ids)
    found: List[ObligationOverlap] = []
    for group in OVERLAP_GROUPS:
        hits = [member for member in group.members if member in applied_lookup]
        # A single applied member does not overlap with anything.
        if len(hits) < 2:
            continue
        overlap = ObligationOverlap(
            overlap_group_id=group.overlap_group_id,
            obligations=hits,
            overlap_type=group.overlap_type,
            canonical_obligation_id=group.canonical_obligation_id,
            explanation=group.explanation,
        )
        found.append(overlap)
    return found
|
|
|
|
|
|
def _evidence_for_multiple(obligations: List[ApplicableObligation]) -> Dict[str, List[str]]:
    """Map each evidence type to the obligations it satisfies, if more than one.

    Args:
        obligations: The applicable obligations; each contributes its
            ``obligation_id`` to every entry of its ``required_evidence``.

    Returns:
        A dict from evidence type to the ids of the distinct obligations that
        require it, in first-seen order. Evidence types required by only one
        obligation are omitted.
    """
    by_evidence: Dict[str, List[str]] = {}
    for ob in obligations:
        for ev in ob.required_evidence:
            ids = by_evidence.setdefault(ev, [])
            # Count an obligation once per evidence type: a repeated entry in
            # required_evidence must not make a single obligation look like
            # several obligations sharing that evidence.
            if ob.obligation_id not in ids:
                ids.append(ob.obligation_id)
    return {ev: ids for ev, ids in by_evidence.items() if len(ids) > 1}
|