From 1607c894597efa7a4ab0d4d2ae734ab6859719f5 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Thu, 25 Jun 2026 19:30:53 +0200 Subject: [PATCH] feat(reasoning): Regulatory Reasoning Engine MVP (scope/obligations/implementation/interpretation) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deterministic reasoning layer ON TOP of the Legal Knowledge Graph (obligation registry) and the Compliance Execution Graph (control mapping/evidence). Answers which regulations apply to a concrete product, which obligations follow, whether the customer's implementation covers them, and whether a customer interpretation is too narrow/broad/plausible. - ProductProfile with tri-state facts (Optional[bool]=None => uncertain, never false security); safe predicate evaluator (no eval). - 6 regulation triggers (CRA/MaschinenVO/RED/EMV/DataAct/NIS2) with missing-fact prompts; 24 obligation scope rules. - CRA obligation_ids RE-USED verbatim from the registry (93 ids) — never re-minted (control_uuid trap); Machine/Data-Act flagged proposed=True. - required_evidence constrained to the framework-agnostic shared evidence catalog; capabilities echo the planned Obligation->Capability layer. - Overlap groups (CRA<->MaschinenVO cyber-safety) + evidence-for-multiple (USP). - 4 endpoints POST /reasoning/{scope,obligations,implementation-assessment, interpretation-assessment}; thin handlers, registered in api/__init__.py. - 22 tests (5 machine-builder scenarios + 10 acceptance questions). No DB migration, no RAG, no new controls. 
Co-Authored-By: Claude Opus 4.7 --- backend-compliance/compliance/api/__init__.py | 1 + .../compliance/api/reasoning_routes.py | 68 +++++ .../compliance/reasoning/__init__.py | 27 ++ .../compliance/reasoning/claim_normalizer.py | 45 +++ .../compliance/reasoning/enums.py | 85 ++++++ .../reasoning/implementation_engine.py | 142 +++++++++ .../reasoning/interpretation_engine.py | 65 +++++ .../compliance/reasoning/obligation_engine.py | 116 ++++++++ .../compliance/reasoning/predicates.py | 100 +++++++ .../compliance/reasoning/rules_obligations.py | 23 ++ .../reasoning/rules_obligations_cra.py | 271 ++++++++++++++++++ .../rules_obligations_machine_data.py | 139 +++++++++ .../compliance/reasoning/rules_overlaps.py | 91 ++++++ .../compliance/reasoning/rules_regulations.py | 160 +++++++++++ .../compliance/reasoning/rules_types.py | 58 ++++ .../compliance/reasoning/schemas.py | 216 ++++++++++++++ .../compliance/reasoning/scope_engine.py | 136 +++++++++ .../compliance/reasoning/taxonomy_claims.py | 104 +++++++ .../reasoning/taxonomy_interpretations.py | 159 ++++++++++ .../tests/test_reasoning_engine.py | 264 +++++++++++++++++ 20 files changed, 2270 insertions(+) create mode 100644 backend-compliance/compliance/api/reasoning_routes.py create mode 100644 backend-compliance/compliance/reasoning/__init__.py create mode 100644 backend-compliance/compliance/reasoning/claim_normalizer.py create mode 100644 backend-compliance/compliance/reasoning/enums.py create mode 100644 backend-compliance/compliance/reasoning/implementation_engine.py create mode 100644 backend-compliance/compliance/reasoning/interpretation_engine.py create mode 100644 backend-compliance/compliance/reasoning/obligation_engine.py create mode 100644 backend-compliance/compliance/reasoning/predicates.py create mode 100644 backend-compliance/compliance/reasoning/rules_obligations.py create mode 100644 backend-compliance/compliance/reasoning/rules_obligations_cra.py create mode 100644 
backend-compliance/compliance/reasoning/rules_obligations_machine_data.py create mode 100644 backend-compliance/compliance/reasoning/rules_overlaps.py create mode 100644 backend-compliance/compliance/reasoning/rules_regulations.py create mode 100644 backend-compliance/compliance/reasoning/rules_types.py create mode 100644 backend-compliance/compliance/reasoning/schemas.py create mode 100644 backend-compliance/compliance/reasoning/scope_engine.py create mode 100644 backend-compliance/compliance/reasoning/taxonomy_claims.py create mode 100644 backend-compliance/compliance/reasoning/taxonomy_interpretations.py create mode 100644 backend-compliance/tests/test_reasoning_engine.py diff --git a/backend-compliance/compliance/api/__init__.py b/backend-compliance/compliance/api/__init__.py index 6518b4a6..8faf5866 100644 --- a/backend-compliance/compliance/api/__init__.py +++ b/backend-compliance/compliance/api/__init__.py @@ -77,6 +77,7 @@ _ROUTER_MODULES = [ "licenses_routes", "template_rule_routes", "specialist_agent_routes", + "reasoning_routes", ] _loaded_count = 0 diff --git a/backend-compliance/compliance/api/reasoning_routes.py b/backend-compliance/compliance/api/reasoning_routes.py new file mode 100644 index 00000000..7c65f46e --- /dev/null +++ b/backend-compliance/compliance/api/reasoning_routes.py @@ -0,0 +1,68 @@ +"""HTTP endpoints for the Regulatory Reasoning Engine (spec §7). + +Thin handlers — all reasoning lives in `compliance.reasoning.*`. No DB, no RAG; +pure deterministic rule evaluation. 
router = APIRouter(prefix="/reasoning", tags=["reasoning"])


@router.post("/scope", response_model=ScopeResponse)
def scope_discovery(req: ScopeRequest) -> ScopeResponse:
    """Run scope discovery for the posted product profile.

    The discovered scope is returned as ``regulatory_scope``; its
    ``missing_facts`` and ``confidence`` are additionally copied to the top
    level of the response.
    """
    discovered = discover_scope(req.product_profile)
    payload = {
        "regulatory_scope": discovered,
        "missing_facts": discovered.missing_facts,
        "confidence": discovered.confidence,
    }
    return ScopeResponse(**payload)


@router.post("/obligations", response_model=ObligationsResponse)
def applicable_obligations(req: ObligationsRequest) -> ObligationsResponse:
    """Delegate to ``derive_obligations`` with the profile and the optional scope."""
    derived = derive_obligations(req.product_profile, req.regulatory_scope)
    return derived


@router.post("/implementation-assessment", response_model=ImplementationResponse)
def implementation_assessment(req: ImplementationRequest) -> ImplementationResponse:
    """Delegate to ``assess_implementation`` with the profile and the claim."""
    verdicts = assess_implementation(req.product_profile, req.customer_claim)
    return verdicts


# Fields copied one-to-one from the engine result into the HTTP response.
_INTERPRETATION_FIELDS = (
    "assessment",
    "affected_regulations",
    "affected_obligations",
    "corrected_interpretation",
    "risks",
    "legal_basis_refs",
    "explanation",
    "confidence",
)


@router.post("/interpretation-assessment", response_model=InterpretationResponse)
def interpretation_assessment(req: InterpretationRequest) -> InterpretationResponse:
    """Assess a customer interpretation and project the result onto the response model.

    Only the fields listed in ``_INTERPRETATION_FIELDS`` are forwarded; other
    attributes of the engine result are not part of the response.
    """
    outcome = assess_interpretation(req.customer_interpretation, req.product_profile)
    forwarded = {name: getattr(outcome, name) for name in _INTERPRETATION_FIELDS}
    return InterpretationResponse(**forwarded)
def _claim_id(raw_statement: str) -> str:
    """Return a stable ``claim_<10 hex chars>`` id for a statement.

    The statement is stripped and lower-cased before hashing, so surrounding
    whitespace and letter case do not change the id.
    """
    canonical = raw_statement.strip().lower()
    hexdigest = hashlib.sha1(canonical.encode("utf-8")).hexdigest()
    return "claim_%s" % hexdigest[:10]


def _normalized(capabilities: List[str], qualifiers: List[str]) -> str:
    """Render the matched capabilities (and qualifiers, if any) as one line.

    With no capabilities a fixed "nothing derivable" sentence is returned and
    the qualifiers are ignored.
    """
    if not capabilities:
        return "Keine bekannte Compliance-Fähigkeit aus der Aussage ableitbar."
    pieces = ["Fähigkeiten: " + ", ".join(capabilities)]
    if qualifiers:
        pieces.append(" | Einschränkungen: " + ", ".join(qualifiers))
    return "".join(pieces)


def normalize_claim(
    raw_statement: str, claim_id: Optional[str] = None, evidence_refs: Optional[List[str]] = None
) -> CustomerImplementationClaim:
    """Turn a free-text statement into a structured ``CustomerImplementationClaim``.

    Args:
        raw_statement: The customer's statement, passed unchanged to the
            capability and qualifier matchers.
        claim_id: Optional explicit id; a falsy value falls back to the hash
            of ``raw_statement``.
        evidence_refs: Optional evidence references; a falsy value becomes an
            empty list.
    """
    found_capabilities = match_capabilities(raw_statement)
    found_qualifiers = match_qualifiers(raw_statement)
    effective_id = claim_id or _claim_id(raw_statement)
    summary_line = _normalized(found_capabilities, found_qualifiers)
    topics = topics_for(found_capabilities)
    references = evidence_refs or []
    return CustomerImplementationClaim(
        claim_id=effective_id,
        raw_statement=raw_statement,
        normalized_claim=summary_line,
        claimed_capability=found_capabilities,
        related_topics=topics,
        qualifiers=found_qualifiers,
        evidence_refs=references,
    )
class ManufacturerRole(str, Enum):
    """Role identifiers; the ``str`` mixin makes members equal to their string values."""

    MANUFACTURER = "manufacturer"
    IMPORTER = "importer"
    DISTRIBUTOR = "distributor"
    INTEGRATOR = "integrator"
    OPERATOR = "operator"
    SERVICE_PROVIDER = "service_provider"


class ProductLifecyclePhase(str, Enum):
    """Lifecycle phase identifiers, from development through end of life."""

    DEVELOPMENT = "development"
    PLACING_ON_MARKET = "placing_on_market"
    OPERATION = "operation"
    MAINTENANCE = "maintenance"
    UPDATE = "update"
    END_OF_LIFE = "end_of_life"


class MarketModel(str, Enum):
    """Market model identifiers: ``b2b``, ``b2c`` or ``both``."""

    B2B = "b2b"
    B2C = "b2c"
    BOTH = "both"


class ApplicabilityStatus(str, Enum):
    """Four-valued applicability result, including an explicit ``uncertain`` value."""

    APPLICABLE = "applicable"
    PARTIALLY_APPLICABLE = "partially_applicable"
    UNCERTAIN = "uncertain"
    NOT_APPLICABLE = "not_applicable"


class Confidence(str, Enum):
    """Three-level confidence grade attached to results."""

    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


class AuthorityLevel(str, Enum):
    """How binding a statement is — answers MUST visibly separate these."""

    LEGAL_TEXT = "legal_text"
    RECITAL = "recital"
    GUIDANCE = "guidance"
    HARMONIZED_STANDARD = "harmonized_standard"
    TECHNICAL_STANDARD = "technical_standard"
    BEST_PRACTICE = "best_practice"
    INTERNAL_INTERPRETATION = "internal_interpretation"


class OverlapType(str, Enum):
    """Kinds of relationship between obligations that belong to one overlap group."""

    IDENTICAL = "identical"
    SIMILAR = "similar"
    COMPLEMENTARY = "complementary"
    CONFLICTING = "conflicting"
    DIFFERENT_SCOPE = "different_scope"


class CoverageStatus(str, Enum):
    """Per-obligation coverage result of an implementation claim."""

    COVERED = "covered"
    PARTIALLY_COVERED = "partially_covered"
    NOT_COVERED = "not_covered"
    UNCLEAR = "unclear"
    OUT_OF_SCOPE = "out_of_scope"


class InterpretationVerdict(str, Enum):
    """Verdict on a customer interpretation; ``uncertain`` is a first-class value."""

    PLAUSIBLE = "plausible"
    TOO_NARROW = "too_narrow"
    TOO_BROAD = "too_broad"
    PARTIALLY_CORRECT = "partially_correct"
    UNSUPPORTED = "unsupported"
    UNCERTAIN = "uncertain"
00000000..d710fcf8 --- /dev/null +++ b/backend-compliance/compliance/reasoning/implementation_engine.py @@ -0,0 +1,142 @@ +"""Implementation reasoning engine (spec Modus 3). + +Given a free-text claim ("Wir haben SBOMs und machen Updates, wenn Kunden Fehler +melden.") it maps the claimed capabilities onto the product's applicable +obligations and reports, per obligation, whether it is covered, partially +covered or not covered — plus the evidence that would close the gap. +""" + +from __future__ import annotations + +from typing import Dict, List + +from .claim_normalizer import normalize_claim +from .enums import Confidence, CoverageStatus +from .obligation_engine import derive_obligations +from .schemas import ( + CustomerImplementationClaim, + ImplementationAssessment, + ImplementationResponse, + ProductProfile, +) +from .taxonomy_claims import topics_for + +# Typical sub-elements a capability still misses when only partially claimed. +STANDARD_GAPS: Dict[str, List[str]] = { + "software_bill_of_materials": [ + "Vulnerability-Monitoring der Komponenten", + "Bewertung betroffener Komponenten", + "Lieferantenprozess", + ], + "secure_updates": [ + "aktive Schwachstellenüberwachung", + "Patch-Bewertung", + "Fristen und Verantwortlichkeiten", + "Nachweis der Updatefähigkeit", + ], + "vulnerability_management": [ + "definierter Vulnerability-Handling-Prozess", + "Priorisierung und Fristen", + ], + "authentication": ["MFA für privilegierte Zugänge", "keine Standard-Zugangsdaten"], + "security_logging": ["Schutz der Logs vor Manipulation", "Monitoring/Alerting"], + "software_integrity": ["Signierung der Updates", "Verifikation der Update-Signatur"], + "secure_by_default": ["Härtung der Auslieferungskonfiguration", "Minimierung der Angriffsfläche"], + "secure_communication": ["verschlüsselte Übertragung", "Integritätsschutz der Verbindung"], + "risk_assessment": ["dokumentierte Risikobewertung", "Aufnahme in die technische Doku"], + "technical_documentation": 
def _missing_for(capabilities: List[str]) -> List[str]:
    """Return the de-duplicated standard gaps for the given capabilities.

    Gaps keep the order in which they are first encountered; capabilities
    without an entry in ``STANDARD_GAPS`` contribute nothing.
    """
    gaps = (gap for cap in capabilities for gap in STANDARD_GAPS.get(cap, []))
    # dict.fromkeys keeps first-seen order while dropping repeats.
    return list(dict.fromkeys(gaps))


