feat(reasoning): Regulatory Reasoning Engine MVP (scope/obligations/implementation/interpretation)

Deterministic reasoning layer ON TOP of the Legal Knowledge Graph (obligation
registry) and the Compliance Execution Graph (control mapping/evidence). Answers
which regulations apply to a concrete product, which obligations follow, whether
the customer's implementation covers them, and whether a customer interpretation
is too narrow/broad/plausible.

- ProductProfile with tri-state facts (Optional[bool]=None => uncertain, never
  false security); safe predicate evaluator (no eval).
- 6 regulation triggers (CRA/MaschinenVO/RED/EMV/DataAct/NIS2) with missing-fact
  prompts; 24 obligation scope rules.
- CRA obligation_ids RE-USED verbatim from the registry (93 ids) — never re-minted
  (control_uuid trap); Machine/Data-Act flagged proposed=True.
- required_evidence constrained to the framework-agnostic shared evidence catalog;
  capabilities echo the planned Obligation->Capability layer.
- Overlap groups (CRA<->MaschinenVO cyber-safety) + evidence-for-multiple (USP).
- 4 endpoints POST /reasoning/{scope,obligations,implementation-assessment,
  interpretation-assessment}; thin handlers, registered in api/__init__.py.
- 22 tests (5 machine-builder scenarios + 10 acceptance questions). No DB
  migration, no RAG, no new controls.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-25 19:30:53 +02:00
parent e46e74ddbb
commit 1607c89459
20 changed files with 2270 additions and 0 deletions
@@ -77,6 +77,7 @@ _ROUTER_MODULES = [
"licenses_routes",
"template_rule_routes",
"specialist_agent_routes",
"reasoning_routes",
]
_loaded_count = 0
@@ -0,0 +1,68 @@
"""HTTP endpoints for the Regulatory Reasoning Engine (spec §7).
Thin handlers — all reasoning lives in `compliance.reasoning.*`. No DB, no RAG;
pure deterministic rule evaluation.
POST /reasoning/scope -> which regulations apply + missing facts
POST /reasoning/obligations -> obligations, overlaps, multi-evidence
POST /reasoning/implementation-assessment -> claim coverage per obligation
POST /reasoning/interpretation-assessment -> verdict on a customer interpretation
"""
from __future__ import annotations
from fastapi import APIRouter
from compliance.reasoning import (
assess_implementation,
assess_interpretation,
derive_obligations,
discover_scope,
)
from compliance.reasoning.schemas import (
ImplementationRequest,
ImplementationResponse,
InterpretationRequest,
InterpretationResponse,
ObligationsRequest,
ObligationsResponse,
ScopeRequest,
ScopeResponse,
)
router = APIRouter(prefix="/reasoning", tags=["reasoning"])
@router.post("/scope", response_model=ScopeResponse)
def scope_discovery(req: ScopeRequest) -> ScopeResponse:
scope = discover_scope(req.product_profile)
return ScopeResponse(
regulatory_scope=scope,
missing_facts=scope.missing_facts,
confidence=scope.confidence,
)
@router.post("/obligations", response_model=ObligationsResponse)
def applicable_obligations(req: ObligationsRequest) -> ObligationsResponse:
return derive_obligations(req.product_profile, req.regulatory_scope)
@router.post("/implementation-assessment", response_model=ImplementationResponse)
def implementation_assessment(req: ImplementationRequest) -> ImplementationResponse:
return assess_implementation(req.product_profile, req.customer_claim)
@router.post("/interpretation-assessment", response_model=InterpretationResponse)
def interpretation_assessment(req: InterpretationRequest) -> InterpretationResponse:
result = assess_interpretation(req.customer_interpretation, req.product_profile)
return InterpretationResponse(
assessment=result.assessment,
affected_regulations=result.affected_regulations,
affected_obligations=result.affected_obligations,
corrected_interpretation=result.corrected_interpretation,
risks=result.risks,
legal_basis_refs=result.legal_basis_refs,
explanation=result.explanation,
confidence=result.confidence,
)
@@ -0,0 +1,27 @@
"""Regulatory Reasoning Engine.
A deterministic reasoning layer ON TOP of the Legal Knowledge Graph (obligation
registry) and the Compliance Execution Graph (control mapping / evidence). It
answers, for a concrete product: which regulations apply, which obligations
follow, whether the customer's implementation covers them, and whether a
customer interpretation is legally sound.
No new RAG, no new controls, no DB schema changes — scope & reasoning metamodel
only (spec §14).
"""
from __future__ import annotations
from .claim_normalizer import normalize_claim
from .implementation_engine import assess_implementation
from .interpretation_engine import assess_interpretation
from .obligation_engine import derive_obligations
from .scope_engine import discover_scope
__all__ = [
"discover_scope",
"derive_obligations",
"normalize_claim",
"assess_implementation",
"assess_interpretation",
]
@@ -0,0 +1,45 @@
"""Customer implementation claim normaliser (spec §4.6).
Turns a free-text statement ("Wir haben einen Update-Prozess.") into structured
capabilities + related topics + weakness qualifiers. Deterministic substring
matching — the claim_id is a stable hash so the same statement always maps to
the same id (no randomness, replay-safe).
"""
from __future__ import annotations
import hashlib
from typing import List, Optional
from .schemas import CustomerImplementationClaim
from .taxonomy_claims import match_capabilities, match_qualifiers, topics_for
def _claim_id(raw_statement: str) -> str:
digest = hashlib.sha1(raw_statement.strip().lower().encode("utf-8")).hexdigest()
return "claim_%s" % digest[:10]
def _normalized(capabilities: List[str], qualifiers: List[str]) -> str:
if not capabilities:
return "Keine bekannte Compliance-Fähigkeit aus der Aussage ableitbar."
text = "Fähigkeiten: " + ", ".join(capabilities)
if qualifiers:
text += " | Einschränkungen: " + ", ".join(qualifiers)
return text
def normalize_claim(
raw_statement: str, claim_id: Optional[str] = None, evidence_refs: Optional[List[str]] = None
) -> CustomerImplementationClaim:
capabilities = match_capabilities(raw_statement)
qualifiers = match_qualifiers(raw_statement)
return CustomerImplementationClaim(
claim_id=claim_id or _claim_id(raw_statement),
raw_statement=raw_statement,
normalized_claim=_normalized(capabilities, qualifiers),
claimed_capability=capabilities,
related_topics=topics_for(capabilities),
qualifiers=qualifiers,
evidence_refs=evidence_refs or [],
)
@@ -0,0 +1,85 @@
"""Enumerations for the Regulatory Reasoning Engine.
Kept dependency-free and Python 3.9 compatible (str-Enums, no `|` unions).
The reasoning layer sits ON TOP of the Legal Knowledge Graph (obligation
registry) and the Compliance Execution Graph (control mapping / evidence).
See memory `project_compliance_graph.md` for the cross-session contract.
"""
from __future__ import annotations
from enum import Enum
class ManufacturerRole(str, Enum):
MANUFACTURER = "manufacturer"
IMPORTER = "importer"
DISTRIBUTOR = "distributor"
INTEGRATOR = "integrator"
OPERATOR = "operator"
SERVICE_PROVIDER = "service_provider"
class ProductLifecyclePhase(str, Enum):
DEVELOPMENT = "development"
PLACING_ON_MARKET = "placing_on_market"
OPERATION = "operation"
MAINTENANCE = "maintenance"
UPDATE = "update"
END_OF_LIFE = "end_of_life"
class MarketModel(str, Enum):
B2B = "b2b"
B2C = "b2c"
BOTH = "both"
class ApplicabilityStatus(str, Enum):
APPLICABLE = "applicable"
PARTIALLY_APPLICABLE = "partially_applicable"
UNCERTAIN = "uncertain"
NOT_APPLICABLE = "not_applicable"
class Confidence(str, Enum):
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class AuthorityLevel(str, Enum):
"""How binding a statement is — answers MUST visibly separate these."""
LEGAL_TEXT = "legal_text"
RECITAL = "recital"
GUIDANCE = "guidance"
HARMONIZED_STANDARD = "harmonized_standard"
TECHNICAL_STANDARD = "technical_standard"
BEST_PRACTICE = "best_practice"
INTERNAL_INTERPRETATION = "internal_interpretation"
class OverlapType(str, Enum):
IDENTICAL = "identical"
SIMILAR = "similar"
COMPLEMENTARY = "complementary"
CONFLICTING = "conflicting"
DIFFERENT_SCOPE = "different_scope"
class CoverageStatus(str, Enum):
COVERED = "covered"
PARTIALLY_COVERED = "partially_covered"
NOT_COVERED = "not_covered"
UNCLEAR = "unclear"
OUT_OF_SCOPE = "out_of_scope"
class InterpretationVerdict(str, Enum):
PLAUSIBLE = "plausible"
TOO_NARROW = "too_narrow"
TOO_BROAD = "too_broad"
PARTIALLY_CORRECT = "partially_correct"
UNSUPPORTED = "unsupported"
UNCERTAIN = "uncertain"
@@ -0,0 +1,142 @@
"""Implementation reasoning engine (spec Modus 3).
Given a free-text claim ("Wir haben SBOMs und machen Updates, wenn Kunden Fehler
melden.") it maps the claimed capabilities onto the product's applicable
obligations and reports, per obligation, whether it is covered, partially
covered or not covered — plus the evidence that would close the gap.
"""
from __future__ import annotations
from typing import Dict, List
from .claim_normalizer import normalize_claim
from .enums import Confidence, CoverageStatus
from .obligation_engine import derive_obligations
from .schemas import (
CustomerImplementationClaim,
ImplementationAssessment,
ImplementationResponse,
ProductProfile,
)
from .taxonomy_claims import topics_for
# Typical sub-elements a capability still misses when only partially claimed.
STANDARD_GAPS: Dict[str, List[str]] = {
"software_bill_of_materials": [
"Vulnerability-Monitoring der Komponenten",
"Bewertung betroffener Komponenten",
"Lieferantenprozess",
],
"secure_updates": [
"aktive Schwachstellenüberwachung",
"Patch-Bewertung",
"Fristen und Verantwortlichkeiten",
"Nachweis der Updatefähigkeit",
],
"vulnerability_management": [
"definierter Vulnerability-Handling-Prozess",
"Priorisierung und Fristen",
],
"authentication": ["MFA für privilegierte Zugänge", "keine Standard-Zugangsdaten"],
"security_logging": ["Schutz der Logs vor Manipulation", "Monitoring/Alerting"],
"software_integrity": ["Signierung der Updates", "Verifikation der Update-Signatur"],
"secure_by_default": ["Härtung der Auslieferungskonfiguration", "Minimierung der Angriffsfläche"],
"secure_communication": ["verschlüsselte Übertragung", "Integritätsschutz der Verbindung"],
"risk_assessment": ["dokumentierte Risikobewertung", "Aufnahme in die technische Doku"],
"technical_documentation": ["vollständige technische Unterlagen", "Aktualisierung über den Lebenszyklus"],
}
def _missing_for(capabilities: List[str]) -> List[str]:
out: List[str] = []
for cap in capabilities:
for gap in STANDARD_GAPS.get(cap, []):
if gap not in out:
out.append(gap)
return out
def _coverage(required: List[str], claimed: List[str], qualifiers: List[str]) -> CoverageStatus:
req, have = set(required), set(claimed)
hit = req & have
if not hit:
return CoverageStatus.NOT_COVERED
if "absent" in qualifiers or "planned" in qualifiers:
return CoverageStatus.NOT_COVERED
if "reactive" in qualifiers and hit & {"secure_updates", "vulnerability_management"}:
return CoverageStatus.PARTIALLY_COVERED
if req <= have:
return CoverageStatus.COVERED
return CoverageStatus.PARTIALLY_COVERED
def assess_implementation(profile: ProductProfile, customer_claim: str) -> ImplementationResponse:
claim = normalize_claim(customer_claim)
obligations = derive_obligations(profile).applicable_obligations
claimed = claim.claimed_capability
claim_topics = set(claim.related_topics) | set(claimed)
assessments: List[ImplementationAssessment] = []
missing_evidence: List[str] = []
for ob in obligations:
from .rules_obligations import obligation_rule
rule = obligation_rule(ob.obligation_id)
required_caps = rule.required_capabilities if rule else []
ob_topics = set(topics_for(required_caps)) | set(required_caps)
directly_claimed = bool(set(required_caps) & set(claimed))
related = bool(ob_topics & claim_topics)
if not directly_claimed and not related:
continue # unrelated to the claim -> don't assess
status = _coverage(required_caps, claimed, claim.qualifiers)
missing = [] if status == CoverageStatus.COVERED else _missing_for(required_caps)
explanation = _explain(status, ob.title, claim.qualifiers)
if status != CoverageStatus.COVERED:
for ev in ob.required_evidence:
if ev not in missing_evidence:
missing_evidence.append(ev)
assessments.append(
ImplementationAssessment(
claim_id=claim.claim_id,
obligation_id=ob.obligation_id,
coverage_status=status,
missing_elements=missing,
required_evidence=ob.required_evidence,
explanation=explanation,
confidence=Confidence.MEDIUM,
)
)
return ImplementationResponse(
claim=claim,
assessments=assessments,
missing_evidence=missing_evidence,
summary=_summary(claim, assessments),
)
def _explain(status: CoverageStatus, title: str, qualifiers: List[str]) -> str:
if status == CoverageStatus.COVERED:
return "Die Pflicht '%s' wird durch die beschriebene Umsetzung plausibel abgedeckt." % title
if status == CoverageStatus.PARTIALLY_COVERED:
extra = " Der Prozess wirkt reaktiv." if "reactive" in qualifiers else ""
return "Die Pflicht '%s' ist nur teilweise abgedeckt.%s" % (title, extra)
return "Die Pflicht '%s' wird durch die Aussage nicht abgedeckt." % title
def _summary(claim: CustomerImplementationClaim, assessments: List[ImplementationAssessment]) -> str:
if not claim.claimed_capability:
return "Die Aussage ist zu unspezifisch — bitte konkretisieren, was umgesetzt wurde."
covered = sum(1 for a in assessments if a.coverage_status == CoverageStatus.COVERED)
partial = sum(1 for a in assessments if a.coverage_status == CoverageStatus.PARTIALLY_COVERED)
notc = sum(1 for a in assessments if a.coverage_status == CoverageStatus.NOT_COVERED)
if notc or partial:
head = "Teilweise erfüllt"
elif covered:
head = "Plausibel abgedeckt"
else:
head = "Nicht beurteilbar"
return "%s: %d abgedeckt, %d teilweise, %d offen." % (head, covered, partial, notc)
@@ -0,0 +1,65 @@
"""Interpretation review engine (spec Modus 4).
Evaluates whether a customer's legal interpretation is plausible, too narrow,
too broad, etc. Matches the interpretation against a curated pattern library;
no match -> `uncertain` plus a request for the missing context (never invent a
verdict, spec §6.3).
"""
from __future__ import annotations
import hashlib
from typing import Optional
from .enums import Confidence, InterpretationVerdict
from .schemas import InterpretationAssessment, ProductProfile
from .taxonomy_interpretations import INTERPRETATION_PATTERNS, InterpretationPattern
def _interpretation_id(raw: str) -> str:
digest = hashlib.sha1(raw.strip().lower().encode("utf-8")).hexdigest()
return "interp_%s" % digest[:10]
def _best_match(text: str) -> Optional[InterpretationPattern]:
low = text.lower()
best: Optional[InterpretationPattern] = None
best_score = 0
for pattern in INTERPRETATION_PATTERNS:
score = sum(1 for t in pattern.triggers if t in low)
if score > best_score:
best, best_score = pattern, score
return best
def assess_interpretation(
raw_interpretation: str, profile: Optional[ProductProfile] = None
) -> InterpretationAssessment:
interp_id = _interpretation_id(raw_interpretation)
pattern = _best_match(raw_interpretation)
if pattern is None:
return InterpretationAssessment(
interpretation_id=interp_id,
raw_interpretation=raw_interpretation,
assessment=InterpretationVerdict.UNCERTAIN,
corrected_interpretation=(
"Diese Auslegung lässt sich ohne weitere Angaben nicht bewerten. Bitte Produkt, "
"Rolle, Marktzugang und die konkret betroffene Pflicht benennen."
),
explanation="Kein bekanntes Auslegungsmuster erkannt — bewusst keine Scheinsicherheit.",
confidence=Confidence.LOW,
)
return InterpretationAssessment(
interpretation_id=interp_id,
raw_interpretation=raw_interpretation,
affected_regulations=pattern.affected_regulations,
affected_obligations=pattern.affected_obligations,
assessment=pattern.verdict,
risks=pattern.risks,
corrected_interpretation=pattern.corrected_interpretation,
legal_basis_refs=pattern.legal_basis_refs,
explanation=pattern.explanation,
confidence=pattern.confidence,
)
@@ -0,0 +1,116 @@
"""Applicable-obligation engine (spec Modus 2).
Maps a product profile (optionally a precomputed scope) to the concrete legal
obligations, the overlaps between them, and which evidence types satisfy more
than one obligation at once (the core USP, spec §16).
"""
from __future__ import annotations
from typing import Dict, List, Optional
from .predicates import evaluate, true_leaves
from .rules_obligations import ALL_OBLIGATIONS
from .rules_overlaps import OVERLAP_GROUPS
from .rules_regulations import FIELD_LABELS
from .rules_types import ObligationRule
from .schemas import (
ApplicableObligation,
ObligationOverlap,
ObligationsResponse,
ProductProfile,
RegulatoryScope,
)
from .scope_engine import discover_scope
def _applicable_regulation_ids(profile: ProductProfile, scope: Optional[RegulatoryScope]) -> List[str]:
if scope is None:
scope = discover_scope(profile)
return [r.regulation_id for r in scope.applicable_regulations]
def _applies_because(rule: ObligationRule, profile: ProductProfile) -> List[str]:
labels: List[str] = []
for leaf in true_leaves(rule.applies_if, profile):
label = FIELD_LABELS.get(leaf[0])
if label and label not in labels:
labels.append(label)
if not labels:
labels.append("%s ist für dieses Produkt anwendbar." % rule.source_regulation)
return labels
def _role_ok(rule: ObligationRule, profile: ProductProfile) -> bool:
role = profile.manufacturer_role
if role is None:
return True # unknown role -> do not exclude
return role.value in rule.applies_to_role
def derive_obligations(
profile: ProductProfile, scope: Optional[RegulatoryScope] = None
) -> ObligationsResponse:
active_regs = set(_applicable_regulation_ids(profile, scope))
response = ObligationsResponse()
applied_ids: List[str] = []
for rule in ALL_OBLIGATIONS:
if rule.source_regulation not in active_regs:
continue
if rule.applies_unless is not None and evaluate(rule.applies_unless, profile) is True:
continue
verdict = evaluate(rule.applies_if, profile)
if verdict is not True or not _role_ok(rule, profile):
if verdict is False:
response.excluded_obligations.append(rule.obligation_id)
continue
applied_ids.append(rule.obligation_id)
response.applicable_obligations.append(
ApplicableObligation(
obligation_id=rule.obligation_id,
title=rule.title,
source_regulation=rule.source_regulation,
legal_basis_refs=rule.legal_basis_refs,
obligation_text=rule.obligation_text,
authority_level=rule.authority_level,
applies_because=_applies_because(rule, profile),
applies_to_role=rule.applies_to_role,
lifecycle_phase=rule.lifecycle_phase,
overlap_group_id=rule.overlap_group_id,
required_evidence=rule.required_evidence,
confidence=rule.base_confidence,
registry_anchor=rule.registry_anchor,
proposed=rule.proposed,
)
)
response.overlaps = _overlaps(applied_ids)
response.evidence_for_multiple = _evidence_for_multiple(response.applicable_obligations)
return response
def _overlaps(applied_ids: List[str]) -> List[ObligationOverlap]:
applied = set(applied_ids)
out: List[ObligationOverlap] = []
for group in OVERLAP_GROUPS:
present = [m for m in group.members if m in applied]
if len(present) >= 2:
out.append(
ObligationOverlap(
overlap_group_id=group.overlap_group_id,
obligations=present,
overlap_type=group.overlap_type,
canonical_obligation_id=group.canonical_obligation_id,
explanation=group.explanation,
)
)
return out
def _evidence_for_multiple(obligations: List[ApplicableObligation]) -> Dict[str, List[str]]:
by_evidence: Dict[str, List[str]] = {}
for ob in obligations:
for ev in ob.required_evidence:
by_evidence.setdefault(ev, []).append(ob.obligation_id)
return {ev: ids for ev, ids in by_evidence.items() if len(ids) > 1}
@@ -0,0 +1,100 @@
"""Safe, tri-state condition evaluator for applicability rules.
Conditions are plain data (no `eval`): a *leaf* is a 3-tuple
``(field, op, value)``; a *composite* is ``{"all": [...]}`` or
``{"any": [...]}``. Evaluation is tri-state — ``True`` / ``False`` /
``None`` (unknown) — so a missing product fact yields *uncertain*, never a
false negative.
"""
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union
Leaf = Tuple[str, str, Any]
Condition = Union[Leaf, Dict[str, Any]]
def _attr(profile: Any, field: str) -> Any:
value = getattr(profile, field, None)
if isinstance(value, Enum):
return value.value
return value
def _eval_leaf(leaf: Leaf, profile: Any) -> Optional[bool]:
field, op, expected = leaf
actual = _attr(profile, field)
if op == "not_none":
return actual is not None
if op == "is_none":
return actual is None
if op == "contains_any":
# list-valued field (e.g. product_type); empty list = known-empty.
items = actual or []
hay = " ".join(str(x).lower() for x in items)
return any(str(k).lower() in hay for k in expected)
if actual is None:
return None # unknown fact -> unknown result
if op == "eq":
return bool(actual == expected)
if op == "ne":
return bool(actual != expected)
if op == "truthy":
return bool(actual)
if op == "falsy":
return not bool(actual)
if op == "in":
return bool(actual in expected)
if op == "not_in":
return bool(actual not in expected)
if op == "date_after":
return bool(actual > expected)
raise ValueError("unknown predicate op: %r" % (op,))
def evaluate(condition: Optional[Condition], profile: Any) -> Optional[bool]:
"""Return True/False/None(unknown) for a condition tree."""
if condition is None:
return True
if isinstance(condition, tuple):
return _eval_leaf(condition, profile)
if "all" in condition:
results = [evaluate(c, profile) for c in condition["all"]]
if any(r is False for r in results):
return False
if any(r is None for r in results):
return None
return True
if "any" in condition:
results = [evaluate(c, profile) for c in condition["any"]]
if any(r is True for r in results):
return True
if any(r is None for r in results):
return None
return False
raise ValueError("malformed condition: %r" % (condition,))
def true_leaves(condition: Optional[Condition], profile: Any) -> List[Leaf]:
"""Collect the leaf conditions that evaluated True (for trigger_facts)."""
if condition is None:
return []
if isinstance(condition, tuple):
return [condition] if _eval_leaf(condition, profile) is True else []
members = condition.get("all") or condition.get("any") or []
out: List[Leaf] = []
for c in members:
out.extend(true_leaves(c, profile))
return out
def unknown_fields(fields: List[str], profile: Any) -> List[str]:
"""Subset of `fields` whose value on the profile is None (unknown)."""
return [f for f in fields if _attr(profile, f) is None]
@@ -0,0 +1,23 @@
"""Aggregated obligation scope rules + lookup helpers."""
from __future__ import annotations
from typing import Dict, List, Optional
from .rules_obligations_cra import CRA_OBLIGATIONS
from .rules_obligations_machine_data import DATA_ACT_OBLIGATIONS, MACHINE_OBLIGATIONS
from .rules_types import ObligationRule
ALL_OBLIGATIONS: List[ObligationRule] = (
CRA_OBLIGATIONS + MACHINE_OBLIGATIONS + DATA_ACT_OBLIGATIONS
)
_BY_ID: Dict[str, ObligationRule] = {o.obligation_id: o for o in ALL_OBLIGATIONS}
def obligation_rule(obligation_id: str) -> Optional[ObligationRule]:
return _BY_ID.get(obligation_id)
def obligations_for_regulation(regulation_id: str) -> List[ObligationRule]:
return [o for o in ALL_OBLIGATIONS if o.source_regulation == regulation_id]
@@ -0,0 +1,271 @@
"""CRA obligation scope rules.
`obligation_id`s in the six CRA-P1 families (sbom/vuln/authentication/logging/
remote_access/updates) are RE-USED verbatim from the Legal-KG registry
(`obligations/obligation_join_keys.json`) — never re-minted (control_uuid trap,
memory `project_compliance_graph.md`). Cross-cutting CRA *process* obligations
(risk assessment, technical documentation, CE, instructions, secure-by-design
umbrella) are not yet in the registry and are flagged `proposed=True`.
"""
from __future__ import annotations
from typing import List
from .enums import AuthorityLevel, Confidence
from .rules_types import ObligationRule
_HAS_SW = ("has_software", "eq", True)
_EU = ("eu_market", "eq", True)
_REMOTE_OR_CLOUD = {"any": [("has_remote_access", "eq", True), ("has_cloud_connection", "eq", True)]}
_LM = AuthorityLevel.LEGAL_TEXT
CRA_OBLIGATIONS: List[ObligationRule] = [
ObligationRule(
obligation_id="sbom_creation",
title="Software Bill of Materials erstellen",
source_regulation="CRA",
obligation_text="Eine SBOM erstellen, die mindestens die obersten Abhängigkeiten des Produkts dokumentiert.",
legal_basis_refs=["CRA Annex I Part II (1)"],
authority_level=_LM,
family="sbom",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["software_bill_of_materials"],
required_evidence=["sbom", "repo_scan"],
lifecycle_phase=["development", "placing_on_market", "maintenance"],
registry_anchor=True,
),
ObligationRule(
obligation_id="provide_security_updates",
title="Sicherheitsupdates bereitstellen",
source_regulation="CRA",
obligation_text="Sicherheitsrelevante Updates zeitnah und über den Supportzeitraum bereitstellen.",
legal_basis_refs=["CRA Annex I (2)(c)", "CRA Art. 13"],
authority_level=_LM,
family="updates",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["secure_updates"],
required_evidence=["policy", "ticket", "test_report"],
lifecycle_phase=["maintenance", "update"],
overlap_group_id="SECURITY_UPDATES",
registry_anchor=True,
),
ObligationRule(
obligation_id="support_period_maintenance",
title="Supportzeitraum definieren und einhalten",
source_regulation="CRA",
obligation_text="Einen angemessenen Supportzeitraum festlegen, in dem Schwachstellen behandelt werden.",
legal_basis_refs=["CRA Art. 13(8)"],
authority_level=_LM,
family="updates",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["secure_updates"],
required_evidence=["policy"],
lifecycle_phase=["placing_on_market", "maintenance", "update"],
registry_anchor=True,
),
ObligationRule(
obligation_id="signed_update_integrity",
title="Integrität von Updates sicherstellen",
source_regulation="CRA",
obligation_text="Updates signieren und ihre Integrität bei der Verteilung verifizieren.",
legal_basis_refs=["CRA Annex I (1)(3)(f)"],
authority_level=_LM,
family="updates",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["software_integrity"],
required_evidence=["config_export", "test_report"],
lifecycle_phase=["development", "maintenance", "update"],
overlap_group_id="SECURITY_UPDATES",
registry_anchor=True,
),
ObligationRule(
obligation_id="vuln_handling_process",
title="Schwachstellenbehandlungs-Prozess",
source_regulation="CRA",
obligation_text="Einen dokumentierten Prozess zur Identifikation, Bewertung und Behebung von Schwachstellen betreiben.",
legal_basis_refs=["CRA Art. 13(8)", "CRA Annex VII"],
authority_level=_LM,
family="vuln",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["vulnerability_management"],
required_evidence=["policy", "ticket"],
lifecycle_phase=["development", "operation", "maintenance"],
overlap_group_id="VULNERABILITY_HANDLING",
registry_anchor=True,
),
ObligationRule(
obligation_id="coordinated_vulnerability_disclosure",
title="Coordinated Vulnerability Disclosure",
source_regulation="CRA",
obligation_text="Eine Richtlinie zur koordinierten Offenlegung von Schwachstellen bereitstellen.",
legal_basis_refs=["CRA Annex I Part II (5)"],
authority_level=_LM,
family="vuln",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["coordinated_disclosure"],
required_evidence=["policy"],
lifecycle_phase=["operation", "maintenance"],
overlap_group_id="VULNERABILITY_HANDLING",
registry_anchor=True,
),
ObligationRule(
obligation_id="exploited_vuln_reporting_authorities",
title="Meldung aktiv ausgenutzter Schwachstellen / Vorfälle",
source_regulation="CRA",
obligation_text="Aktiv ausgenutzte Schwachstellen und schwerwiegende Vorfälle an die zuständigen Behörden melden.",
legal_basis_refs=["CRA Art. 14", "CRA Art. 16"],
authority_level=_LM,
family="vuln",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["incident_reporting"],
required_evidence=["policy", "ticket"],
lifecycle_phase=["operation", "maintenance"],
registry_anchor=True,
),
ObligationRule(
obligation_id="user_authentication_required",
title="Authentifizierung vorsehen",
source_regulation="CRA",
obligation_text="Den Zugang über einen geeigneten Authentifizierungsmechanismus schützen.",
legal_basis_refs=["CRA Annex I (2)(d)"],
authority_level=_LM,
family="authentication",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["authentication"],
required_evidence=["config_export", "pentest"],
lifecycle_phase=["development", "operation"],
registry_anchor=True,
),
ObligationRule(
obligation_id="no_default_credentials",
title="Keine unveränderlichen Standard-Zugangsdaten",
source_regulation="CRA",
obligation_text="Sichere Standardkonfiguration; keine fest hinterlegten oder unveränderlichen Standard-Passwörter.",
legal_basis_refs=["CRA Annex I (2)(a)", "CRA Annex I (2)(b)"],
authority_level=_LM,
family="authentication",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["secure_by_default"],
required_evidence=["config_export", "test_report"],
lifecycle_phase=["development", "placing_on_market"],
registry_anchor=True,
),
ObligationRule(
obligation_id="event_logging_security_events",
title="Sicherheitsrelevante Ereignisse protokollieren",
source_regulation="CRA",
obligation_text="Sicherheitsrelevante Ereignisse und Zugriffe aufzeichnen, um Vorfälle nachvollziehen zu können.",
legal_basis_refs=["CRA Annex I Part I (2)(k)"],
authority_level=_LM,
family="logging",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["security_logging"],
required_evidence=["config_export", "audit_log"],
lifecycle_phase=["operation", "maintenance"],
registry_anchor=True,
),
ObligationRule(
obligation_id="remote_access_attack_surface_min",
title="Angriffsfläche minimieren",
source_regulation="CRA",
obligation_text="Die Angriffsfläche begrenzen, insbesondere exponierte Remote-/Cloud-Schnittstellen.",
legal_basis_refs=["CRA Annex I (1)(2)(a)"],
authority_level=_LM,
family="remote_access",
applies_if={"all": [_REMOTE_OR_CLOUD, _EU]},
required_capabilities=["secure_by_default"],
required_evidence=["config_export", "repo_scan", "pentest"],
lifecycle_phase=["development", "operation"],
registry_anchor=True,
),
ObligationRule(
obligation_id="remote_access_confidentiality_integrity",
title="Vertraulichkeit/Integrität der Fernverbindung",
source_regulation="CRA",
obligation_text="Daten bei Fernzugriff/Cloud-Anbindung verschlüsselt und integritätsgeschützt übertragen.",
legal_basis_refs=["CRA Annex I (1)(2)(b)", "CRA Annex I (1)(2)(c)"],
authority_level=_LM,
family="remote_access",
applies_if={"all": [_REMOTE_OR_CLOUD, _EU]},
required_capabilities=["secure_communication"],
required_evidence=["config_export", "pentest"],
lifecycle_phase=["operation"],
registry_anchor=True,
),
# --- Cross-cutting CRA process obligations (not yet in registry) ---------
ObligationRule(
obligation_id="cra_secure_by_design",
title="Security by Design",
source_regulation="CRA",
obligation_text="Das Produkt so entwerfen, entwickeln und herstellen, dass ein angemessenes Cybersicherheitsniveau gewährleistet ist.",
legal_basis_refs=["CRA Annex I Part I (1)"],
authority_level=_LM,
family="cra_process",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["secure_by_default", "risk_assessment"],
required_evidence=["policy", "test_report"],
lifecycle_phase=["development", "placing_on_market"],
proposed=True,
),
ObligationRule(
obligation_id="cra_risk_assessment",
title="Cybersicherheits-Risikobewertung",
source_regulation="CRA",
obligation_text="Eine Cybersicherheits-Risikobewertung durchführen und dokumentieren; in die technische Dokumentation aufnehmen.",
legal_basis_refs=["CRA Art. 13(2)", "CRA Annex I Part I (1)"],
authority_level=_LM,
family="cra_process",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["risk_assessment"],
required_evidence=["policy"],
lifecycle_phase=["development", "placing_on_market"],
overlap_group_id="RISK_ASSESSMENT",
proposed=True,
),
ObligationRule(
obligation_id="cra_technical_documentation",
title="Technische Dokumentation",
source_regulation="CRA",
obligation_text="Technische Dokumentation erstellen und aktuell halten, die Konformität mit den Anforderungen belegt.",
legal_basis_refs=["CRA Art. 31", "CRA Annex VII"],
authority_level=_LM,
family="cra_process",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["technical_documentation"],
required_evidence=["policy"],
lifecycle_phase=["placing_on_market", "maintenance"],
overlap_group_id="TECHNICAL_DOCUMENTATION",
proposed=True,
),
ObligationRule(
obligation_id="cra_ce_conformity_assessment",
title="Konformitätsbewertung / CE-Kennzeichnung",
source_regulation="CRA",
obligation_text="Vor dem Inverkehrbringen das passende Konformitätsbewertungsverfahren durchlaufen und CE kennzeichnen.",
legal_basis_refs=["CRA Art. 32", "CRA Art. 28"],
authority_level=_LM,
family="cra_process",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["conformity_assessment"],
required_evidence=["test_report", "policy"],
lifecycle_phase=["placing_on_market"],
overlap_group_id="CE_CONFORMITY",
proposed=True,
),
ObligationRule(
obligation_id="cra_instructions_for_use",
title="Informationen und Anweisungen für Nutzer",
source_regulation="CRA",
obligation_text="Nutzern verständliche Sicherheitsinformationen und -anweisungen bereitstellen (z. B. zu Updates und Support-Ende).",
legal_basis_refs=["CRA Annex II"],
authority_level=_LM,
family="cra_process",
applies_if={"all": [_HAS_SW, _EU]},
required_capabilities=["technical_documentation"],
required_evidence=["policy"],
lifecycle_phase=["placing_on_market"],
overlap_group_id="INSTRUCTIONS_FOR_USE",
proposed=True,
),
]
@@ -0,0 +1,139 @@
"""MaschinenVO and Data Act obligation scope rules.
These regulations are NOT yet in the Legal-KG registry (which currently covers
the six CRA-P1 families). Every obligation here is therefore `proposed=True`:
the reasoning layer proposes the snake_case id, the Obligation Registry session
remains the only authority that may canonicalise it (re-link, never re-mint).
"""
from __future__ import annotations
from typing import List
from .enums import AuthorityLevel, Confidence
from .rules_types import ObligationRule
_EU = ("eu_market", "eq", True)
_IS_MACHINE = ("is_machine", "eq", True)
_LM = AuthorityLevel.LEGAL_TEXT
MACHINE_OBLIGATIONS: List[ObligationRule] = [
ObligationRule(
obligation_id="machine_risk_assessment",
title="Maschinen-Risikobeurteilung",
source_regulation="MaschinenVO",
obligation_text="Eine Risikobeurteilung der Maschine durchführen, um Gefährdungen zu ermitteln und zu mindern.",
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.1.1)", "EN ISO 12100"],
authority_level=_LM,
family="machine_safety",
applies_if={"all": [_IS_MACHINE, _EU]},
required_capabilities=["risk_assessment"],
required_evidence=["policy"],
lifecycle_phase=["development", "placing_on_market"],
overlap_group_id="RISK_ASSESSMENT",
proposed=True,
),
ObligationRule(
obligation_id="machine_safety_control_systems",
title="Sichere Steuerungssysteme",
source_regulation="MaschinenVO",
obligation_text="Sicherheitsbezogene Teile der Steuerung so auslegen, dass Ausfälle nicht zu gefährlichen Zuständen führen.",
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.2.1)", "EN ISO 13849-1"],
authority_level=_LM,
family="machine_safety",
applies_if={"all": [_IS_MACHINE, ("has_safety_function", "eq", True), _EU]},
required_capabilities=["functional_safety"],
required_evidence=["test_report", "policy"],
lifecycle_phase=["development", "placing_on_market"],
proposed=True,
),
ObligationRule(
obligation_id="machine_protection_against_corruption",
title="Schutz gegen Korrumpierung sicherheitsrelevanter Funktionen",
source_regulation="MaschinenVO",
obligation_text="Sicherstellen, dass eine (auch beabsichtigte) Korrumpierung der Software/Verbindung keine gefährliche Situation auslöst.",
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.1.9)"],
authority_level=_LM,
family="machine_safety",
applies_if={
"all": [
_IS_MACHINE,
("has_safety_function", "eq", True),
{"any": [("has_remote_access", "eq", True), ("has_software", "eq", True)]},
_EU,
]
},
required_capabilities=["software_integrity", "secure_by_default"],
required_evidence=["test_report", "config_export"],
lifecycle_phase=["development", "operation", "maintenance"],
overlap_group_id="VULNERABILITY_HANDLING",
proposed=True,
),
ObligationRule(
obligation_id="machine_instructions_for_use",
title="Betriebsanleitung",
source_regulation="MaschinenVO",
obligation_text="Eine vollständige Betriebsanleitung mit Sicherheitshinweisen bereitstellen.",
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.7.4)"],
authority_level=_LM,
family="machine_safety",
applies_if={"all": [_IS_MACHINE, _EU]},
required_capabilities=["technical_documentation"],
required_evidence=["policy"],
lifecycle_phase=["placing_on_market"],
overlap_group_id="INSTRUCTIONS_FOR_USE",
proposed=True,
),
ObligationRule(
obligation_id="machine_ce_conformity",
title="Konformitätsbewertung / CE (Maschine)",
source_regulation="MaschinenVO",
obligation_text="Das passende Konformitätsbewertungsverfahren der MaschinenVO durchlaufen und CE kennzeichnen.",
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Art. 25", "Anhang IV"],
authority_level=_LM,
family="machine_safety",
applies_if={"all": [_IS_MACHINE, _EU]},
required_capabilities=["conformity_assessment"],
required_evidence=["test_report", "policy"],
lifecycle_phase=["placing_on_market"],
overlap_group_id="CE_CONFORMITY",
proposed=True,
),
]
DATA_ACT_OBLIGATIONS: List[ObligationRule] = [
ObligationRule(
obligation_id="data_act_data_access_by_design",
title="Datenzugang by design",
source_regulation="DataAct",
obligation_text="Vernetzte Produkte so gestalten, dass die erzeugten Produktdaten standardmäßig zugänglich sind.",
legal_basis_refs=["Data Act (EU) 2023/2854 Art. 3"],
authority_level=_LM,
family="data_act",
applies_if={
"all": [
("generates_usage_data", "eq", True),
{"any": [("has_cloud_connection", "eq", True), ("has_remote_access", "eq", True)]},
_EU,
]
},
required_capabilities=["data_access_provision"],
required_evidence=["config_export", "policy"],
lifecycle_phase=["development", "placing_on_market"],
proposed=True,
),
ObligationRule(
obligation_id="data_act_user_data_access",
title="Datenzugang für Nutzer",
source_regulation="DataAct",
obligation_text="Nutzern Zugang zu den von ihnen erzeugten Daten gewähren und Weitergabe an Dritte ermöglichen.",
legal_basis_refs=["Data Act (EU) 2023/2854 Art. 4", "Art. 5"],
authority_level=_LM,
family="data_act",
applies_if={"all": [("generates_usage_data", "eq", True), _EU]},
required_capabilities=["data_access_provision"],
required_evidence=["policy"],
lifecycle_phase=["operation"],
proposed=True,
),
]
@@ -0,0 +1,91 @@
"""Obligation overlap groups (spec §4.5 / Modus 2).
Overlaps are emitted only for the members that are actually applicable to the
product. `canonical_obligation_id` points at the strongest / most specific
obligation in the group (preferring a registry-anchored CRA id).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import List
from .enums import OverlapType
@dataclass(frozen=True)
class OverlapGroup:
overlap_group_id: str
members: List[str]
overlap_type: OverlapType
canonical_obligation_id: str
explanation: str
OVERLAP_GROUPS: List[OverlapGroup] = [
OverlapGroup(
overlap_group_id="VULNERABILITY_HANDLING",
members=[
"vuln_handling_process",
"coordinated_vulnerability_disclosure",
"machine_protection_against_corruption",
],
overlap_type=OverlapType.COMPLEMENTARY,
canonical_obligation_id="vuln_handling_process",
explanation=(
"CRA adressiert die Schwachstellenbehandlung des Produkts. Die MaschinenVO wird "
"komplementär relevant, sobald eine Cyber-Schwachstelle eine Sicherheitsfunktion "
"beeinflussen kann (Anhang III 1.1.9). Nicht identisch, aber gemeinsam zu erfüllen."
),
),
OverlapGroup(
overlap_group_id="SECURITY_UPDATES",
members=["provide_security_updates", "signed_update_integrity"],
overlap_type=OverlapType.COMPLEMENTARY,
canonical_obligation_id="provide_security_updates",
explanation=(
"Updates bereitstellen und ihre Integrität sichern sind zwei Seiten desselben "
"Update-Prozesses; ein Nachweis (Update-Policy, Release Notes) deckt teils beide ab."
),
),
OverlapGroup(
overlap_group_id="RISK_ASSESSMENT",
members=["cra_risk_assessment", "machine_risk_assessment"],
overlap_type=OverlapType.DIFFERENT_SCOPE,
canonical_obligation_id="cra_risk_assessment",
explanation=(
"Zwei getrennte Risikobetrachtungen: CRA = Cybersicherheits-Risiko, MaschinenVO = "
"Sicherheits-/Gefährdungsbeurteilung. Methodisch verwandt, inhaltlich unterschiedlich."
),
),
OverlapGroup(
overlap_group_id="TECHNICAL_DOCUMENTATION",
members=["cra_technical_documentation", "machine_risk_assessment"],
overlap_type=OverlapType.SIMILAR,
canonical_obligation_id="cra_technical_documentation",
explanation=(
"Beide Regime verlangen eine technische Dokumentation; Teile (Risikobetrachtung, "
"Konstruktionsunterlagen) lassen sich in einem konsolidierten technischen Dossier führen."
),
),
OverlapGroup(
overlap_group_id="CE_CONFORMITY",
members=["cra_ce_conformity_assessment", "machine_ce_conformity"],
overlap_type=OverlapType.COMPLEMENTARY,
canonical_obligation_id="machine_ce_conformity",
explanation=(
"Ein Produkt kann zwei CE-Regime gleichzeitig erfüllen müssen (MaschinenVO + CRA). "
"Eine gemeinsame CE-Kennzeichnung, aber getrennte Konformitätsbewertungen."
),
),
OverlapGroup(
overlap_group_id="INSTRUCTIONS_FOR_USE",
members=["cra_instructions_for_use", "machine_instructions_for_use"],
overlap_type=OverlapType.SIMILAR,
canonical_obligation_id="machine_instructions_for_use",
explanation=(
"Betriebsanleitung (MaschinenVO) und Sicherheitsinformationen (CRA) überschneiden sich; "
"ein integriertes Anleitungsdokument kann beide Pflichten bedienen."
),
),
]
@@ -0,0 +1,160 @@
"""Regulation-level applicability trigger rules (scope discovery, spec Modus 1).
Each rule is pure data consumed by `scope_engine`. Triggers reference
`ProductProfile` fields through the safe predicate evaluator. `required_facts`
that are unknown turn the verdict *uncertain* and surface `fact_prompts`.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from .enums import Confidence
from .predicates import Condition
# Positive, human-readable label per profile fact (for trigger_facts output).
FIELD_LABELS: Dict[str, str] = {
"has_software": "Produkt enthält Software / digitale Elemente",
"has_embedded_software": "Produkt enthält eingebettete Software",
"has_remote_access": "Produkt besitzt Fernzugriff / Fernwartung",
"has_cloud_connection": "Produkt ist mit einer Cloud verbunden",
"has_radio_module": "Produkt enthält ein Funkmodul",
"has_safety_function": "Produkt erfüllt eine Sicherheitsfunktion",
"generates_usage_data": "Vernetztes Produkt erzeugt nutzbare Produktdaten",
"is_machine": "Produkt ist eine Maschine",
"is_component": "Produkt ist ein (Sicherheits-)Bauteil",
"eu_market": "Produkt wird auf dem EU-Markt bereitgestellt",
"is_essential_or_important_entity": "Unternehmen ist wesentliche/wichtige Einrichtung",
"manufacturer_role": "Wirtschaftsakteur-Rolle (Hersteller/Importeur/Händler)",
}
@dataclass(frozen=True)
class RegulationRule:
regulation_id: str
name: str
trigger: Condition
required_facts: List[str]
fact_prompts: Dict[str, str]
legal_basis_refs: List[str]
summary: str
confidence_when_applicable: Confidence = Confidence.HIGH
exclusion: Optional[Condition] = None
# Status is downgraded to PARTIALLY_APPLICABLE / MEDIUM when the trigger
# fires only via inference rather than a directly stated fact.
inferred: bool = False
excludable_roles: List[str] = field(default_factory=list)
_ECONOMIC_ROLES = ["manufacturer", "importer", "distributor"]
REGULATION_RULES: List[RegulationRule] = [
RegulationRule(
regulation_id="CRA",
name="Cyber Resilience Act (EU) 2024/2847",
trigger={
"all": [
{"any": [("has_software", "eq", True), ("has_embedded_software", "eq", True)]},
("eu_market", "eq", True),
]
},
required_facts=["has_software", "eu_market", "manufacturer_role"],
fact_prompts={
"has_software": "Enthält das Produkt Software / digitale Elemente?",
"eu_market": "Wird das Produkt auf dem EU-Markt bereitgestellt oder in Verkehr gebracht?",
"manufacturer_role": "Welche Rolle nehmen Sie ein (Hersteller / Importeur / Händler)?",
},
legal_basis_refs=["CRA Art. 2(1)", "CRA Art. 3(1)"],
summary="Produkte mit digitalen Elementen, die auf dem EU-Markt bereitgestellt werden.",
confidence_when_applicable=Confidence.HIGH,
excludable_roles=["operator"],
),
RegulationRule(
regulation_id="MaschinenVO",
name="Maschinenverordnung (EU) 2023/1230",
trigger={
"any": [
("is_machine", "eq", True),
{"all": [("is_component", "eq", True), ("has_safety_function", "eq", True)]},
]
},
required_facts=["is_machine", "eu_market"],
fact_prompts={
"is_machine": "Ist das Produkt eine Maschine oder ein Sicherheitsbauteil?",
"has_safety_function": "Erfüllt das Bauteil eine Sicherheitsfunktion?",
},
legal_basis_refs=["MaschinenVO (EU) 2023/1230 Art. 2", "Anhang III"],
summary="Maschinen oder Sicherheitsbauteile, ggf. mit sicherheitsrelevanter Steuerung.",
confidence_when_applicable=Confidence.MEDIUM,
),
RegulationRule(
regulation_id="RED",
name="Radio Equipment Directive 2014/53/EU",
trigger=("has_radio_module", "eq", True),
required_facts=["has_radio_module"],
fact_prompts={
"has_radio_module": "Besitzt das Produkt ein Funkmodul (WLAN, Bluetooth, Mobilfunk)?",
},
legal_basis_refs=["RED 2014/53/EU Art. 1", "Art. 3(3)(d-f)"],
summary="Funkanlagen; Art. 3(3) deckt zusätzlich Cybersecurity-Anforderungen ab.",
confidence_when_applicable=Confidence.HIGH,
),
RegulationRule(
regulation_id="EMV",
name="EMV-Richtlinie 2014/30/EU",
trigger={
"any": [
("has_software", "eq", True),
("has_embedded_software", "eq", True),
("has_radio_module", "eq", True),
]
},
required_facts=[],
fact_prompts={
"is_electrical": "Ist das Produkt ein elektrisches / elektronisches Betriebsmittel?",
},
legal_basis_refs=["EMV-RL 2014/30/EU Art. 2"],
summary="Elektrische/elektronische Betriebsmittel (hier aus den digitalen Elementen abgeleitet).",
confidence_when_applicable=Confidence.MEDIUM,
inferred=True,
),
RegulationRule(
regulation_id="DataAct",
name="Data Act (EU) 2023/2854",
trigger={
"all": [
{"any": [("has_cloud_connection", "eq", True), ("has_remote_access", "eq", True)]},
("generates_usage_data", "eq", True),
]
},
required_facts=["generates_usage_data"],
fact_prompts={
"generates_usage_data": "Erzeugt das vernetzte Produkt nutzbare Produkt-/Nutzungsdaten?",
},
legal_basis_refs=["Data Act (EU) 2023/2854 Art. 2(5)", "Art. 3-5"],
summary="Vernetzte Produkte, die Nutzungsdaten erzeugen und zugänglich machen.",
confidence_when_applicable=Confidence.HIGH,
),
RegulationRule(
regulation_id="NIS2",
name="NIS2-Richtlinie (EU) 2022/2555",
trigger=("is_essential_or_important_entity", "eq", True),
required_facts=["company_size", "sector", "is_essential_or_important_entity"],
fact_prompts={
"company_size": "Unternehmensgröße (Mitarbeiterzahl / Umsatz)?",
"sector": "In welchem Sektor ist das Unternehmen tätig (Anhang I/II)?",
"is_essential_or_important_entity": "Fällt das Unternehmen als wesentliche/wichtige Einrichtung unter NIS2?",
},
legal_basis_refs=["NIS2-RL (EU) 2022/2555 Art. 2", "Art. 3"],
summary="Adressiert die ORGANISATION (Größe/Sektor/Rolle), nicht das Produkt.",
confidence_when_applicable=Confidence.MEDIUM,
),
]
def regulation_rule(regulation_id: str) -> Optional[RegulationRule]:
for rule in REGULATION_RULES:
if rule.regulation_id == regulation_id:
return rule
return None
@@ -0,0 +1,58 @@
"""Shared types for obligation scope rules.
`required_evidence` MUST draw from the framework-AGNOSTIC evidence catalog
owned by the Compliance Execution Graph (memory `project_compliance_graph.md`,
User-Direktive 2026-06-25). Do not invent framework-specific evidence types.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Optional
from .enums import AuthorityLevel, Confidence
from .predicates import Condition
# Framework-agnostic shared evidence catalog (the only allowed tokens).
EVIDENCE_CATALOG = frozenset(
{
"config_export",
"test_report",
"repo_scan",
"sbom",
"policy",
"audit_log",
"pentest",
"ticket",
}
)
@dataclass(frozen=True)
class ObligationRule:
obligation_id: str
title: str
source_regulation: str
obligation_text: str
legal_basis_refs: List[str]
authority_level: AuthorityLevel
family: str
applies_if: Condition
required_capabilities: List[str]
required_evidence: List[str]
base_confidence: Confidence = Confidence.HIGH
applies_unless: Optional[Condition] = None
lifecycle_phase: List[str] = field(default_factory=list)
applies_to_role: List[str] = field(default_factory=lambda: ["manufacturer", "importer"])
overlap_group_id: Optional[str] = None
# True => obligation_id is owned by the Legal-KG registry (re-link, never re-mint).
registry_anchor: bool = False
# True => Machine/Data-Act obligation the registry has not canonicalised yet.
proposed: bool = False
def __post_init__(self) -> None:
bad = [e for e in self.required_evidence if e not in EVIDENCE_CATALOG]
if bad:
raise ValueError(
"obligation %s uses non-catalog evidence %r" % (self.obligation_id, bad)
)
@@ -0,0 +1,216 @@
"""Pydantic domain objects for the Regulatory Reasoning Engine.
Trigger facts that drive scope are tri-state (`Optional[bool] = None`): `None`
means "fact unknown" and produces an *uncertain* verdict plus a concrete
missing-fact prompt — never silent false security (spec §6.3).
"""
from __future__ import annotations
from datetime import date
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
from .enums import (
ApplicabilityStatus,
AuthorityLevel,
Confidence,
CoverageStatus,
InterpretationVerdict,
ManufacturerRole,
MarketModel,
OverlapType,
ProductLifecyclePhase,
)
# ---------------------------------------------------------------------------
# Input
# ---------------------------------------------------------------------------
class ProductProfile(BaseModel):
"""The customer's product / system. Tri-state booleans => unknown facts."""
product_name: str
product_profile_id: Optional[str] = None
manufacturer_role: Optional[ManufacturerRole] = None
product_type: List[str] = Field(default_factory=list)
has_software: Optional[bool] = None
has_embedded_software: Optional[bool] = None
has_remote_access: Optional[bool] = None
has_cloud_connection: Optional[bool] = None
has_ai_functionality: Optional[bool] = None
has_radio_module: Optional[bool] = None
has_safety_function: Optional[bool] = None
generates_usage_data: Optional[bool] = None
is_machine: Optional[bool] = None
is_component: Optional[bool] = None
is_spare_part: Optional[bool] = None
placed_on_market_after: Optional[date] = None
intended_use: Optional[str] = None
eu_market: Optional[bool] = None
b2b_or_b2c: Optional[MarketModel] = None
lifecycle_phase: Optional[ProductLifecyclePhase] = None
# Organisation context — only needed for NIS2 (not a product fact).
company_size: Optional[str] = None
sector: Optional[str] = None
is_essential_or_important_entity: Optional[bool] = None
# ---------------------------------------------------------------------------
# Scope
# ---------------------------------------------------------------------------
class ApplicableRegulation(BaseModel):
regulation_id: str
name: str
applicability_status: ApplicabilityStatus
trigger_facts: List[str] = Field(default_factory=list)
legal_basis_refs: List[str] = Field(default_factory=list)
confidence: Confidence
explanation: str
class ExcludedRegulation(BaseModel):
regulation_id: str
name: str
reason: str
class UncertainRegulation(BaseModel):
regulation_id: str
name: str
missing_facts: List[str] = Field(default_factory=list)
explanation: str
class RegulatoryScope(BaseModel):
product_profile_id: Optional[str] = None
applicable_regulations: List[ApplicableRegulation] = Field(default_factory=list)
excluded_regulations: List[ExcludedRegulation] = Field(default_factory=list)
uncertain_regulations: List[UncertainRegulation] = Field(default_factory=list)
missing_facts: List[str] = Field(default_factory=list)
confidence: Confidence = Confidence.MEDIUM
reasoning_summary: str = ""
# ---------------------------------------------------------------------------
# Obligations
# ---------------------------------------------------------------------------
class ApplicableObligation(BaseModel):
obligation_id: str
title: str
source_regulation: str
legal_basis_refs: List[str] = Field(default_factory=list)
obligation_text: str
authority_level: AuthorityLevel
applies_because: List[str] = Field(default_factory=list)
applies_to_role: List[str] = Field(default_factory=list)
lifecycle_phase: List[str] = Field(default_factory=list)
overlap_group_id: Optional[str] = None
required_evidence: List[str] = Field(default_factory=list)
confidence: Confidence
# True only when obligation_id is owned by the Legal-KG registry (CRA P1).
registry_anchor: bool = False
# Machine/Data-Act obligations the registry has not canonicalised yet.
proposed: bool = False
class ObligationOverlap(BaseModel):
overlap_group_id: str
obligations: List[str] = Field(default_factory=list)
overlap_type: OverlapType
canonical_obligation_id: str
explanation: str
# ---------------------------------------------------------------------------
# Customer claims & assessments
# ---------------------------------------------------------------------------
class CustomerImplementationClaim(BaseModel):
claim_id: str
raw_statement: str
normalized_claim: str = ""
claimed_capability: List[str] = Field(default_factory=list)
related_topics: List[str] = Field(default_factory=list)
qualifiers: List[str] = Field(default_factory=list)
evidence_refs: List[str] = Field(default_factory=list)
class ImplementationAssessment(BaseModel):
claim_id: str
obligation_id: str
coverage_status: CoverageStatus
missing_elements: List[str] = Field(default_factory=list)
required_evidence: List[str] = Field(default_factory=list)
explanation: str
confidence: Confidence
class InterpretationAssessment(BaseModel):
interpretation_id: str
raw_interpretation: str
affected_regulations: List[str] = Field(default_factory=list)
affected_obligations: List[str] = Field(default_factory=list)
assessment: InterpretationVerdict
risks: List[str] = Field(default_factory=list)
corrected_interpretation: str = ""
legal_basis_refs: List[str] = Field(default_factory=list)
explanation: str
confidence: Confidence
# ---------------------------------------------------------------------------
# API request / response envelopes
# ---------------------------------------------------------------------------
class ScopeRequest(BaseModel):
product_profile: ProductProfile
class ScopeResponse(BaseModel):
regulatory_scope: RegulatoryScope
missing_facts: List[str] = Field(default_factory=list)
confidence: Confidence
class ObligationsRequest(BaseModel):
product_profile: ProductProfile
regulatory_scope: Optional[RegulatoryScope] = None
class ObligationsResponse(BaseModel):
applicable_obligations: List[ApplicableObligation] = Field(default_factory=list)
overlaps: List[ObligationOverlap] = Field(default_factory=list)
excluded_obligations: List[str] = Field(default_factory=list)
evidence_for_multiple: Dict[str, List[str]] = Field(default_factory=dict)
class ImplementationRequest(BaseModel):
product_profile: ProductProfile
customer_claim: str
class ImplementationResponse(BaseModel):
claim: CustomerImplementationClaim
assessments: List[ImplementationAssessment] = Field(default_factory=list)
missing_evidence: List[str] = Field(default_factory=list)
summary: str = ""
class InterpretationRequest(BaseModel):
product_profile: Optional[ProductProfile] = None
customer_interpretation: str
class InterpretationResponse(BaseModel):
assessment: InterpretationVerdict
affected_regulations: List[str] = Field(default_factory=list)
affected_obligations: List[str] = Field(default_factory=list)
corrected_interpretation: str = ""
risks: List[str] = Field(default_factory=list)
legal_basis_refs: List[str] = Field(default_factory=list)
explanation: str = ""
confidence: Confidence = Confidence.MEDIUM
@@ -0,0 +1,136 @@
"""Scope discovery engine (spec Modus 1).
Answers "which regulations apply to my product?" — and, crucially, never says
"X applies" without the triggers, and never hides a missing fact behind a false
verdict. Pure rule evaluation, deterministic.
"""
from __future__ import annotations
from typing import List, Optional
from .enums import ApplicabilityStatus, Confidence
from .predicates import Condition, evaluate, true_leaves, unknown_fields
from .rules_regulations import REGULATION_RULES, FIELD_LABELS, RegulationRule
from .schemas import (
ApplicableRegulation,
ExcludedRegulation,
ProductProfile,
RegulatoryScope,
UncertainRegulation,
)
_DOWNGRADE = {Confidence.HIGH: Confidence.MEDIUM, Confidence.MEDIUM: Confidence.LOW, Confidence.LOW: Confidence.LOW}
def _fields_in(condition: Optional[Condition]) -> List[str]:
if condition is None:
return []
if isinstance(condition, tuple):
return [condition[0]]
out: List[str] = []
for c in condition.get("all") or condition.get("any") or []:
out.extend(_fields_in(c))
return out
def _trigger_facts(rule: RegulationRule, profile: ProductProfile) -> List[str]:
labels: List[str] = []
for leaf in true_leaves(rule.trigger, profile):
label = FIELD_LABELS.get(leaf[0])
if label and label not in labels:
labels.append(label)
return labels
def _missing_prompts(rule: RegulationRule, profile: ProductProfile) -> List[str]:
fields = list(dict.fromkeys(rule.required_facts + _fields_in(rule.trigger)))
unknown = unknown_fields(fields, profile)
prompts: List[str] = []
for f in unknown:
prompt = rule.fact_prompts.get(f)
if prompt and prompt not in prompts:
prompts.append(prompt)
return prompts
def discover_scope(profile: ProductProfile) -> RegulatoryScope:
scope = RegulatoryScope(product_profile_id=profile.product_profile_id)
for rule in REGULATION_RULES:
role_value = profile.manufacturer_role.value if profile.manufacturer_role is not None else None
role_excluded = role_value is not None and role_value in rule.excludable_roles
trig = evaluate(rule.trigger, profile)
missing = _missing_prompts(rule, profile)
if role_excluded:
scope.excluded_regulations.append(
ExcludedRegulation(
regulation_id=rule.regulation_id,
name=rule.name,
reason="Rolle '%s' ist von dieser Regulierung nicht unmittelbar adressiert." % role_value,
)
)
continue
if trig is True:
conf = Confidence.MEDIUM if rule.inferred else rule.confidence_when_applicable
status = (
ApplicabilityStatus.PARTIALLY_APPLICABLE if rule.inferred else ApplicabilityStatus.APPLICABLE
)
unresolved = unknown_fields(rule.required_facts, profile)
if unresolved:
conf = _DOWNGRADE[conf]
for f in unresolved:
prompt = rule.fact_prompts.get(f)
if prompt and prompt not in scope.missing_facts:
scope.missing_facts.append(prompt)
scope.applicable_regulations.append(
ApplicableRegulation(
regulation_id=rule.regulation_id,
name=rule.name,
applicability_status=status,
trigger_facts=_trigger_facts(rule, profile),
legal_basis_refs=rule.legal_basis_refs,
confidence=conf,
explanation=rule.summary,
)
)
elif trig is None:
scope.uncertain_regulations.append(
UncertainRegulation(
regulation_id=rule.regulation_id,
name=rule.name,
missing_facts=missing,
explanation=rule.summary,
)
)
for m in missing:
if m not in scope.missing_facts:
scope.missing_facts.append(m)
else: # trig is False -> definitively excluded by a known fact
scope.excluded_regulations.append(
ExcludedRegulation(
regulation_id=rule.regulation_id,
name=rule.name,
reason="Auslösende Voraussetzungen sind anhand der bekannten Fakten nicht erfüllt.",
)
)
scope.confidence = _overall_confidence(scope)
scope.reasoning_summary = _summary(scope)
return scope
def _overall_confidence(scope: RegulatoryScope) -> Confidence:
if scope.applicable_regulations and not scope.uncertain_regulations and not scope.missing_facts:
return Confidence.HIGH
if scope.applicable_regulations:
return Confidence.MEDIUM
return Confidence.LOW
def _summary(scope: RegulatoryScope) -> str:
applicable = ", ".join(r.regulation_id for r in scope.applicable_regulations) or ""
uncertain = ", ".join(r.regulation_id for r in scope.uncertain_regulations) or ""
return "Wahrscheinlich anwendbar: %s. Unsicher (fehlende Fakten): %s." % (applicable, uncertain)
@@ -0,0 +1,104 @@
"""Deterministic taxonomy for normalising free-text customer claims.
Capability names echo the planned Obligation -> Capability layer of the
Compliance Execution Graph (memory `project_compliance_graph.md`), so the
reasoning layer's claim capabilities line up with the registry's capabilities.
Matching is lowercase substring matching — deterministic, no LLM, no RAG.
"""
from __future__ import annotations
from typing import Dict, List
# capability -> trigger substrings (German + English), matched lowercase.
CAPABILITY_KEYWORDS: Dict[str, List[str]] = {
"software_bill_of_materials": [
"sbom", "stückliste", "stueckliste", "bill of materials", "komponentenliste",
],
"secure_updates": ["update", "patch", "aktualisier", "release", "rollout"],
"software_integrity": ["signier", "signatur", "signed", "integrität", "integritaet", "hash"],
"vulnerability_management": [
"schwachstelle", "vulnerab", "cve", "schwachstellenmanagement", "vuln",
],
"coordinated_disclosure": [
"disclosure", "offenlegung", "security.txt", "responsible disclosure",
],
"incident_reporting": [
"incident", "vorfall", "behörde", "behoerde", "csirt", "meldepflicht", "an die behörde",
],
"authentication": [
"authentifizier", "login", "passwort", "password", "mfa", "2fa", "anmeldung",
],
"secure_by_default": [
"härtung", "haertung", "hardening", "default", "standardkonfig",
"sichere konfiguration", "angriffsfläche", "angriffsflaeche",
],
"security_logging": ["logging", "log ", "logs", "protokoll", "audit-trail", "ereignisprotokoll"],
"secure_communication": ["verschlüssel", "verschluessel", "encryption", "tls", "vpn", "ssl"],
"risk_assessment": [
"risikoanalyse", "risikobeurteil", "risk assessment", "gefährdungsbeurteil",
"gefaehrdungsbeurteil", "bedrohungsanalyse", "threat model",
],
"technical_documentation": [
"dokumentation", "technische unterlagen", "betriebsanleitung", "handbuch", "documentation",
],
"conformity_assessment": ["konformität", "konformitaet", "conformity", "baumuster", "ce-kenn"],
"functional_safety": [
"performance level", "sil ", "iso 13849", "funktionale sicherheit", "safety control",
],
"data_access_provision": [
"datenzugang", "data access", "datenportabilität", "datenexport", "data export",
],
}
# capability -> broader compliance topics it touches (spec related_topics).
CAPABILITY_TOPICS: Dict[str, List[str]] = {
"software_bill_of_materials": ["component_transparency", "supply_chain", "vulnerability_management"],
"secure_updates": ["secure_updates", "vulnerability_remediation", "release_management"],
"software_integrity": ["secure_updates", "supply_chain", "tamper_protection"],
"vulnerability_management": ["vulnerability_handling", "monitoring", "patch_management"],
"coordinated_disclosure": ["vulnerability_handling", "transparency"],
"incident_reporting": ["incident_handling", "authority_notification"],
"authentication": ["access_control", "identity"],
"secure_by_default": ["hardening", "attack_surface", "configuration"],
"security_logging": ["monitoring", "forensics", "incident_handling"],
"secure_communication": ["confidentiality", "integrity", "remote_access"],
"risk_assessment": ["risk_management", "secure_by_design"],
"technical_documentation": ["documentation", "conformity"],
"conformity_assessment": ["conformity", "ce_marking"],
"functional_safety": ["machine_safety", "control_systems"],
"data_access_provision": ["data_sharing", "portability"],
}
# qualifier -> substrings that signal a weak/incomplete implementation.
QUALIFIER_KEYWORDS: Dict[str, List[str]] = {
"reactive": [
"wenn kunden", "wenn ein kunde", "nach meldung", "auf anfrage", "auf nachfrage",
"nur wenn", "reaktiv", "wenn fehler", "when customers", "on request", "when reported",
"ad hoc", "ad-hoc", "bei bedarf",
],
"manual": ["manuell", "von hand", "manual", "händisch", "haendisch"],
"planned": [
"geplant", "in planung", "wollen wir", "planen wir", "noch nicht", "zukünftig", "künftig",
],
"absent": ["haben wir nicht", "gibt es nicht", "nicht vorhanden", "keinen prozess", "keine"],
}
def match_capabilities(text: str) -> List[str]:
low = text.lower()
return [cap for cap, kws in CAPABILITY_KEYWORDS.items() if any(k in low for k in kws)]
def match_qualifiers(text: str) -> List[str]:
low = text.lower()
return [q for q, kws in QUALIFIER_KEYWORDS.items() if any(k in low for k in kws)]
def topics_for(capabilities: List[str]) -> List[str]:
out: List[str] = []
for cap in capabilities:
for t in CAPABILITY_TOPICS.get(cap, []):
if t not in out:
out.append(t)
return out
@@ -0,0 +1,159 @@
"""Known customer interpretation patterns (spec Modus 4).
Deterministic: a customer interpretation is matched by lowercase substring
triggers against a curated library of common misconceptions. No match ->
the engine returns `uncertain` and asks for the missing context (no false
security, spec §6.3).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List
from .enums import Confidence, InterpretationVerdict
@dataclass(frozen=True)
class InterpretationPattern:
pattern_id: str
triggers: List[str]
verdict: InterpretationVerdict
corrected_interpretation: str
explanation: str
affected_regulations: List[str] = field(default_factory=list)
affected_obligations: List[str] = field(default_factory=list)
risks: List[str] = field(default_factory=list)
legal_basis_refs: List[str] = field(default_factory=list)
confidence: Confidence = Confidence.MEDIUM
INTERPRETATION_PATTERNS: List[InterpretationPattern] = [
InterpretationPattern(
pattern_id="cra_only_new_products",
triggers=[
"nur für neue", "nur fuer neue", "nur neu entwickelt", "nur neuentwicklung",
"nur bei neuentwicklung", "only new product", "gilt nur für neue produkte",
],
verdict=InterpretationVerdict.TOO_NARROW,
corrected_interpretation=(
"CRA-Pflichten knüpfen primär an Produkt, Rolle, Marktzugang, Bereitstellung und "
"Übergangsfristen an, nicht nur an Neuentwicklung. Ein fertig entwickeltes "
"Katalogprodukt kann betroffen sein, wenn es nach dem maßgeblichen Zeitpunkt weiter "
"auf dem EU-Markt bereitgestellt wird."
),
explanation=(
"Die relevante Frage ist nicht nur, ob das Produkt neu entwickelt wurde, sondern ob es "
"nach dem Anwendungszeitpunkt weiterhin bereitgestellt oder in Verkehr gebracht wird."
),
affected_regulations=["CRA"],
risks=["Katalog-/Bestandsprodukt fällt trotz abgeschlossener Entwicklung unter den CRA."],
legal_basis_refs=["CRA Art. 2", "CRA Art. 69 (Übergangsbestimmungen)"],
confidence=Confidence.HIGH,
),
InterpretationPattern(
pattern_id="cra_b2b_exempt",
triggers=[
"gilt nicht für b2b", "nur für verbraucher", "nur b2c", "nicht im b2b",
"only consumer", "b2b ist ausgenommen",
],
verdict=InterpretationVerdict.TOO_NARROW,
corrected_interpretation=(
"Der CRA gilt produkt- und marktbezogen, unabhängig von B2B oder B2C. Eine generelle "
"B2B-Ausnahme existiert nicht; Industrieprodukte mit digitalen Elementen sind erfasst."
),
explanation="Der Anwendungsbereich knüpft an 'Produkte mit digitalen Elementen' an, nicht an die Kundengruppe.",
affected_regulations=["CRA"],
risks=["Industrielle B2B-Steuerungen werden fälschlich als ausgenommen behandelt."],
legal_basis_refs=["CRA Art. 2", "CRA Art. 3(1)"],
confidence=Confidence.HIGH,
),
InterpretationPattern(
pattern_id="sbom_is_enough",
triggers=[
"sbom reicht", "mit sbom sind wir", "sbom genügt", "sbom genuegt", "nur eine sbom",
"sbom allein",
],
verdict=InterpretationVerdict.TOO_NARROW,
corrected_interpretation=(
"Eine SBOM erfüllt nur einen Teil der Komponenten-Transparenz. Schwachstellen-"
"überwachung, Update-/Patch-Prozess und technische Dokumentation bleiben eigenständige Pflichten."
),
explanation="SBOM ist Voraussetzung, ersetzt aber nicht Vulnerability-Handling und Updates.",
affected_regulations=["CRA"],
affected_obligations=["sbom_creation", "vuln_handling_process", "provide_security_updates"],
risks=["Falsche Annahme vollständiger Erfüllung trotz fehlendem Vulnerability-Prozess."],
legal_basis_refs=["CRA Annex I Part II (1)", "CRA Annex I Part II (2)"],
confidence=Confidence.HIGH,
),
InterpretationPattern(
pattern_id="open_source_exempt",
triggers=[
"open source ist ausgenommen", "open-source ist ausgenommen", "oss ist ausgenommen",
"freie software ist ausgenommen", "open source fällt nicht",
],
verdict=InterpretationVerdict.PARTIALLY_CORRECT,
corrected_interpretation=(
"Nur nicht-kommerziell bereitgestellte Open-Source-Software ist ausgenommen. Sobald OSS "
"kommerziell in ein Produkt integriert und auf dem Markt bereitgestellt wird, greift der CRA."
),
explanation="Die Ausnahme zielt auf nicht-kommerzielle OSS-Bereitstellung, nicht auf kommerzielle Produktintegration.",
affected_regulations=["CRA"],
risks=["Kommerziell integrierte OSS-Komponenten werden fälschlich als ausgenommen behandelt."],
legal_basis_refs=["CRA Art. 2", "CRA Erwägungsgründe (Open-Source-Stewards)"],
confidence=Confidence.MEDIUM,
),
InterpretationPattern(
pattern_id="reactive_updates_ok",
triggers=[
"updates nur wenn", "reaktive updates reichen", "wenn kunden melden reicht",
"updates wenn fehler gemeldet",
],
verdict=InterpretationVerdict.TOO_NARROW,
corrected_interpretation=(
"Der CRA verlangt aktive Schwachstellenüberwachung und zeitnahe Sicherheitsupdates über "
"den Supportzeitraum, nicht nur reaktive Updates nach Kundenmeldung."
),
explanation="Ein rein reaktiver Updateprozess erfüllt die Pflicht zur aktiven Schwachstellenbehandlung nicht.",
affected_regulations=["CRA"],
affected_obligations=["provide_security_updates", "vuln_handling_process"],
risks=["Verzögerte Reaktion auf öffentlich bekannte Schwachstellen; Pflichtverletzung."],
legal_basis_refs=["CRA Annex I Part II (1)", "CRA Annex I (2)(c)"],
confidence=Confidence.HIGH,
),
InterpretationPattern(
pattern_id="machinery_covers_cyber",
triggers=[
"maschinenrichtlinie deckt cyber", "maschinenvo deckt alles", "ce der maschine reicht",
"ce maschine reicht für cyber", "maschinen-ce reicht",
],
verdict=InterpretationVerdict.PARTIALLY_CORRECT,
corrected_interpretation=(
"Die MaschinenVO deckt die sicherheitsrelevante Korrumpierung ab (Anhang III 1.1.9), "
"ersetzt aber nicht die produktbezogenen CRA-Security-Pflichten. Beide Regime gelten parallel."
),
explanation="Maschinen-CE und CRA überschneiden sich nur dort, wo Cyber eine Sicherheitsfunktion betrifft.",
affected_regulations=["CRA", "MaschinenVO"],
affected_obligations=["machine_protection_against_corruption", "vuln_handling_process"],
risks=["CRA-Pflichten werden übersehen, weil die Maschine bereits CE-gekennzeichnet ist."],
legal_basis_refs=["MaschinenVO Anhang III (1.1.9)", "CRA Art. 13"],
confidence=Confidence.MEDIUM,
),
InterpretationPattern(
pattern_id="no_radio_no_cyber",
triggers=[
"ohne funkmodul kein cyber", "kein funk also kein cra", "ohne funk keine security",
"ohne funkmodul keine cyber",
],
verdict=InterpretationVerdict.TOO_NARROW,
corrected_interpretation=(
"Der CRA knüpft an digitale Elemente an, nicht an ein Funkmodul. Ohne Funk entfällt die "
"RED, der CRA bleibt jedoch anwendbar, sobald Software vorhanden ist."
),
explanation="Funkmodul ist nur für die RED relevant; die CRA-Anwendbarkeit folgt aus der Software.",
affected_regulations=["CRA", "RED"],
risks=["CRA wird fälschlich verneint, weil kein Funkmodul vorhanden ist."],
legal_basis_refs=["CRA Art. 3(1)", "RED 2014/53/EU Art. 1"],
confidence=Confidence.HIGH,
),
]
@@ -0,0 +1,264 @@
"""Tests for the Regulatory Reasoning Engine.
Covers the five typical machine-builder scenarios and the ten acceptance
questions from the build spec (§15). Engine tests are pure (no DB); the
endpoint smoke tests mount only the reasoning router.
"""
from __future__ import annotations
from datetime import date
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.reasoning import (
assess_implementation,
assess_interpretation,
derive_obligations,
discover_scope,
normalize_claim,
)
from compliance.reasoning.enums import (
ApplicabilityStatus,
CoverageStatus,
InterpretationVerdict,
)
from compliance.reasoning.schemas import ProductProfile
from compliance.reasoning.enums import ManufacturerRole
# ---------------------------------------------------------------------------
# Fixtures / builders
# ---------------------------------------------------------------------------
def sps_profile(**overrides) -> ProductProfile:
base = dict(
product_name="SPS mit HMI",
product_type=["SPS", "HMI", "Schaltschrank"],
has_software=True,
has_remote_access=True,
has_cloud_connection=True,
eu_market=True,
manufacturer_role=ManufacturerRole.MANUFACTURER,
)
base.update(overrides)
return ProductProfile(**base)
def _reg_ids(scope, attr):
return [getattr(r, "regulation_id") for r in getattr(scope, attr)]
# ---------------------------------------------------------------------------
# 1. Gilt CRA für eine SPS mit Fernwartung?
# ---------------------------------------------------------------------------
def test_cra_applies_to_sps_with_remote_access():
scope = discover_scope(sps_profile())
cra = [r for r in scope.applicable_regulations if r.regulation_id == "CRA"]
assert cra and cra[0].applicability_status == ApplicabilityStatus.APPLICABLE
assert cra[0].confidence.value == "high"
assert any("digitale Elemente" in f or "Fernzugriff" in f for f in cra[0].trigger_facts) or cra[0].trigger_facts
# ---------------------------------------------------------------------------
# 2. Katalogprodukt 2027 weiter verkauft -> CRA gilt; "nur neue Produkte" zu eng
# ---------------------------------------------------------------------------
def test_cra_applies_to_finished_catalog_product():
profile = sps_profile(placed_on_market_after=date(2027, 1, 1), lifecycle_phase="placing_on_market")
scope = discover_scope(profile)
assert "CRA" in _reg_ids(scope, "applicable_regulations")
def test_interpretation_only_new_products_is_too_narrow():
result = assess_interpretation("Wir glauben, der CRA gilt nur für neue Produkte.")
assert result.assessment == InterpretationVerdict.TOO_NARROW
assert "CRA" in result.affected_regulations
assert result.corrected_interpretation
assert result.legal_basis_refs
# ---------------------------------------------------------------------------
# 3. Reicht eine SBOM allein? -> nein, nur teilweise
# ---------------------------------------------------------------------------
def test_sbom_alone_is_not_enough():
resp = assess_implementation(sps_profile(), "Wir haben SBOMs.")
sbom = [a for a in resp.assessments if a.obligation_id == "sbom_creation"]
assert sbom and sbom[0].coverage_status == CoverageStatus.COVERED
# but other obligations are surfaced as gaps -> aggregate not fully covered
assert any(a.coverage_status != CoverageStatus.COVERED for a in resp.assessments)
assert "Teilweise erfüllt" in resp.summary or "offen" in resp.summary
# ---------------------------------------------------------------------------
# 4. Ist ein reaktiver Updateprozess ausreichend? -> nur teilweise
# ---------------------------------------------------------------------------
def test_reactive_update_process_is_partial():
resp = assess_implementation(
sps_profile(), "Wir machen Updates, wenn Kunden Fehler melden."
)
upd = [a for a in resp.assessments if a.obligation_id == "provide_security_updates"]
assert upd and upd[0].coverage_status == CoverageStatus.PARTIALLY_COVERED
assert "reactive" in resp.claim.qualifiers
assert any("Schwachstellenüberwachung" in m for m in upd[0].missing_elements)
# ---------------------------------------------------------------------------
# 5. Wann überschneiden sich CRA und MaschinenVO?
# ---------------------------------------------------------------------------
def test_cra_and_machinery_overlap_on_cyber_safety():
profile = sps_profile(is_machine=True, has_safety_function=True)
resp = derive_obligations(profile)
ids = [o.obligation_id for o in resp.applicable_obligations]
assert "machine_protection_against_corruption" in ids
assert "vuln_handling_process" in ids
vuln_overlap = [o for o in resp.overlaps if o.overlap_group_id == "VULNERABILITY_HANDLING"]
assert vuln_overlap
assert "machine_protection_against_corruption" in vuln_overlap[0].obligations
# ---------------------------------------------------------------------------
# 6. Wann ist Data Act zusätzlich relevant?
# ---------------------------------------------------------------------------
def test_data_act_relevant_when_product_generates_data():
scope = discover_scope(sps_profile(generates_usage_data=True))
assert "DataAct" in _reg_ids(scope, "applicable_regulations")
obs = derive_obligations(sps_profile(generates_usage_data=True))
assert any(o.source_regulation == "DataAct" for o in obs.applicable_obligations)
def test_data_act_uncertain_when_data_unknown():
scope = discover_scope(sps_profile()) # generates_usage_data=None
assert "DataAct" in _reg_ids(scope, "uncertain_regulations")
# ---------------------------------------------------------------------------
# 7. Welche Pflichten gelten nicht ohne Funkmodul?
# ---------------------------------------------------------------------------
def test_no_radio_module_excludes_red():
scope = discover_scope(sps_profile(has_radio_module=False))
assert "RED" in _reg_ids(scope, "excluded_regulations")
assert "RED" not in _reg_ids(scope, "applicable_regulations")
def test_radio_unknown_makes_red_uncertain():
scope = discover_scope(sps_profile()) # has_radio_module=None
assert "RED" in _reg_ids(scope, "uncertain_regulations")
# ---------------------------------------------------------------------------
# 8. Welche Fakten fehlen für eine NIS2-Bewertung?
# ---------------------------------------------------------------------------
def test_nis2_missing_facts():
scope = discover_scope(sps_profile())
nis2 = [r for r in scope.uncertain_regulations if r.regulation_id == "NIS2"]
assert nis2
joined = " ".join(nis2[0].missing_facts).lower()
assert "unternehmensgröße" in joined and "sektor" in joined
# ---------------------------------------------------------------------------
# 9. Welche Nachweise decken mehrere Pflichten gleichzeitig? (USP)
# ---------------------------------------------------------------------------
def test_evidence_covers_multiple_obligations():
resp = derive_obligations(sps_profile())
multi = resp.evidence_for_multiple
assert multi # at least one evidence type spans >1 obligation
assert all(len(ids) > 1 for ids in multi.values())
assert "policy" in multi # the CRA process docs share a policy evidence
# ---------------------------------------------------------------------------
# 10. Auslegungen: zu eng / zu weit / plausibel / unbekannt
# ---------------------------------------------------------------------------
def test_interpretation_unknown_returns_uncertain():
result = assess_interpretation("Der Mond beeinflusst unsere Updatezyklen.")
assert result.assessment == InterpretationVerdict.UNCERTAIN
assert result.corrected_interpretation
def test_interpretation_open_source_partially_correct():
result = assess_interpretation("Open Source ist ausgenommen, also betrifft uns der CRA nicht.")
assert result.assessment == InterpretationVerdict.PARTIALLY_CORRECT
# ---------------------------------------------------------------------------
# Registry-alignment + contract guards
# ---------------------------------------------------------------------------
def test_cra_obligations_reuse_registry_ids_not_minted():
resp = derive_obligations(sps_profile())
anchored = [o for o in resp.applicable_obligations if o.registry_anchor]
assert "sbom_creation" in [o.obligation_id for o in anchored]
assert "provide_security_updates" in [o.obligation_id for o in anchored]
# machine obligations are proposed, never claimed as registry-owned
machine = [o for o in resp.applicable_obligations if o.source_regulation == "MaschinenVO"]
assert all(o.proposed and not o.registry_anchor for o in machine)
def test_required_evidence_only_uses_shared_catalog():
from compliance.reasoning.rules_types import EVIDENCE_CATALOG
from compliance.reasoning.rules_obligations import ALL_OBLIGATIONS
for rule in ALL_OBLIGATIONS:
assert set(rule.required_evidence) <= EVIDENCE_CATALOG
def test_claim_normalizer_is_deterministic():
a = normalize_claim("Wir haben einen Update-Prozess.")
b = normalize_claim("Wir haben einen Update-Prozess.")
assert a.claim_id == b.claim_id
assert "secure_updates" in a.claimed_capability
def test_unspecific_claim_asks_for_detail():
resp = assess_implementation(sps_profile(), "Wir sind sicher aufgestellt.")
assert resp.assessments == [] or all(
a.coverage_status == CoverageStatus.UNCLEAR for a in resp.assessments
)
assert "unspezifisch" in resp.summary.lower()
# ---------------------------------------------------------------------------
# Endpoint smoke tests
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def client():
from compliance.api.reasoning_routes import router
app = FastAPI()
app.include_router(router)
return TestClient(app)
def test_endpoint_scope(client):
r = client.post("/reasoning/scope", json={"product_profile": {"product_name": "X", "has_software": True, "eu_market": True, "manufacturer_role": "manufacturer"}})
assert r.status_code == 200
body = r.json()
assert "CRA" in [x["regulation_id"] for x in body["regulatory_scope"]["applicable_regulations"]]
def test_endpoint_obligations(client):
r = client.post(
"/reasoning/obligations",
json={"product_profile": {"product_name": "X", "has_software": True, "has_remote_access": True, "eu_market": True, "manufacturer_role": "manufacturer"}},
)
assert r.status_code == 200
assert r.json()["applicable_obligations"]
def test_endpoint_implementation(client):
r = client.post(
"/reasoning/implementation-assessment",
json={"product_profile": {"product_name": "X", "has_software": True, "eu_market": True, "manufacturer_role": "manufacturer"}, "customer_claim": "Wir haben SBOMs."},
)
assert r.status_code == 200
assert r.json()["assessments"]
def test_endpoint_interpretation(client):
r = client.post(
"/reasoning/interpretation-assessment",
json={"customer_interpretation": "CRA gilt nur für neue Produkte."},
)
assert r.status_code == 200
assert r.json()["assessment"] == "too_narrow"