def _coverage(required: List[str], claimed: List[str], qualifiers: List[str]) -> CoverageStatus:
    """Grade how well the claimed capabilities cover the required ones.

    Rules are applied in this order:
      1. no required capability is claimed            -> NOT_COVERED
      2. the claim is qualified "absent" or "planned" -> NOT_COVERED
      3. "reactive" claim hitting updates/vuln mgmt   -> PARTIALLY_COVERED
      4. every required capability is claimed         -> COVERED
      5. otherwise                                    -> PARTIALLY_COVERED
    """
    req, have = set(required), set(claimed)
    hit = req & have
    if not hit:
        return CoverageStatus.NOT_COVERED
    if "absent" in qualifiers or "planned" in qualifiers:
        return CoverageStatus.NOT_COVERED
    if "reactive" in qualifiers and hit & {"secure_updates", "vulnerability_management"}:
        return CoverageStatus.PARTIALLY_COVERED
    if req <= have:
        return CoverageStatus.COVERED
    return CoverageStatus.PARTIALLY_COVERED


def assess_implementation(profile: ProductProfile, customer_claim: str) -> ImplementationResponse:
    """Assess a free-text implementation claim against the applicable obligations.

    The claim is normalised, the obligations for ``profile`` are derived, and
    every obligation whose required capabilities (or their topics) overlap the
    claim is graded. Obligations unrelated to the claim are left out.

    Returns:
        An ``ImplementationResponse`` with the normalised claim, one assessment
        per related obligation, the ordered de-duplicated evidence of all
        obligations that are not fully covered, and a one-line summary.
    """
    # Kept function-local as in the original (presumably to avoid an import
    # cycle — confirm), but resolved once instead of once per obligation.
    from .rules_obligations import obligation_rule

    claim = normalize_claim(customer_claim)
    obligations = derive_obligations(profile).applicable_obligations
    claimed = claim.claimed_capability
    claim_topics = set(claim.related_topics) | set(claimed)

    assessments: List[ImplementationAssessment] = []
    missing_evidence: List[str] = []

    for ob in obligations:
        rule = obligation_rule(ob.obligation_id)
        required_caps = rule.required_capabilities if rule else []
        ob_topics = set(topics_for(required_caps)) | set(required_caps)
        # A directly claimed capability is part of both sets, so this single
        # overlap test also covers the "directly claimed" case.
        if not (ob_topics & claim_topics):
            continue  # unrelated to the claim -> don't assess

        status = _coverage(required_caps, claimed, claim.qualifiers)
        fully_covered = status == CoverageStatus.COVERED
        missing = [] if fully_covered else _missing_for(required_caps)
        if not fully_covered:
            for ev in ob.required_evidence:
                if ev not in missing_evidence:
                    missing_evidence.append(ev)
        assessments.append(
            ImplementationAssessment(
                claim_id=claim.claim_id,
                obligation_id=ob.obligation_id,
                coverage_status=status,
                missing_elements=missing,
                required_evidence=ob.required_evidence,
                explanation=_explain(status, ob.title, claim.qualifiers),
                confidence=Confidence.MEDIUM,
            )
        )

    return ImplementationResponse(
        claim=claim,
        assessments=assessments,
        missing_evidence=missing_evidence,
        summary=_summary(claim, assessments),
    )


def _explain(status: CoverageStatus, title: str, qualifiers: List[str]) -> str:
    """Return the explanation sentence for one obligation's coverage status."""
    if status == CoverageStatus.COVERED:
        return "Die Pflicht '%s' wird durch die beschriebene Umsetzung plausibel abgedeckt." % title
    if status == CoverageStatus.PARTIALLY_COVERED:
        extra = " Der Prozess wirkt reaktiv." if "reactive" in qualifiers else ""
        return "Die Pflicht '%s' ist nur teilweise abgedeckt.%s" % (title, extra)
    return "Die Pflicht '%s' wird durch die Aussage nicht abgedeckt." % title


def _summary(claim: CustomerImplementationClaim, assessments: List[ImplementationAssessment]) -> str:
    """Build the one-line summary: a headline plus covered/partial/open counts.

    A claim without any recognised capability yields a fixed "too unspecific"
    sentence instead of counts.
    """
    if not claim.claimed_capability:
        return "Die Aussage ist zu unspezifisch — bitte konkretisieren, was umgesetzt wurde."
    covered = sum(1 for a in assessments if a.coverage_status == CoverageStatus.COVERED)
    partial = sum(1 for a in assessments if a.coverage_status == CoverageStatus.PARTIALLY_COVERED)
    notc = sum(1 for a in assessments if a.coverage_status == CoverageStatus.NOT_COVERED)
    if partial or (covered and notc):
        head = "Teilweise erfüllt"
    elif notc:
        # Nothing covered and nothing partial: "partially fulfilled" would
        # contradict the counts that follow the headline.
        head = "Nicht erfüllt"
    elif covered:
        head = "Plausibel abgedeckt"
    else:
        head = "Nicht beurteilbar"
    return "%s: %d abgedeckt, %d teilweise, %d offen." % (head, covered, partial, notc)
+ ), + explanation="Kein bekanntes Auslegungsmuster erkannt — bewusst keine Scheinsicherheit.", + confidence=Confidence.LOW, + ) + + return InterpretationAssessment( + interpretation_id=interp_id, + raw_interpretation=raw_interpretation, + affected_regulations=pattern.affected_regulations, + affected_obligations=pattern.affected_obligations, + assessment=pattern.verdict, + risks=pattern.risks, + corrected_interpretation=pattern.corrected_interpretation, + legal_basis_refs=pattern.legal_basis_refs, + explanation=pattern.explanation, + confidence=pattern.confidence, + ) diff --git a/backend-compliance/compliance/reasoning/obligation_engine.py b/backend-compliance/compliance/reasoning/obligation_engine.py new file mode 100644 index 00000000..e242ef11 --- /dev/null +++ b/backend-compliance/compliance/reasoning/obligation_engine.py @@ -0,0 +1,116 @@ +"""Applicable-obligation engine (spec Modus 2). + +Maps a product profile (optionally a precomputed scope) to the concrete legal +obligations, the overlaps between them, and which evidence types satisfy more +than one obligation at once (the core USP, spec §16). 
def _applicable_regulation_ids(profile: ProductProfile, scope: Optional[RegulatoryScope]) -> List[str]:
    """Return the ids of the applicable regulations, discovering the scope if none is given."""
    effective_scope = discover_scope(profile) if scope is None else scope
    return [reg.regulation_id for reg in effective_scope.applicable_regulations]


def _applies_because(rule: ObligationRule, profile: ProductProfile) -> List[str]:
    """Collect human-readable reasons why ``rule`` applies to ``profile``.

    Each leaf of ``rule.applies_if`` that evaluated True contributes the label
    of its field, if one exists in ``FIELD_LABELS``. Labels are unique and in
    first-seen order. With no label at all, a generic sentence naming the
    source regulation is returned instead.
    """
    reasons: List[str] = []
    for leaf in true_leaves(rule.applies_if, profile):
        text = FIELD_LABELS.get(leaf[0])
        if not text or text in reasons:
            continue
        reasons.append(text)
    if reasons:
        return reasons
    return ["%s ist für dieses Produkt anwendbar." % rule.source_regulation]


def _role_ok(rule: ObligationRule, profile: ProductProfile) -> bool:
    """Tell whether the profile's role is among the roles the rule applies to.

    An unknown role (``None``) never excludes a rule.
    """
    role = profile.manufacturer_role
    return True if role is None else role.value in rule.applies_to_role


def derive_obligations(
    profile: ProductProfile, scope: Optional[RegulatoryScope] = None
) -> ObligationsResponse:
    """Derive the obligations that apply to ``profile``.

    A rule is considered only if its source regulation is applicable. It is
    skipped when its ``applies_unless`` condition is definitely True. A rule
    whose ``applies_if`` is definitely False is recorded in
    ``excluded_obligations``; an unknown result or a role mismatch drops the
    rule without recording it. Overlaps and shared evidence are computed from
    the obligations that remain.
    """
    active = set(_applicable_regulation_ids(profile, scope))
    result = ObligationsResponse()
    kept_ids: List[str] = []

    for rule in ALL_OBLIGATIONS:
        if rule.source_regulation not in active:
            continue
        exemption = rule.applies_unless
        if exemption is not None and evaluate(exemption, profile) is True:
            continue

        outcome = evaluate(rule.applies_if, profile)
        if outcome is False:
            result.excluded_obligations.append(rule.obligation_id)
            continue
        if outcome is not True:
            continue  # unknown -> neither applicable nor excluded
        if not _role_ok(rule, profile):
            continue

        kept_ids.append(rule.obligation_id)
        entry = ApplicableObligation(
            obligation_id=rule.obligation_id,
            title=rule.title,
            source_regulation=rule.source_regulation,
            legal_basis_refs=rule.legal_basis_refs,
            obligation_text=rule.obligation_text,
            authority_level=rule.authority_level,
            applies_because=_applies_because(rule, profile),
            applies_to_role=rule.applies_to_role,
            lifecycle_phase=rule.lifecycle_phase,
            overlap_group_id=rule.overlap_group_id,
            required_evidence=rule.required_evidence,
            confidence=rule.base_confidence,
            registry_anchor=rule.registry_anchor,
            proposed=rule.proposed,
        )
        result.applicable_obligations.append(entry)

    result.overlaps = _overlaps(kept_ids)
    result.evidence_for_multiple = _evidence_for_multiple(result.applicable_obligations)
    return result
Leaf = Tuple[str, str, Any]
Condition = Union[Leaf, Dict[str, Any]]


def _attr(profile: Any, field: str) -> Any:
    """Read ``field`` from ``profile``; a missing attribute reads as ``None``.

    Enum members are unwrapped to their ``.value``.
    """
    raw = getattr(profile, field, None)
    return raw.value if isinstance(raw, Enum) else raw


def _eval_leaf(leaf: Leaf, profile: Any) -> Optional[bool]:
    """Evaluate one ``(field, op, value)`` leaf to True / False / None (unknown).

    ``not_none``, ``is_none`` and ``contains_any`` are decided before the
    unknown check. For every other op a ``None`` fact yields ``None`` — even
    for an unrecognised op, which raises ``ValueError`` only when the fact is
    known.
    """
    field, op, expected = leaf
    actual = _attr(profile, field)

    if op == "not_none":
        return actual is not None
    if op == "is_none":
        return actual is None
    if op == "contains_any":
        # NOTE(review): a None value is treated like an empty list here, so
        # this op answers False rather than None for an unset field —
        # confirm that is intended for list-valued facts.
        haystack = " ".join(str(item).lower() for item in (actual or []))
        return any(str(needle).lower() in haystack for needle in expected)

    if actual is None:
        return None  # unknown fact -> unknown result

    if op == "eq":
        outcome = actual == expected
    elif op == "ne":
        outcome = actual != expected
    elif op == "truthy":
        outcome = actual
    elif op == "falsy":
        outcome = not actual
    elif op == "in":
        outcome = actual in expected
    elif op == "not_in":
        outcome = actual not in expected
    elif op == "date_after":
        outcome = actual > expected
    else:
        raise ValueError("unknown predicate op: %r" % (op,))
    return bool(outcome)


def evaluate(condition: Optional[Condition], profile: Any) -> Optional[bool]:
    """Evaluate a condition tree to True / False / None (unknown).

    ``None`` means "no condition" and is True. A tuple is a leaf. A mapping
    with ``"all"`` is False if any child is False, else None if any child is
    unknown, else True. A mapping with ``"any"`` mirrors that with True as the
    deciding value. ``"all"`` takes precedence when both keys are present.
    All children are evaluated before combining.

    Raises:
        ValueError: if the condition is neither a leaf nor an all/any mapping.
    """
    if condition is None:
        return True
    if isinstance(condition, tuple):
        return _eval_leaf(condition, profile)

    for mode in ("all", "any"):
        if mode not in condition:
            continue
        verdicts = [evaluate(child, profile) for child in condition[mode]]
        deciding = mode == "any"  # True settles "any", False settles "all"
        if any(v is deciding for v in verdicts):
            return deciding
        if any(v is None for v in verdicts):
            return None
        return not deciding
    raise ValueError("malformed condition: %r" % (condition,))


def true_leaves(condition: Optional[Condition], profile: Any) -> List[Leaf]:
    """Return, depth-first and in order, the leaves that evaluated True."""
    if condition is None:
        return []
    if isinstance(condition, tuple):
        if _eval_leaf(condition, profile) is True:
            return [condition]
        return []
    children = condition.get("all") or condition.get("any") or []
    return [leaf for child in children for leaf in true_leaves(child, profile)]


def unknown_fields(fields: List[str], profile: Any) -> List[str]:
    """Return the entries of ``fields`` whose profile value is ``None``, in input order."""
    unknown: List[str] = []
    for name in fields:
        if _attr(profile, name) is None:
            unknown.append(name)
    return unknown
+""" + +from __future__ import annotations + +from typing import List + +from .enums import AuthorityLevel, Confidence +from .rules_types import ObligationRule + +_HAS_SW = ("has_software", "eq", True) +_EU = ("eu_market", "eq", True) +_REMOTE_OR_CLOUD = {"any": [("has_remote_access", "eq", True), ("has_cloud_connection", "eq", True)]} +_LM = AuthorityLevel.LEGAL_TEXT + +CRA_OBLIGATIONS: List[ObligationRule] = [ + ObligationRule( + obligation_id="sbom_creation", + title="Software Bill of Materials erstellen", + source_regulation="CRA", + obligation_text="Eine SBOM erstellen, die mindestens die obersten Abhängigkeiten des Produkts dokumentiert.", + legal_basis_refs=["CRA Annex I Part II (1)"], + authority_level=_LM, + family="sbom", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["software_bill_of_materials"], + required_evidence=["sbom", "repo_scan"], + lifecycle_phase=["development", "placing_on_market", "maintenance"], + registry_anchor=True, + ), + ObligationRule( + obligation_id="provide_security_updates", + title="Sicherheitsupdates bereitstellen", + source_regulation="CRA", + obligation_text="Sicherheitsrelevante Updates zeitnah und über den Supportzeitraum bereitstellen.", + legal_basis_refs=["CRA Annex I (2)(c)", "CRA Art. 13"], + authority_level=_LM, + family="updates", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["secure_updates"], + required_evidence=["policy", "ticket", "test_report"], + lifecycle_phase=["maintenance", "update"], + overlap_group_id="SECURITY_UPDATES", + registry_anchor=True, + ), + ObligationRule( + obligation_id="support_period_maintenance", + title="Supportzeitraum definieren und einhalten", + source_regulation="CRA", + obligation_text="Einen angemessenen Supportzeitraum festlegen, in dem Schwachstellen behandelt werden.", + legal_basis_refs=["CRA Art. 
13(8)"], + authority_level=_LM, + family="updates", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["secure_updates"], + required_evidence=["policy"], + lifecycle_phase=["placing_on_market", "maintenance", "update"], + registry_anchor=True, + ), + ObligationRule( + obligation_id="signed_update_integrity", + title="Integrität von Updates sicherstellen", + source_regulation="CRA", + obligation_text="Updates signieren und ihre Integrität bei der Verteilung verifizieren.", + legal_basis_refs=["CRA Annex I (1)(3)(f)"], + authority_level=_LM, + family="updates", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["software_integrity"], + required_evidence=["config_export", "test_report"], + lifecycle_phase=["development", "maintenance", "update"], + overlap_group_id="SECURITY_UPDATES", + registry_anchor=True, + ), + ObligationRule( + obligation_id="vuln_handling_process", + title="Schwachstellenbehandlungs-Prozess", + source_regulation="CRA", + obligation_text="Einen dokumentierten Prozess zur Identifikation, Bewertung und Behebung von Schwachstellen betreiben.", + legal_basis_refs=["CRA Art. 
13(8)", "CRA Annex VII"], + authority_level=_LM, + family="vuln", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["vulnerability_management"], + required_evidence=["policy", "ticket"], + lifecycle_phase=["development", "operation", "maintenance"], + overlap_group_id="VULNERABILITY_HANDLING", + registry_anchor=True, + ), + ObligationRule( + obligation_id="coordinated_vulnerability_disclosure", + title="Coordinated Vulnerability Disclosure", + source_regulation="CRA", + obligation_text="Eine Richtlinie zur koordinierten Offenlegung von Schwachstellen bereitstellen.", + legal_basis_refs=["CRA Annex I Part II (5)"], + authority_level=_LM, + family="vuln", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["coordinated_disclosure"], + required_evidence=["policy"], + lifecycle_phase=["operation", "maintenance"], + overlap_group_id="VULNERABILITY_HANDLING", + registry_anchor=True, + ), + ObligationRule( + obligation_id="exploited_vuln_reporting_authorities", + title="Meldung aktiv ausgenutzter Schwachstellen / Vorfälle", + source_regulation="CRA", + obligation_text="Aktiv ausgenutzte Schwachstellen und schwerwiegende Vorfälle an die zuständigen Behörden melden.", + legal_basis_refs=["CRA Art. 14", "CRA Art. 
16"], + authority_level=_LM, + family="vuln", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["incident_reporting"], + required_evidence=["policy", "ticket"], + lifecycle_phase=["operation", "maintenance"], + registry_anchor=True, + ), + ObligationRule( + obligation_id="user_authentication_required", + title="Authentifizierung vorsehen", + source_regulation="CRA", + obligation_text="Den Zugang über einen geeigneten Authentifizierungsmechanismus schützen.", + legal_basis_refs=["CRA Annex I (2)(d)"], + authority_level=_LM, + family="authentication", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["authentication"], + required_evidence=["config_export", "pentest"], + lifecycle_phase=["development", "operation"], + registry_anchor=True, + ), + ObligationRule( + obligation_id="no_default_credentials", + title="Keine unveränderlichen Standard-Zugangsdaten", + source_regulation="CRA", + obligation_text="Sichere Standardkonfiguration; keine fest hinterlegten oder unveränderlichen Standard-Passwörter.", + legal_basis_refs=["CRA Annex I (2)(a)", "CRA Annex I (2)(b)"], + authority_level=_LM, + family="authentication", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["secure_by_default"], + required_evidence=["config_export", "test_report"], + lifecycle_phase=["development", "placing_on_market"], + registry_anchor=True, + ), + ObligationRule( + obligation_id="event_logging_security_events", + title="Sicherheitsrelevante Ereignisse protokollieren", + source_regulation="CRA", + obligation_text="Sicherheitsrelevante Ereignisse und Zugriffe aufzeichnen, um Vorfälle nachvollziehen zu können.", + legal_basis_refs=["CRA Annex I Part I (2)(k)"], + authority_level=_LM, + family="logging", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["security_logging"], + required_evidence=["config_export", "audit_log"], + lifecycle_phase=["operation", "maintenance"], + registry_anchor=True, + ), + ObligationRule( + 
obligation_id="remote_access_attack_surface_min", + title="Angriffsfläche minimieren", + source_regulation="CRA", + obligation_text="Die Angriffsfläche begrenzen, insbesondere exponierte Remote-/Cloud-Schnittstellen.", + legal_basis_refs=["CRA Annex I (1)(2)(a)"], + authority_level=_LM, + family="remote_access", + applies_if={"all": [_REMOTE_OR_CLOUD, _EU]}, + required_capabilities=["secure_by_default"], + required_evidence=["config_export", "repo_scan", "pentest"], + lifecycle_phase=["development", "operation"], + registry_anchor=True, + ), + ObligationRule( + obligation_id="remote_access_confidentiality_integrity", + title="Vertraulichkeit/Integrität der Fernverbindung", + source_regulation="CRA", + obligation_text="Daten bei Fernzugriff/Cloud-Anbindung verschlüsselt und integritätsgeschützt übertragen.", + legal_basis_refs=["CRA Annex I (1)(2)(b)", "CRA Annex I (1)(2)(c)"], + authority_level=_LM, + family="remote_access", + applies_if={"all": [_REMOTE_OR_CLOUD, _EU]}, + required_capabilities=["secure_communication"], + required_evidence=["config_export", "pentest"], + lifecycle_phase=["operation"], + registry_anchor=True, + ), + # --- Cross-cutting CRA process obligations (not yet in registry) --------- + ObligationRule( + obligation_id="cra_secure_by_design", + title="Security by Design", + source_regulation="CRA", + obligation_text="Das Produkt so entwerfen, entwickeln und herstellen, dass ein angemessenes Cybersicherheitsniveau gewährleistet ist.", + legal_basis_refs=["CRA Annex I Part I (1)"], + authority_level=_LM, + family="cra_process", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["secure_by_default", "risk_assessment"], + required_evidence=["policy", "test_report"], + lifecycle_phase=["development", "placing_on_market"], + proposed=True, + ), + ObligationRule( + obligation_id="cra_risk_assessment", + title="Cybersicherheits-Risikobewertung", + source_regulation="CRA", + obligation_text="Eine Cybersicherheits-Risikobewertung durchführen 
und dokumentieren; in die technische Dokumentation aufnehmen.", + legal_basis_refs=["CRA Art. 13(2)", "CRA Annex I Part I (1)"], + authority_level=_LM, + family="cra_process", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["risk_assessment"], + required_evidence=["policy"], + lifecycle_phase=["development", "placing_on_market"], + overlap_group_id="RISK_ASSESSMENT", + proposed=True, + ), + ObligationRule( + obligation_id="cra_technical_documentation", + title="Technische Dokumentation", + source_regulation="CRA", + obligation_text="Technische Dokumentation erstellen und aktuell halten, die Konformität mit den Anforderungen belegt.", + legal_basis_refs=["CRA Art. 31", "CRA Annex VII"], + authority_level=_LM, + family="cra_process", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["technical_documentation"], + required_evidence=["policy"], + lifecycle_phase=["placing_on_market", "maintenance"], + overlap_group_id="TECHNICAL_DOCUMENTATION", + proposed=True, + ), + ObligationRule( + obligation_id="cra_ce_conformity_assessment", + title="Konformitätsbewertung / CE-Kennzeichnung", + source_regulation="CRA", + obligation_text="Vor dem Inverkehrbringen das passende Konformitätsbewertungsverfahren durchlaufen und CE kennzeichnen.", + legal_basis_refs=["CRA Art. 32", "CRA Art. 28"], + authority_level=_LM, + family="cra_process", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["conformity_assessment"], + required_evidence=["test_report", "policy"], + lifecycle_phase=["placing_on_market"], + overlap_group_id="CE_CONFORMITY", + proposed=True, + ), + ObligationRule( + obligation_id="cra_instructions_for_use", + title="Informationen und Anweisungen für Nutzer", + source_regulation="CRA", + obligation_text="Nutzern verständliche Sicherheitsinformationen und -anweisungen bereitstellen (z. B. 
zu Updates und Support-Ende).", + legal_basis_refs=["CRA Annex II"], + authority_level=_LM, + family="cra_process", + applies_if={"all": [_HAS_SW, _EU]}, + required_capabilities=["technical_documentation"], + required_evidence=["policy"], + lifecycle_phase=["placing_on_market"], + overlap_group_id="INSTRUCTIONS_FOR_USE", + proposed=True, + ), +] diff --git a/backend-compliance/compliance/reasoning/rules_obligations_machine_data.py b/backend-compliance/compliance/reasoning/rules_obligations_machine_data.py new file mode 100644 index 00000000..d592c988 --- /dev/null +++ b/backend-compliance/compliance/reasoning/rules_obligations_machine_data.py @@ -0,0 +1,139 @@ +"""MaschinenVO and Data Act obligation scope rules. + +These regulations are NOT yet in the Legal-KG registry (which currently covers +the six CRA-P1 families). Every obligation here is therefore `proposed=True`: +the reasoning layer proposes the snake_case id, the Obligation Registry session +remains the only authority that may canonicalise it (re-link, never re-mint). 
+""" + +from __future__ import annotations + +from typing import List + +from .enums import AuthorityLevel, Confidence +from .rules_types import ObligationRule + +_EU = ("eu_market", "eq", True) +_IS_MACHINE = ("is_machine", "eq", True) +_LM = AuthorityLevel.LEGAL_TEXT + +MACHINE_OBLIGATIONS: List[ObligationRule] = [ + ObligationRule( + obligation_id="machine_risk_assessment", + title="Maschinen-Risikobeurteilung", + source_regulation="MaschinenVO", + obligation_text="Eine Risikobeurteilung der Maschine durchführen, um Gefährdungen zu ermitteln und zu mindern.", + legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.1.1)", "EN ISO 12100"], + authority_level=_LM, + family="machine_safety", + applies_if={"all": [_IS_MACHINE, _EU]}, + required_capabilities=["risk_assessment"], + required_evidence=["policy"], + lifecycle_phase=["development", "placing_on_market"], + overlap_group_id="RISK_ASSESSMENT", + proposed=True, + ), + ObligationRule( + obligation_id="machine_safety_control_systems", + title="Sichere Steuerungssysteme", + source_regulation="MaschinenVO", + obligation_text="Sicherheitsbezogene Teile der Steuerung so auslegen, dass Ausfälle nicht zu gefährlichen Zuständen führen.", + legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.2.1)", "EN ISO 13849-1"], + authority_level=_LM, + family="machine_safety", + applies_if={"all": [_IS_MACHINE, ("has_safety_function", "eq", True), _EU]}, + required_capabilities=["functional_safety"], + required_evidence=["test_report", "policy"], + lifecycle_phase=["development", "placing_on_market"], + proposed=True, + ), + ObligationRule( + obligation_id="machine_protection_against_corruption", + title="Schutz gegen Korrumpierung sicherheitsrelevanter Funktionen", + source_regulation="MaschinenVO", + obligation_text="Sicherstellen, dass eine (auch beabsichtigte) Korrumpierung der Software/Verbindung keine gefährliche Situation auslöst.", + legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.1.9)"], + 
authority_level=_LM, + family="machine_safety", + applies_if={ + "all": [ + _IS_MACHINE, + ("has_safety_function", "eq", True), + {"any": [("has_remote_access", "eq", True), ("has_software", "eq", True)]}, + _EU, + ] + }, + required_capabilities=["software_integrity", "secure_by_default"], + required_evidence=["test_report", "config_export"], + lifecycle_phase=["development", "operation", "maintenance"], + overlap_group_id="VULNERABILITY_HANDLING", + proposed=True, + ), + ObligationRule( + obligation_id="machine_instructions_for_use", + title="Betriebsanleitung", + source_regulation="MaschinenVO", + obligation_text="Eine vollständige Betriebsanleitung mit Sicherheitshinweisen bereitstellen.", + legal_basis_refs=["MaschinenVO (EU) 2023/1230 Anhang III (1.7.4)"], + authority_level=_LM, + family="machine_safety", + applies_if={"all": [_IS_MACHINE, _EU]}, + required_capabilities=["technical_documentation"], + required_evidence=["policy"], + lifecycle_phase=["placing_on_market"], + overlap_group_id="INSTRUCTIONS_FOR_USE", + proposed=True, + ), + ObligationRule( + obligation_id="machine_ce_conformity", + title="Konformitätsbewertung / CE (Maschine)", + source_regulation="MaschinenVO", + obligation_text="Das passende Konformitätsbewertungsverfahren der MaschinenVO durchlaufen und CE kennzeichnen.", + legal_basis_refs=["MaschinenVO (EU) 2023/1230 Art. 
25", "Anhang IV"], + authority_level=_LM, + family="machine_safety", + applies_if={"all": [_IS_MACHINE, _EU]}, + required_capabilities=["conformity_assessment"], + required_evidence=["test_report", "policy"], + lifecycle_phase=["placing_on_market"], + overlap_group_id="CE_CONFORMITY", + proposed=True, + ), +] + +DATA_ACT_OBLIGATIONS: List[ObligationRule] = [ + ObligationRule( + obligation_id="data_act_data_access_by_design", + title="Datenzugang by design", + source_regulation="DataAct", + obligation_text="Vernetzte Produkte so gestalten, dass die erzeugten Produktdaten standardmäßig zugänglich sind.", + legal_basis_refs=["Data Act (EU) 2023/2854 Art. 3"], + authority_level=_LM, + family="data_act", + applies_if={ + "all": [ + ("generates_usage_data", "eq", True), + {"any": [("has_cloud_connection", "eq", True), ("has_remote_access", "eq", True)]}, + _EU, + ] + }, + required_capabilities=["data_access_provision"], + required_evidence=["config_export", "policy"], + lifecycle_phase=["development", "placing_on_market"], + proposed=True, + ), + ObligationRule( + obligation_id="data_act_user_data_access", + title="Datenzugang für Nutzer", + source_regulation="DataAct", + obligation_text="Nutzern Zugang zu den von ihnen erzeugten Daten gewähren und Weitergabe an Dritte ermöglichen.", + legal_basis_refs=["Data Act (EU) 2023/2854 Art. 4", "Art. 5"], + authority_level=_LM, + family="data_act", + applies_if={"all": [("generates_usage_data", "eq", True), _EU]}, + required_capabilities=["data_access_provision"], + required_evidence=["policy"], + lifecycle_phase=["operation"], + proposed=True, + ), +] diff --git a/backend-compliance/compliance/reasoning/rules_overlaps.py b/backend-compliance/compliance/reasoning/rules_overlaps.py new file mode 100644 index 00000000..e42b1277 --- /dev/null +++ b/backend-compliance/compliance/reasoning/rules_overlaps.py @@ -0,0 +1,91 @@ +"""Obligation overlap groups (spec §4.5 / Modus 2). 
@dataclass(frozen=True)
class OverlapGroup:
    """A named set of obligations that overlap across regulations.

    Raises:
        ValueError: On construction, if ``canonical_obligation_id`` is not one
            of ``members``.
    """

    # Identifier of the group; obligation rules refer to it through their
    # own ``overlap_group_id`` field.
    overlap_group_id: str
    # Obligation ids that belong to the group.
    members: List[str]
    # How the members relate to each other (an ``OverlapType`` value).
    overlap_type: OverlapType
    # The member the group points at as its canonical obligation.
    canonical_obligation_id: str
    # Human-readable rationale for grouping the members.
    explanation: str

    def __post_init__(self) -> None:
        # Catch a typo in the canonical id when the rule table is built,
        # rather than emitting an overlap that points outside its own group.
        if self.canonical_obligation_id not in self.members:
            raise ValueError(
                f"overlap group {self.overlap_group_id}: canonical obligation "
                f"{self.canonical_obligation_id!r} is not one of its members "
                f"{list(self.members)!r}"
            )
+ ), + ), + OverlapGroup( + overlap_group_id="TECHNICAL_DOCUMENTATION", + members=["cra_technical_documentation", "machine_risk_assessment"], + overlap_type=OverlapType.SIMILAR, + canonical_obligation_id="cra_technical_documentation", + explanation=( + "Beide Regime verlangen eine technische Dokumentation; Teile (Risikobetrachtung, " + "Konstruktionsunterlagen) lassen sich in einem konsolidierten technischen Dossier führen." + ), + ), + OverlapGroup( + overlap_group_id="CE_CONFORMITY", + members=["cra_ce_conformity_assessment", "machine_ce_conformity"], + overlap_type=OverlapType.COMPLEMENTARY, + canonical_obligation_id="machine_ce_conformity", + explanation=( + "Ein Produkt kann zwei CE-Regime gleichzeitig erfüllen müssen (MaschinenVO + CRA). " + "Eine gemeinsame CE-Kennzeichnung, aber getrennte Konformitätsbewertungen." + ), + ), + OverlapGroup( + overlap_group_id="INSTRUCTIONS_FOR_USE", + members=["cra_instructions_for_use", "machine_instructions_for_use"], + overlap_type=OverlapType.SIMILAR, + canonical_obligation_id="machine_instructions_for_use", + explanation=( + "Betriebsanleitung (MaschinenVO) und Sicherheitsinformationen (CRA) überschneiden sich; " + "ein integriertes Anleitungsdokument kann beide Pflichten bedienen." + ), + ), +] diff --git a/backend-compliance/compliance/reasoning/rules_regulations.py b/backend-compliance/compliance/reasoning/rules_regulations.py new file mode 100644 index 00000000..b6be8308 --- /dev/null +++ b/backend-compliance/compliance/reasoning/rules_regulations.py @@ -0,0 +1,160 @@ +"""Regulation-level applicability trigger rules (scope discovery, spec Modus 1). + +Each rule is pure data consumed by `scope_engine`. Triggers reference +`ProductProfile` fields through the safe predicate evaluator. `required_facts` +that are unknown turn the verdict *uncertain* and surface `fact_prompts`. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +from .enums import Confidence +from .predicates import Condition + +# Positive, human-readable label per profile fact (for trigger_facts output). +FIELD_LABELS: Dict[str, str] = { + "has_software": "Produkt enthält Software / digitale Elemente", + "has_embedded_software": "Produkt enthält eingebettete Software", + "has_remote_access": "Produkt besitzt Fernzugriff / Fernwartung", + "has_cloud_connection": "Produkt ist mit einer Cloud verbunden", + "has_radio_module": "Produkt enthält ein Funkmodul", + "has_safety_function": "Produkt erfüllt eine Sicherheitsfunktion", + "generates_usage_data": "Vernetztes Produkt erzeugt nutzbare Produktdaten", + "is_machine": "Produkt ist eine Maschine", + "is_component": "Produkt ist ein (Sicherheits-)Bauteil", + "eu_market": "Produkt wird auf dem EU-Markt bereitgestellt", + "is_essential_or_important_entity": "Unternehmen ist wesentliche/wichtige Einrichtung", + "manufacturer_role": "Wirtschaftsakteur-Rolle (Hersteller/Importeur/Händler)", +} + + +@dataclass(frozen=True) +class RegulationRule: + regulation_id: str + name: str + trigger: Condition + required_facts: List[str] + fact_prompts: Dict[str, str] + legal_basis_refs: List[str] + summary: str + confidence_when_applicable: Confidence = Confidence.HIGH + exclusion: Optional[Condition] = None + # Status is downgraded to PARTIALLY_APPLICABLE / MEDIUM when the trigger + # fires only via inference rather than a directly stated fact. 
+ inferred: bool = False + excludable_roles: List[str] = field(default_factory=list) + + +_ECONOMIC_ROLES = ["manufacturer", "importer", "distributor"] + +REGULATION_RULES: List[RegulationRule] = [ + RegulationRule( + regulation_id="CRA", + name="Cyber Resilience Act (EU) 2024/2847", + trigger={ + "all": [ + {"any": [("has_software", "eq", True), ("has_embedded_software", "eq", True)]}, + ("eu_market", "eq", True), + ] + }, + required_facts=["has_software", "eu_market", "manufacturer_role"], + fact_prompts={ + "has_software": "Enthält das Produkt Software / digitale Elemente?", + "eu_market": "Wird das Produkt auf dem EU-Markt bereitgestellt oder in Verkehr gebracht?", + "manufacturer_role": "Welche Rolle nehmen Sie ein (Hersteller / Importeur / Händler)?", + }, + legal_basis_refs=["CRA Art. 2(1)", "CRA Art. 3(1)"], + summary="Produkte mit digitalen Elementen, die auf dem EU-Markt bereitgestellt werden.", + confidence_when_applicable=Confidence.HIGH, + excludable_roles=["operator"], + ), + RegulationRule( + regulation_id="MaschinenVO", + name="Maschinenverordnung (EU) 2023/1230", + trigger={ + "any": [ + ("is_machine", "eq", True), + {"all": [("is_component", "eq", True), ("has_safety_function", "eq", True)]}, + ] + }, + required_facts=["is_machine", "eu_market"], + fact_prompts={ + "is_machine": "Ist das Produkt eine Maschine oder ein Sicherheitsbauteil?", + "has_safety_function": "Erfüllt das Bauteil eine Sicherheitsfunktion?", + }, + legal_basis_refs=["MaschinenVO (EU) 2023/1230 Art. 2", "Anhang III"], + summary="Maschinen oder Sicherheitsbauteile, ggf. mit sicherheitsrelevanter Steuerung.", + confidence_when_applicable=Confidence.MEDIUM, + ), + RegulationRule( + regulation_id="RED", + name="Radio Equipment Directive 2014/53/EU", + trigger=("has_radio_module", "eq", True), + required_facts=["has_radio_module"], + fact_prompts={ + "has_radio_module": "Besitzt das Produkt ein Funkmodul (WLAN, Bluetooth, Mobilfunk)?", + }, + legal_basis_refs=["RED 2014/53/EU Art. 
1", "Art. 3(3)(d-f)"], + summary="Funkanlagen; Art. 3(3) deckt zusätzlich Cybersecurity-Anforderungen ab.", + confidence_when_applicable=Confidence.HIGH, + ), + RegulationRule( + regulation_id="EMV", + name="EMV-Richtlinie 2014/30/EU", + trigger={ + "any": [ + ("has_software", "eq", True), + ("has_embedded_software", "eq", True), + ("has_radio_module", "eq", True), + ] + }, + required_facts=[], + fact_prompts={ + "is_electrical": "Ist das Produkt ein elektrisches / elektronisches Betriebsmittel?", + }, + legal_basis_refs=["EMV-RL 2014/30/EU Art. 2"], + summary="Elektrische/elektronische Betriebsmittel (hier aus den digitalen Elementen abgeleitet).", + confidence_when_applicable=Confidence.MEDIUM, + inferred=True, + ), + RegulationRule( + regulation_id="DataAct", + name="Data Act (EU) 2023/2854", + trigger={ + "all": [ + {"any": [("has_cloud_connection", "eq", True), ("has_remote_access", "eq", True)]}, + ("generates_usage_data", "eq", True), + ] + }, + required_facts=["generates_usage_data"], + fact_prompts={ + "generates_usage_data": "Erzeugt das vernetzte Produkt nutzbare Produkt-/Nutzungsdaten?", + }, + legal_basis_refs=["Data Act (EU) 2023/2854 Art. 2(5)", "Art. 3-5"], + summary="Vernetzte Produkte, die Nutzungsdaten erzeugen und zugänglich machen.", + confidence_when_applicable=Confidence.HIGH, + ), + RegulationRule( + regulation_id="NIS2", + name="NIS2-Richtlinie (EU) 2022/2555", + trigger=("is_essential_or_important_entity", "eq", True), + required_facts=["company_size", "sector", "is_essential_or_important_entity"], + fact_prompts={ + "company_size": "Unternehmensgröße (Mitarbeiterzahl / Umsatz)?", + "sector": "In welchem Sektor ist das Unternehmen tätig (Anhang I/II)?", + "is_essential_or_important_entity": "Fällt das Unternehmen als wesentliche/wichtige Einrichtung unter NIS2?", + }, + legal_basis_refs=["NIS2-RL (EU) 2022/2555 Art. 2", "Art. 
# Framework-agnostic shared evidence catalog (the only allowed tokens).
EVIDENCE_CATALOG = frozenset(
    {
        "config_export",
        "test_report",
        "repo_scan",
        "sbom",
        "policy",
        "audit_log",
        "pentest",
        "ticket",
    }
)


@dataclass(frozen=True)
class ObligationRule:
    """One obligation scope rule: what is owed, under which condition, and
    which evidence is expected.

    Raises:
        ValueError: On construction, if ``required_evidence`` contains a token
            outside ``EVIDENCE_CATALOG``, or if the rule is flagged both
            ``registry_anchor`` and ``proposed``.
    """

    # Snake_case identifier of the obligation.
    obligation_id: str
    # Short human-readable title.
    title: str
    # Identifier of the regulation the obligation stems from.
    source_regulation: str
    # Full text of the obligation.
    obligation_text: str
    # Citations of the legal basis (articles / annexes).
    legal_basis_refs: List[str]
    authority_level: AuthorityLevel
    # Thematic family the obligation belongs to.
    family: str
    # Condition (see ``.predicates``) under which the obligation applies.
    applies_if: Condition
    required_capabilities: List[str]
    # Evidence tokens; each must be a member of EVIDENCE_CATALOG.
    required_evidence: List[str]
    base_confidence: Confidence = Confidence.HIGH
    # Optional counter-condition; presumably suppresses the obligation when it
    # holds -- confirm against the obligation engine.
    applies_unless: Optional[Condition] = None
    lifecycle_phase: List[str] = field(default_factory=list)
    applies_to_role: List[str] = field(default_factory=lambda: ["manufacturer", "importer"])
    overlap_group_id: Optional[str] = None
    # True => obligation_id is owned by the Legal-KG registry (re-link, never re-mint).
    registry_anchor: bool = False
    # True => Machine/Data-Act obligation the registry has not canonicalised yet.
    proposed: bool = False

    def __post_init__(self) -> None:
        bad = [e for e in self.required_evidence if e not in EVIDENCE_CATALOG]
        if bad:
            raise ValueError(
                f"obligation {self.obligation_id} uses non-catalog evidence {bad!r}"
            )
        # An id is either owned by the registry or proposed to it, not both.
        if self.registry_anchor and self.proposed:
            raise ValueError(
                f"obligation {self.obligation_id} cannot be both "
                "registry_anchor and proposed"
            )
Tri-state booleans => unknown facts.""" + + product_name: str + product_profile_id: Optional[str] = None + manufacturer_role: Optional[ManufacturerRole] = None + product_type: List[str] = Field(default_factory=list) + + has_software: Optional[bool] = None + has_embedded_software: Optional[bool] = None + has_remote_access: Optional[bool] = None + has_cloud_connection: Optional[bool] = None + has_ai_functionality: Optional[bool] = None + has_radio_module: Optional[bool] = None + has_safety_function: Optional[bool] = None + generates_usage_data: Optional[bool] = None + + is_machine: Optional[bool] = None + is_component: Optional[bool] = None + is_spare_part: Optional[bool] = None + + placed_on_market_after: Optional[date] = None + intended_use: Optional[str] = None + eu_market: Optional[bool] = None + b2b_or_b2c: Optional[MarketModel] = None + lifecycle_phase: Optional[ProductLifecyclePhase] = None + + # Organisation context — only needed for NIS2 (not a product fact). + company_size: Optional[str] = None + sector: Optional[str] = None + is_essential_or_important_entity: Optional[bool] = None + + +# --------------------------------------------------------------------------- +# Scope +# --------------------------------------------------------------------------- +class ApplicableRegulation(BaseModel): + regulation_id: str + name: str + applicability_status: ApplicabilityStatus + trigger_facts: List[str] = Field(default_factory=list) + legal_basis_refs: List[str] = Field(default_factory=list) + confidence: Confidence + explanation: str + + +class ExcludedRegulation(BaseModel): + regulation_id: str + name: str + reason: str + + +class UncertainRegulation(BaseModel): + regulation_id: str + name: str + missing_facts: List[str] = Field(default_factory=list) + explanation: str + + +class RegulatoryScope(BaseModel): + product_profile_id: Optional[str] = None + applicable_regulations: List[ApplicableRegulation] = Field(default_factory=list) + excluded_regulations: 
# ---------------------------------------------------------------------------
# Obligations
# ---------------------------------------------------------------------------
# One obligation reported as applicable to a product. Only `obligation_id`,
# `title`, `source_regulation`, `obligation_text`, `authority_level` and
# `confidence` are required; every list field defaults to an empty list.
class ApplicableObligation(BaseModel):
    obligation_id: str
    title: str
    source_regulation: str
    legal_basis_refs: List[str] = Field(default_factory=list)
    obligation_text: str
    authority_level: AuthorityLevel
    applies_because: List[str] = Field(default_factory=list)
    applies_to_role: List[str] = Field(default_factory=list)
    lifecycle_phase: List[str] = Field(default_factory=list)
    # Set when the obligation belongs to an overlap group (see ObligationOverlap).
    overlap_group_id: Optional[str] = None
    required_evidence: List[str] = Field(default_factory=list)
    confidence: Confidence
    # True only when obligation_id is owned by the Legal-KG registry (CRA P1).
    registry_anchor: bool = False
    # Machine/Data-Act obligations the registry has not canonicalised yet.
    proposed: bool = False


# A group of obligations (referenced by id) that overlap, with the id of the
# obligation treated as canonical for the group.
class ObligationOverlap(BaseModel):
    overlap_group_id: str
    obligations: List[str] = Field(default_factory=list)
    overlap_type: OverlapType
    canonical_obligation_id: str
    explanation: str


# ---------------------------------------------------------------------------
# Customer claims & assessments
# ---------------------------------------------------------------------------
# A customer's free-text statement plus the fields derived from it. Only
# `claim_id` and `raw_statement` are required.
class CustomerImplementationClaim(BaseModel):
    claim_id: str
    raw_statement: str
    normalized_claim: str = ""
    claimed_capability: List[str] = Field(default_factory=list)
    related_topics: List[str] = Field(default_factory=list)
    qualifiers: List[str] = Field(default_factory=list)
    evidence_refs: List[str] = Field(default_factory=list)


# Coverage verdict for one (claim, obligation) pair.
class ImplementationAssessment(BaseModel):
    claim_id: str
    obligation_id: str
    coverage_status: CoverageStatus
    missing_elements: List[str] = Field(default_factory=list)
    required_evidence: List[str] = Field(default_factory=list)
    explanation: str
    confidence: Confidence


# Verdict on a customer's reading of a regulation, with the corrected reading.
class InterpretationAssessment(BaseModel):
    interpretation_id: str
    raw_interpretation: str
    affected_regulations: List[str] = Field(default_factory=list)
    affected_obligations: List[str] = Field(default_factory=list)
    assessment: InterpretationVerdict
    risks: List[str] = Field(default_factory=list)
    corrected_interpretation: str = ""
    legal_basis_refs: List[str] = Field(default_factory=list)
    explanation: str
    confidence: Confidence


# ---------------------------------------------------------------------------
# API request / response envelopes
# ---------------------------------------------------------------------------
# Request body carrying only the product profile.
class ScopeRequest(BaseModel):
    product_profile: ProductProfile


# Response wrapping the scope; `missing_facts` and `confidence` are also
# present on the envelope itself.
class ScopeResponse(BaseModel):
    regulatory_scope: RegulatoryScope
    missing_facts: List[str] = Field(default_factory=list)
    confidence: Confidence


# Request for obligation derivation; a scope may be supplied but is optional.
class ObligationsRequest(BaseModel):
    product_profile: ProductProfile
    regulatory_scope: Optional[RegulatoryScope] = None


# Derived obligations plus overlap groups. `evidence_for_multiple` maps a key
# to a list of strings (the tests treat it as evidence type -> obligation ids).
class ObligationsResponse(BaseModel):
    applicable_obligations: List[ApplicableObligation] = Field(default_factory=list)
    overlaps: List[ObligationOverlap] = Field(default_factory=list)
    excluded_obligations: List[str] = Field(default_factory=list)
    evidence_for_multiple: Dict[str, List[str]] = Field(default_factory=dict)


# Request pairing a product profile with the customer's free-text claim.
class ImplementationRequest(BaseModel):
    product_profile: ProductProfile
    customer_claim: str


# The normalised claim together with its per-obligation assessments.
class ImplementationResponse(BaseModel):
    claim: CustomerImplementationClaim
    assessments: List[ImplementationAssessment] = Field(default_factory=list)
    missing_evidence: List[str] = Field(default_factory=list)
    summary: str = ""


# Request for an interpretation check; the product profile is optional.
class InterpretationRequest(BaseModel):
    product_profile: Optional[ProductProfile] = None
    customer_interpretation: str


# Flat verdict returned for an interpretation; only `assessment` is required.
class InterpretationResponse(BaseModel):
    assessment: InterpretationVerdict
    affected_regulations: List[str] = Field(default_factory=list)
    affected_obligations: List[str] = Field(default_factory=list)
    corrected_interpretation: str = ""
    risks: List[str] = Field(default_factory=list)
    legal_basis_refs: List[str] = Field(default_factory=list)
    explanation: str = ""
    confidence: Confidence = Confidence.MEDIUM
def _fields_in(condition: "Optional[Condition]") -> List[str]:
    """Return the profile field names referenced by *condition*, in order.

    A condition is ``None``, a leaf tuple whose first item is the field name,
    or a mapping holding child conditions under ``"all"`` and/or ``"any"``.
    Both keys are walked, so a mapping that carries the two of them (or an
    empty ``"all"`` next to a populated ``"any"``) contributes every field.
    Duplicates are kept; callers de-duplicate.

    NOTE(review): whether the predicate evaluator accepts mappings with both
    keys is not visible from this module -- confirm against ``predicates``.
    """
    if condition is None:
        return []
    if isinstance(condition, tuple):
        return [condition[0]]
    fields: List[str] = []
    for key in ("all", "any"):
        for child in condition.get(key) or []:
            fields.extend(_fields_in(child))
    return fields


def _trigger_facts(rule: "RegulationRule", profile: "ProductProfile") -> List[str]:
    """Return the distinct labels of the trigger leaves that hold for *profile*.

    Leaves without an entry in ``FIELD_LABELS`` (or with an empty label) are
    skipped; first-seen order is preserved.
    """
    labels = (FIELD_LABELS.get(leaf[0]) for leaf in true_leaves(rule.trigger, profile))
    return list(dict.fromkeys(label for label in labels if label))


def _missing_prompts(rule: "RegulationRule", profile: "ProductProfile") -> List[str]:
    """Return the distinct prompts for facts of *rule* still unknown in *profile*.

    The candidate facts are the rule's ``required_facts`` followed by every
    field referenced in its trigger. Unknown facts without a prompt in
    ``rule.fact_prompts`` are skipped.
    """
    fields = list(dict.fromkeys([*rule.required_facts, *_fields_in(rule.trigger)]))
    prompts = (rule.fact_prompts.get(name) for name in unknown_fields(fields, profile))
    return list(dict.fromkeys(prompt for prompt in prompts if prompt))
def _add_unique(target: List[str], candidates: "Iterable[Optional[str]]") -> None:
    """Append every non-empty candidate to *target* unless it is already there."""
    for item in candidates:
        if item and item not in target:
            target.append(item)


def _applicable_entry(
    rule: "RegulationRule", profile: "ProductProfile", missing_facts: List[str]
) -> "ApplicableRegulation":
    """Build the entry for a rule whose trigger evaluated to ``True``.

    Inferred rules are reported as partially applicable with medium
    confidence. If required facts are still unknown, the confidence is
    lowered one step and their prompts are added to *missing_facts* in place.
    """
    if rule.inferred:
        conf = Confidence.MEDIUM
        status = ApplicabilityStatus.PARTIALLY_APPLICABLE
    else:
        conf = rule.confidence_when_applicable
        status = ApplicabilityStatus.APPLICABLE

    unresolved = unknown_fields(rule.required_facts, profile)
    if unresolved:
        conf = _DOWNGRADE[conf]
        _add_unique(missing_facts, (rule.fact_prompts.get(name) for name in unresolved))

    return ApplicableRegulation(
        regulation_id=rule.regulation_id,
        name=rule.name,
        applicability_status=status,
        trigger_facts=_trigger_facts(rule, profile),
        legal_basis_refs=rule.legal_basis_refs,
        confidence=conf,
        explanation=rule.summary,
    )


def discover_scope(profile: "ProductProfile") -> "RegulatoryScope":
    """Sort every rule in ``REGULATION_RULES`` into applicable / uncertain / excluded.

    Per rule, in this order:

    * the manufacturer role is listed in ``rule.excludable_roles`` -> excluded
      (the trigger is not evaluated at all);
    * the trigger evaluates to ``True`` -> applicable;
    * the trigger evaluates to ``None`` (a fact is unknown) -> uncertain, and
      the prompts for the unknown facts are reported;
    * anything else -> excluded by the known facts.

    Prompts are collected without duplicates in ``scope.missing_facts``.
    """
    scope = RegulatoryScope(product_profile_id=profile.product_profile_id)
    # The role is a property of the profile, not of the rule: resolve it once.
    role = profile.manufacturer_role
    role_value = role.value if role is not None else None

    for rule in REGULATION_RULES:
        if role_value is not None and role_value in rule.excludable_roles:
            scope.excluded_regulations.append(
                ExcludedRegulation(
                    regulation_id=rule.regulation_id,
                    name=rule.name,
                    reason="Rolle '%s' ist von dieser Regulierung nicht unmittelbar adressiert." % role_value,
                )
            )
            continue

        verdict = evaluate(rule.trigger, profile)
        if verdict is True:
            scope.applicable_regulations.append(
                _applicable_entry(rule, profile, scope.missing_facts)
            )
        elif verdict is None:
            # Only the uncertain branch needs the prompts, so compute them here.
            missing = _missing_prompts(rule, profile)
            scope.uncertain_regulations.append(
                UncertainRegulation(
                    regulation_id=rule.regulation_id,
                    name=rule.name,
                    missing_facts=missing,
                    explanation=rule.summary,
                )
            )
            _add_unique(scope.missing_facts, missing)
        else:  # definitively excluded by a known fact
            scope.excluded_regulations.append(
                ExcludedRegulation(
                    regulation_id=rule.regulation_id,
                    name=rule.name,
                    reason="Auslösende Voraussetzungen sind anhand der bekannten Fakten nicht erfüllt.",
                )
            )

    scope.confidence = _overall_confidence(scope)
    scope.reasoning_summary = _summary(scope)
    return scope


def _overall_confidence(scope: "RegulatoryScope") -> "Confidence":
    """Aggregate confidence for the whole scope.

    HIGH needs at least one applicable regulation and neither uncertain
    regulations nor open facts; MEDIUM needs at least one applicable
    regulation; everything else is LOW.

    NOTE(review): the per-regulation confidences are not consulted, so the
    aggregate can be HIGH while every applicable entry is only MEDIUM (for
    example inferred rules) -- confirm this is intended.
    """
    if not scope.applicable_regulations:
        return Confidence.LOW
    if scope.uncertain_regulations or scope.missing_facts:
        return Confidence.MEDIUM
    return Confidence.HIGH


def _summary(scope: "RegulatoryScope") -> str:
    """Return the one-line German summary of applicable and uncertain regulation ids.

    An empty group is rendered as an em dash.
    """
    applicable = ", ".join(r.regulation_id for r in scope.applicable_regulations) or "—"
    uncertain = ", ".join(r.regulation_id for r in scope.uncertain_regulations) or "—"
    return "Wahrscheinlich anwendbar: %s. Unsicher (fehlende Fakten): %s." % (applicable, uncertain)
"""Deterministic taxonomy for normalising free-text customer claims.

Capability names echo the planned Obligation -> Capability layer of the
Compliance Execution Graph (memory `project_compliance_graph.md`), so the
reasoning layer's claim capabilities line up with the registry's capabilities.
Matching is lowercase substring matching — deterministic, no LLM, no RAG.
A keyword written with a trailing space means "the word ends here".
"""

import re
from typing import Dict, List

# capability -> trigger substrings (German + English), matched lowercase.
CAPABILITY_KEYWORDS: Dict[str, List[str]] = {
    "software_bill_of_materials": [
        "sbom", "stückliste", "stueckliste", "bill of materials", "komponentenliste",
    ],
    "secure_updates": ["update", "patch", "aktualisier", "release", "rollout"],
    "software_integrity": ["signier", "signatur", "signed", "integrität", "integritaet", "hash"],
    "vulnerability_management": [
        "schwachstelle", "vulnerab", "cve", "schwachstellenmanagement", "vuln",
    ],
    "coordinated_disclosure": [
        "disclosure", "offenlegung", "security.txt", "responsible disclosure",
    ],
    "incident_reporting": [
        "incident", "vorfall", "behörde", "behoerde", "csirt", "meldepflicht", "an die behörde",
    ],
    # NOTE(review): short acronyms are plain substrings, so "mfa" also hits
    # "umfassend" and "ssl" hits "schliesslich" -- consider tightening.
    "authentication": [
        "authentifizier", "login", "passwort", "password", "mfa", "2fa", "anmeldung",
    ],
    "secure_by_default": [
        "härtung", "haertung", "hardening", "default", "standardkonfig",
        "sichere konfiguration", "angriffsfläche", "angriffsflaeche",
    ],
    "security_logging": ["logging", "log ", "logs", "protokoll", "audit-trail", "ereignisprotokoll"],
    "secure_communication": ["verschlüssel", "verschluessel", "encryption", "tls", "vpn", "ssl"],
    "risk_assessment": [
        "risikoanalyse", "risikobeurteil", "risk assessment", "gefährdungsbeurteil",
        "gefaehrdungsbeurteil", "bedrohungsanalyse", "threat model",
    ],
    "technical_documentation": [
        "dokumentation", "technische unterlagen", "betriebsanleitung", "handbuch", "documentation",
    ],
    "conformity_assessment": ["konformität", "konformitaet", "conformity", "baumuster", "ce-kenn"],
    "functional_safety": [
        "performance level", "sil ", "iso 13849", "funktionale sicherheit", "safety control",
    ],
    "data_access_provision": [
        "datenzugang", "data access", "datenportabilität", "datenexport", "data export",
    ],
}

# capability -> broader compliance topics it touches (spec related_topics).
CAPABILITY_TOPICS: Dict[str, List[str]] = {
    "software_bill_of_materials": ["component_transparency", "supply_chain", "vulnerability_management"],
    "secure_updates": ["secure_updates", "vulnerability_remediation", "release_management"],
    "software_integrity": ["secure_updates", "supply_chain", "tamper_protection"],
    "vulnerability_management": ["vulnerability_handling", "monitoring", "patch_management"],
    "coordinated_disclosure": ["vulnerability_handling", "transparency"],
    "incident_reporting": ["incident_handling", "authority_notification"],
    "authentication": ["access_control", "identity"],
    "secure_by_default": ["hardening", "attack_surface", "configuration"],
    "security_logging": ["monitoring", "forensics", "incident_handling"],
    "secure_communication": ["confidentiality", "integrity", "remote_access"],
    "risk_assessment": ["risk_management", "secure_by_design"],
    "technical_documentation": ["documentation", "conformity"],
    "conformity_assessment": ["conformity", "ce_marking"],
    "functional_safety": ["machine_safety", "control_systems"],
    "data_access_provision": ["data_sharing", "portability"],
}

# qualifier -> substrings that signal a weak/incomplete implementation.
QUALIFIER_KEYWORDS: Dict[str, List[str]] = {
    "reactive": [
        "wenn kunden", "wenn ein kunde", "nach meldung", "auf anfrage", "auf nachfrage",
        "nur wenn", "reaktiv", "wenn fehler", "when customers", "on request", "when reported",
        "ad hoc", "ad-hoc", "bei bedarf",
    ],
    "manual": ["manuell", "von hand", "manual", "händisch", "haendisch"],
    "planned": [
        "geplant", "in planung", "wollen wir", "planen wir", "noch nicht", "zukünftig", "künftig",
    ],
    "absent": ["haben wir nicht", "gibt es nicht", "nicht vorhanden", "keinen prozess", "keine"],
}


def _keyword_in(keyword: str, low: str) -> bool:
    """Return whether *keyword* occurs in the already lower-cased text *low*.

    Ordinary keywords are substring matches. A keyword written with trailing
    whitespace (``"log "``, ``"sil "``) is matched as its stem followed by any
    non-word character or the end of the text. A literal space was required
    before, which missed ``"ein Log."`` and ``"SIL-2"``; ``"Login"`` and
    ``"Silber"`` still do not match. Every text matched before still matches.
    """
    stem = keyword.rstrip()
    if not stem or stem == keyword:
        return keyword in low
    return re.search(re.escape(stem) + r"(?!\w)", low) is not None


def _matching_keys(table: Dict[str, List[str]], text: str) -> List[str]:
    """Return the keys of *table*, in table order, with a keyword found in *text*."""
    low = (text or "").lower()  # tolerate None / empty input
    return [key for key, keywords in table.items() if any(_keyword_in(k, low) for k in keywords)]


def match_capabilities(text: str) -> List[str]:
    """Return the capabilities whose keywords occur in *text* (case-insensitive)."""
    return _matching_keys(CAPABILITY_KEYWORDS, text)


def match_qualifiers(text: str) -> List[str]:
    """Return the weakening qualifiers whose keywords occur in *text* (case-insensitive)."""
    return _matching_keys(QUALIFIER_KEYWORDS, text)


def topics_for(capabilities: List[str]) -> List[str]:
    """Return the distinct topics of *capabilities* in first-seen order.

    Capabilities without an entry in ``CAPABILITY_TOPICS`` contribute nothing.
    """
    topics: Dict[str, None] = {}
    for capability in capabilities:
        topics.update(dict.fromkeys(CAPABILITY_TOPICS.get(capability, [])))
    return list(topics)
# One curated misconception: the lowercase trigger substrings that identify it,
# the verdict, and the corrected reading returned to the customer.
# NOTE: frozen=True blocks attribute reassignment only; the list fields remain
# mutable, and instances are not hashable because they contain lists.
@dataclass(frozen=True)
class InterpretationPattern:
    pattern_id: str
    # Lowercase substrings; the module docstring states matching is by
    # lowercase substring.
    triggers: List[str]
    verdict: InterpretationVerdict
    corrected_interpretation: str
    explanation: str
    affected_regulations: List[str] = field(default_factory=list)
    affected_obligations: List[str] = field(default_factory=list)
    risks: List[str] = field(default_factory=list)
    legal_basis_refs: List[str] = field(default_factory=list)
    confidence: Confidence = Confidence.MEDIUM


# Library of known misconceptions. All user-facing texts are German.
INTERPRETATION_PATTERNS: List[InterpretationPattern] = [
    # "The CRA only applies to newly developed products."
    InterpretationPattern(
        pattern_id="cra_only_new_products",
        triggers=[
            "nur für neue", "nur fuer neue", "nur neu entwickelt", "nur neuentwicklung",
            "nur bei neuentwicklung", "only new product", "gilt nur für neue produkte",
        ],
        verdict=InterpretationVerdict.TOO_NARROW,
        corrected_interpretation=(
            "CRA-Pflichten knüpfen primär an Produkt, Rolle, Marktzugang, Bereitstellung und "
            "Übergangsfristen an, nicht nur an Neuentwicklung. Ein fertig entwickeltes "
            "Katalogprodukt kann betroffen sein, wenn es nach dem maßgeblichen Zeitpunkt weiter "
            "auf dem EU-Markt bereitgestellt wird."
        ),
        explanation=(
            "Die relevante Frage ist nicht nur, ob das Produkt neu entwickelt wurde, sondern ob es "
            "nach dem Anwendungszeitpunkt weiterhin bereitgestellt oder in Verkehr gebracht wird."
        ),
        affected_regulations=["CRA"],
        risks=["Katalog-/Bestandsprodukt fällt trotz abgeschlossener Entwicklung unter den CRA."],
        legal_basis_refs=["CRA Art. 2", "CRA Art. 69 (Übergangsbestimmungen)"],
        confidence=Confidence.HIGH,
    ),
    # "The CRA does not apply to B2B / only to consumers."
    InterpretationPattern(
        pattern_id="cra_b2b_exempt",
        triggers=[
            "gilt nicht für b2b", "nur für verbraucher", "nur b2c", "nicht im b2b",
            "only consumer", "b2b ist ausgenommen",
        ],
        verdict=InterpretationVerdict.TOO_NARROW,
        corrected_interpretation=(
            "Der CRA gilt produkt- und marktbezogen, unabhängig von B2B oder B2C. Eine generelle "
            "B2B-Ausnahme existiert nicht; Industrieprodukte mit digitalen Elementen sind erfasst."
        ),
        explanation="Der Anwendungsbereich knüpft an 'Produkte mit digitalen Elementen' an, nicht an die Kundengruppe.",
        affected_regulations=["CRA"],
        risks=["Industrielle B2B-Steuerungen werden fälschlich als ausgenommen behandelt."],
        legal_basis_refs=["CRA Art. 2", "CRA Art. 3(1)"],
        confidence=Confidence.HIGH,
    ),
    # "Having an SBOM is sufficient."
    InterpretationPattern(
        pattern_id="sbom_is_enough",
        triggers=[
            "sbom reicht", "mit sbom sind wir", "sbom genügt", "sbom genuegt", "nur eine sbom",
            "sbom allein",
        ],
        verdict=InterpretationVerdict.TOO_NARROW,
        corrected_interpretation=(
            "Eine SBOM erfüllt nur einen Teil der Komponenten-Transparenz. Schwachstellen-"
            "überwachung, Update-/Patch-Prozess und technische Dokumentation bleiben eigenständige Pflichten."
        ),
        explanation="SBOM ist Voraussetzung, ersetzt aber nicht Vulnerability-Handling und Updates.",
        affected_regulations=["CRA"],
        affected_obligations=["sbom_creation", "vuln_handling_process", "provide_security_updates"],
        risks=["Falsche Annahme vollständiger Erfüllung trotz fehlendem Vulnerability-Prozess."],
        legal_basis_refs=["CRA Annex I Part II (1)", "CRA Annex I Part II (2)"],
        confidence=Confidence.HIGH,
    ),
    # "Open source is exempt."
    InterpretationPattern(
        pattern_id="open_source_exempt",
        triggers=[
            "open source ist ausgenommen", "open-source ist ausgenommen", "oss ist ausgenommen",
            "freie software ist ausgenommen", "open source fällt nicht",
        ],
        verdict=InterpretationVerdict.PARTIALLY_CORRECT,
        corrected_interpretation=(
            "Nur nicht-kommerziell bereitgestellte Open-Source-Software ist ausgenommen. Sobald OSS "
            "kommerziell in ein Produkt integriert und auf dem Markt bereitgestellt wird, greift der CRA."
        ),
        explanation="Die Ausnahme zielt auf nicht-kommerzielle OSS-Bereitstellung, nicht auf kommerzielle Produktintegration.",
        affected_regulations=["CRA"],
        risks=["Kommerziell integrierte OSS-Komponenten werden fälschlich als ausgenommen behandelt."],
        legal_basis_refs=["CRA Art. 2", "CRA Erwägungsgründe (Open-Source-Stewards)"],
        confidence=Confidence.MEDIUM,
    ),
    # "Updates only when customers report a problem are enough."
    InterpretationPattern(
        pattern_id="reactive_updates_ok",
        triggers=[
            "updates nur wenn", "reaktive updates reichen", "wenn kunden melden reicht",
            "updates wenn fehler gemeldet",
        ],
        verdict=InterpretationVerdict.TOO_NARROW,
        corrected_interpretation=(
            "Der CRA verlangt aktive Schwachstellenüberwachung und zeitnahe Sicherheitsupdates über "
            "den Supportzeitraum, nicht nur reaktive Updates nach Kundenmeldung."
        ),
        explanation="Ein rein reaktiver Updateprozess erfüllt die Pflicht zur aktiven Schwachstellenbehandlung nicht.",
        affected_regulations=["CRA"],
        affected_obligations=["provide_security_updates", "vuln_handling_process"],
        risks=["Verzögerte Reaktion auf öffentlich bekannte Schwachstellen; Pflichtverletzung."],
        # NOTE(review): the second reference omits the "Part" that the other
        # Annex I references carry -- confirm the intended citation format.
        legal_basis_refs=["CRA Annex I Part II (1)", "CRA Annex I (2)(c)"],
        confidence=Confidence.HIGH,
    ),
    # "The machine's CE marking already covers cyber security."
    InterpretationPattern(
        pattern_id="machinery_covers_cyber",
        triggers=[
            "maschinenrichtlinie deckt cyber", "maschinenvo deckt alles", "ce der maschine reicht",
            "ce maschine reicht für cyber", "maschinen-ce reicht",
        ],
        verdict=InterpretationVerdict.PARTIALLY_CORRECT,
        corrected_interpretation=(
            "Die MaschinenVO deckt die sicherheitsrelevante Korrumpierung ab (Anhang III 1.1.9), "
            "ersetzt aber nicht die produktbezogenen CRA-Security-Pflichten. Beide Regime gelten parallel."
        ),
        explanation="Maschinen-CE und CRA überschneiden sich nur dort, wo Cyber eine Sicherheitsfunktion betrifft.",
        affected_regulations=["CRA", "MaschinenVO"],
        affected_obligations=["machine_protection_against_corruption", "vuln_handling_process"],
        risks=["CRA-Pflichten werden übersehen, weil die Maschine bereits CE-gekennzeichnet ist."],
        legal_basis_refs=["MaschinenVO Anhang III (1.1.9)", "CRA Art. 13"],
        confidence=Confidence.MEDIUM,
    ),
    # "No radio module, hence no cyber obligations."
    InterpretationPattern(
        pattern_id="no_radio_no_cyber",
        triggers=[
            "ohne funkmodul kein cyber", "kein funk also kein cra", "ohne funk keine security",
            "ohne funkmodul keine cyber",
        ],
        verdict=InterpretationVerdict.TOO_NARROW,
        corrected_interpretation=(
            "Der CRA knüpft an digitale Elemente an, nicht an ein Funkmodul. Ohne Funk entfällt die "
            "RED, der CRA bleibt jedoch anwendbar, sobald Software vorhanden ist."
        ),
        explanation="Funkmodul ist nur für die RED relevant; die CRA-Anwendbarkeit folgt aus der Software.",
        affected_regulations=["CRA", "RED"],
        risks=["CRA wird fälschlich verneint, weil kein Funkmodul vorhanden ist."],
        legal_basis_refs=["CRA Art. 3(1)", "RED 2014/53/EU Art. 1"],
        confidence=Confidence.HIGH,
    ),
]
# ---------------------------------------------------------------------------
# Fixtures / builders
# ---------------------------------------------------------------------------
def sps_profile(**overrides) -> ProductProfile:
    """Build the baseline profile used by most tests.

    The baseline is a PLC ("SPS") with HMI: software, remote access and cloud
    connection present, placed on the EU market by a manufacturer. Keyword
    arguments override or extend these fields.
    """
    base = dict(
        product_name="SPS mit HMI",
        product_type=["SPS", "HMI", "Schaltschrank"],
        has_software=True,
        has_remote_access=True,
        has_cloud_connection=True,
        eu_market=True,
        manufacturer_role=ManufacturerRole.MANUFACTURER,
    )
    base.update(overrides)
    return ProductProfile(**base)


def _reg_ids(scope, attr):
    """Return the ``regulation_id`` of every entry in the list ``scope.<attr>``."""
    return [getattr(r, "regulation_id") for r in getattr(scope, attr)]


# ---------------------------------------------------------------------------
# 1. Does the CRA apply to a PLC with remote maintenance?
# ---------------------------------------------------------------------------
def test_cra_applies_to_sps_with_remote_access():
    scope = discover_scope(sps_profile())
    cra = [r for r in scope.applicable_regulations if r.regulation_id == "CRA"]
    assert cra and cra[0].applicability_status == ApplicabilityStatus.APPLICABLE
    assert cra[0].confidence.value == "high"
    # NOTE(review): because of the trailing `or`, this passes for any non-empty
    # trigger_facts; the label check alone never decides the outcome.
    assert any("digitale Elemente" in f or "Fernzugriff" in f for f in cra[0].trigger_facts) or cra[0].trigger_facts


# ---------------------------------------------------------------------------
# 2. Catalog product still sold in 2027 -> CRA applies; "only new products"
#    is too narrow
# ---------------------------------------------------------------------------
def test_cra_applies_to_finished_catalog_product():
    profile = sps_profile(placed_on_market_after=date(2027, 1, 1), lifecycle_phase="placing_on_market")
    scope = discover_scope(profile)
    assert "CRA" in _reg_ids(scope, "applicable_regulations")


def test_interpretation_only_new_products_is_too_narrow():
    result = assess_interpretation("Wir glauben, der CRA gilt nur für neue Produkte.")
    assert result.assessment == InterpretationVerdict.TOO_NARROW
    assert "CRA" in result.affected_regulations
    assert result.corrected_interpretation
    assert result.legal_basis_refs
# ---------------------------------------------------------------------------
# 3. Is an SBOM alone sufficient? -> no, only partially
# ---------------------------------------------------------------------------
def test_sbom_alone_is_not_enough():
    resp = assess_implementation(sps_profile(), "Wir haben SBOMs.")
    sbom = [a for a in resp.assessments if a.obligation_id == "sbom_creation"]
    assert sbom and sbom[0].coverage_status == CoverageStatus.COVERED
    # but other obligations are surfaced as gaps -> aggregate not fully covered
    assert any(a.coverage_status != CoverageStatus.COVERED for a in resp.assessments)
    assert "Teilweise erfüllt" in resp.summary or "offen" in resp.summary


# ---------------------------------------------------------------------------
# 4. Is a reactive update process sufficient? -> only partially
# ---------------------------------------------------------------------------
def test_reactive_update_process_is_partial():
    resp = assess_implementation(
        sps_profile(), "Wir machen Updates, wenn Kunden Fehler melden."
    )
    upd = [a for a in resp.assessments if a.obligation_id == "provide_security_updates"]
    assert upd and upd[0].coverage_status == CoverageStatus.PARTIALLY_COVERED
    # The claim must be flagged with the "reactive" qualifier.
    assert "reactive" in resp.claim.qualifiers
    assert any("Schwachstellenüberwachung" in m for m in upd[0].missing_elements)
# ---------------------------------------------------------------------------
# 5. When do the CRA and the Machinery Regulation overlap?
# ---------------------------------------------------------------------------
def test_cra_and_machinery_overlap_on_cyber_safety():
    profile = sps_profile(is_machine=True, has_safety_function=True)
    resp = derive_obligations(profile)
    ids = [o.obligation_id for o in resp.applicable_obligations]
    assert "machine_protection_against_corruption" in ids
    assert "vuln_handling_process" in ids
    # The machinery obligation must be listed in the vulnerability-handling overlap group.
    vuln_overlap = [o for o in resp.overlaps if o.overlap_group_id == "VULNERABILITY_HANDLING"]
    assert vuln_overlap
    assert "machine_protection_against_corruption" in vuln_overlap[0].obligations


# ---------------------------------------------------------------------------
# 6. When is the Data Act additionally relevant?
# ---------------------------------------------------------------------------
def test_data_act_relevant_when_product_generates_data():
    scope = discover_scope(sps_profile(generates_usage_data=True))
    assert "DataAct" in _reg_ids(scope, "applicable_regulations")
    obs = derive_obligations(sps_profile(generates_usage_data=True))
    assert any(o.source_regulation == "DataAct" for o in obs.applicable_obligations)


def test_data_act_uncertain_when_data_unknown():
    scope = discover_scope(sps_profile())  # generates_usage_data=None
    assert "DataAct" in _reg_ids(scope, "uncertain_regulations")
# ---------------------------------------------------------------------------
# 7. Which obligations do not apply without a radio module?
# ---------------------------------------------------------------------------
def test_no_radio_module_excludes_red():
    scope = discover_scope(sps_profile(has_radio_module=False))
    assert "RED" in _reg_ids(scope, "excluded_regulations")
    assert "RED" not in _reg_ids(scope, "applicable_regulations")


def test_radio_unknown_makes_red_uncertain():
    scope = discover_scope(sps_profile())  # has_radio_module=None
    assert "RED" in _reg_ids(scope, "uncertain_regulations")


# ---------------------------------------------------------------------------
# 8. Which facts are missing for a NIS2 assessment?
# ---------------------------------------------------------------------------
def test_nis2_missing_facts():
    scope = discover_scope(sps_profile())
    nis2 = [r for r in scope.uncertain_regulations if r.regulation_id == "NIS2"]
    assert nis2
    # The German prompts must mention company size and sector.
    joined = " ".join(nis2[0].missing_facts).lower()
    assert "unternehmensgröße" in joined and "sektor" in joined


# ---------------------------------------------------------------------------
# 9. Which pieces of evidence cover several obligations at once? (USP)
# ---------------------------------------------------------------------------
def test_evidence_covers_multiple_obligations():
    resp = derive_obligations(sps_profile())
    multi = resp.evidence_for_multiple
    assert multi  # at least one evidence type spans >1 obligation
    assert all(len(ids) > 1 for ids in multi.values())
    assert "policy" in multi  # the CRA process docs share a policy evidence
# ---------------------------------------------------------------------------
# 10. Interpretations: too narrow / too broad / plausible / unknown
# ---------------------------------------------------------------------------
def test_interpretation_unknown_returns_uncertain():
    # A statement that matches no curated pattern must come back as UNCERTAIN.
    result = assess_interpretation("Der Mond beeinflusst unsere Updatezyklen.")
    assert result.assessment == InterpretationVerdict.UNCERTAIN
    assert result.corrected_interpretation


def test_interpretation_open_source_partially_correct():
    result = assess_interpretation("Open Source ist ausgenommen, also betrifft uns der CRA nicht.")
    assert result.assessment == InterpretationVerdict.PARTIALLY_CORRECT


# ---------------------------------------------------------------------------
# Registry-alignment + contract guards
# ---------------------------------------------------------------------------
def test_cra_obligations_reuse_registry_ids_not_minted():
    resp = derive_obligations(sps_profile())
    anchored = [o for o in resp.applicable_obligations if o.registry_anchor]
    assert "sbom_creation" in [o.obligation_id for o in anchored]
    assert "provide_security_updates" in [o.obligation_id for o in anchored]
    # machine obligations are proposed, never claimed as registry-owned
    machine = [o for o in resp.applicable_obligations if o.source_regulation == "MaschinenVO"]
    assert all(o.proposed and not o.registry_anchor for o in machine)


def test_required_evidence_only_uses_shared_catalog():
    from compliance.reasoning.rules_types import EVIDENCE_CATALOG
    from compliance.reasoning.rules_obligations import ALL_OBLIGATIONS

    # Every rule may only reference evidence types from the shared catalog.
    for rule in ALL_OBLIGATIONS:
        assert set(rule.required_evidence) <= EVIDENCE_CATALOG


def test_claim_normalizer_is_deterministic():
    a = normalize_claim("Wir haben einen Update-Prozess.")
    b = normalize_claim("Wir haben einen Update-Prozess.")
    assert a.claim_id == b.claim_id
    assert "secure_updates" in a.claimed_capability


def test_unspecific_claim_asks_for_detail():
    resp = assess_implementation(sps_profile(), "Wir sind sicher aufgestellt.")
    assert resp.assessments == [] or all(
        a.coverage_status == CoverageStatus.UNCLEAR for a in resp.assessments
    )
    assert "unspezifisch" in resp.summary.lower()


# ---------------------------------------------------------------------------
# Endpoint smoke tests
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def client():
    """Return a TestClient for a bare FastAPI app that mounts only the reasoning router."""
    from compliance.api.reasoning_routes import router

    app = FastAPI()
    app.include_router(router)
    return TestClient(app)


def test_endpoint_scope(client):
    r = client.post("/reasoning/scope", json={"product_profile": {"product_name": "X", "has_software": True, "eu_market": True, "manufacturer_role": "manufacturer"}})
    assert r.status_code == 200
    body = r.json()
    assert "CRA" in [x["regulation_id"] for x in body["regulatory_scope"]["applicable_regulations"]]


def test_endpoint_obligations(client):
    r = client.post(
        "/reasoning/obligations",
        json={"product_profile": {"product_name": "X", "has_software": True, "has_remote_access": True, "eu_market": True, "manufacturer_role": "manufacturer"}},
    )
    assert r.status_code == 200
    assert r.json()["applicable_obligations"]


def test_endpoint_implementation(client):
    r = client.post(
        "/reasoning/implementation-assessment",
        json={"product_profile": {"product_name": "X", "has_software": True, "eu_market": True, "manufacturer_role": "manufacturer"}, "customer_claim": "Wir haben SBOMs."},
    )
    assert r.status_code == 200
    assert r.json()["assessments"]


def test_endpoint_interpretation(client):
    # The product profile is optional for this endpoint, so it is omitted.
    r = client.post(
        "/reasoning/interpretation-assessment",
        json={"customer_interpretation": "CRA gilt nur für neue Produkte."},
    )
    assert r.status_code == 200
    assert r.json()["assessment"] == "too_narrow"