From 77de7e794c31639da185c0abefe05dcf8a37546e Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sat, 27 Jun 2026 07:31:11 +0200 Subject: [PATCH] =?UTF-8?q?feat(transition):=20Transition=20Reasoning=20v0?= =?UTF-8?q?=20(RS-005)=20=E2=80=94=20Transition=20Planning=20Engine?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second reasoning mode, scope per user: the engine owns the INFORMATION GAPS, not the questions. assess_transition(context, target_requirements, company_profile) emits ranked TransitionQuestionRequest {capability, control, reason, question_intent, expected_evidence, priority, information_gain} -- NOT rendered question text. Rendering (intent+subject->sentence) is a separate swappable layer (RS-005.1), not here. Consumes the Company Capability Profile (2A) as "have" + injected TargetRequirement (Execution-owned placeholder) as "required" -- no required-capability data in product code (EMPTY_REQUIREMENTS, mocks only in tests). A certification-derived capability is probably_covered (Welt 1) -> a confirmation request, never already_covered/"erfuellt". Deterministic, computed-not-stored, no percentages. Activates 2A/2C/RCI (first consumer of the Company profile). Freeze-respecting: additive package, no new graph/base class/meta-model class. 9 tests, mypy --strict clean, LOC ok. No endpoint/UI/RAG; question rendering deliberately deferred to RS-005.1. Co-Authored-By: Claude Opus 4.7 --- .../transition_reasoning/__init__.py | 42 ++++++ .../compliance/transition_reasoning/engine.py | 138 ++++++++++++++++++ .../transition_reasoning/schemas.py | 110 ++++++++++++++ .../tests/test_transition_reasoning.py | 133 +++++++++++++++++ 4 files changed, 423 insertions(+) create mode 100644 backend-compliance/compliance/transition_reasoning/__init__.py create mode 100644 backend-compliance/compliance/transition_reasoning/engine.py create mode 100644 backend-compliance/compliance/transition_reasoning/schemas.py create mode 100644 backend-compliance/tests/test_transition_reasoning.py diff --git a/backend-compliance/compliance/transition_reasoning/__init__.py b/backend-compliance/compliance/transition_reasoning/__init__.py new file mode 100644 index 00000000..67361967 --- /dev/null +++ b/backend-compliance/compliance/transition_reasoning/__init__.py @@ -0,0 +1,42 @@ +"""Transition Reasoning v0 (RS-005) — the Transition Planning Engine. + +Answers „Was muss ich noch wissen, um vom Ausgangs- in den regulatorischen +Zielzustand zu kommen?". Owns the **information gaps** (`TransitionQuestionRequest`), +NOT the rendered questions (rendering = separate RS-005.1 layer). + +Consumes the Company Capability Profile (2A) as „have" + injected `TargetRequirement` +(Execution-owned placeholder) as „required". Spec: docs-src/architecture/transition-reasoning-spec-v1.md. +""" + +from __future__ import annotations + +from .engine import EMPTY_REQUIREMENTS, assess_transition +from .schemas import ( + CapabilityCoverage, + CoverageStatus, + InformationGain, + RequestPriority, + TargetRequirement, + TargetType, + TransitionAssessment, + TransitionContext, + TransitionGoal, + TransitionQuestionRequest, + TransitionSummary, +) + +__all__ = [ + "assess_transition", + "EMPTY_REQUIREMENTS", + "TransitionContext", + "TransitionGoal", + "TargetType", + "TargetRequirement", + "TransitionQuestionRequest", + "CapabilityCoverage", + "CoverageStatus", + "RequestPriority", + "InformationGain", + "TransitionSummary", + "TransitionAssessment", +] diff --git a/backend-compliance/compliance/transition_reasoning/engine.py b/backend-compliance/compliance/transition_reasoning/engine.py new file mode 100644 index 00000000..7160b21e --- /dev/null +++ b/backend-compliance/compliance/transition_reasoning/engine.py @@ -0,0 +1,138 @@ +"""Transition Reasoning v0 (RS-005) — the Transition Planning Engine. + +`assess_transition(context, target_requirements, company_profile)`: computes, per +required capability, the coverage from the company's „have" state (Phase 2A), and +emits a ranked list of `TransitionQuestionRequest` (information gaps) — NOT questions. + +Deterministic; nothing is stored. A certification-derived „probably covered" is Welt 1 +(a hint), so it produces a confirmation request, never „erfüllt". Python 3.9 compatible. +""" + +from __future__ import annotations + +from typing import Dict, List, Optional + +from compliance.company import CompanyCapabilityProfile, VerificationStatus + +from .schemas import ( + CapabilityCoverage, + CoverageStatus, + InformationGain, + RequestPriority, + TargetRequirement, + TransitionAssessment, + TransitionContext, + TransitionQuestionRequest, + TransitionSummary, +) + +EMPTY_REQUIREMENTS: List[TargetRequirement] = [] + +_STATUS_RANK = { # strongest „have" signal wins when a capability appears twice + VerificationStatus.CONFIRMED: 3, + VerificationStatus.INFERRED: 2, + VerificationStatus.DECLARED: 1, + VerificationStatus.UNKNOWN: 0, +} + + +def _have(profile: CompanyCapabilityProfile) -> Dict[str, VerificationStatus]: + out: Dict[str, VerificationStatus] = {} + for oc in profile.confirmed_capabilities: + out[oc.capability_id] = VerificationStatus.CONFIRMED + for c in profile.candidate_capabilities: + cur = out.get(c.capability_id) + if cur is None or _STATUS_RANK[c.verification_status] > _STATUS_RANK[cur]: + out[c.capability_id] = c.verification_status + return out + + +def _classify(req: TargetRequirement, have: Dict[str, VerificationStatus]) -> CoverageStatus: + if req.unsupported: + return CoverageStatus.UNSUPPORTED + status = have.get(req.capability_id) + if status == VerificationStatus.CONFIRMED: + return CoverageStatus.ALREADY_COVERED + if status == VerificationStatus.INFERRED: + return CoverageStatus.PROBABLY_COVERED + if status == VerificationStatus.DECLARED: + return CoverageStatus.NEEDS_CONFIRMATION + return CoverageStatus.MISSING + + +# coverage -> (request?, reason, base priority) +_REQUESTABLE = { + CoverageStatus.PROBABLY_COVERED: ("Vermutlich vorhanden (aus Zertifizierung) — mit Nachweis bestätigen.", RequestPriority.MEDIUM), + CoverageStatus.NEEDS_CONFIRMATION: ("Selbst angegeben — Nachweis steht aus.", RequestPriority.MEDIUM), + CoverageStatus.MISSING: ("Keine Anhaltspunkte im Unternehmensprofil — klären.", RequestPriority.HIGH), +} + + +def _gain(coverage: CoverageStatus, n_obligations: int) -> InformationGain: + base = InformationGain.HIGH if coverage == CoverageStatus.MISSING else ( + InformationGain.MEDIUM if coverage == CoverageStatus.NEEDS_CONFIRMATION else InformationGain.LOW + ) + if n_obligations >= 2 and base != InformationGain.HIGH: # more dependent obligations -> bump + return InformationGain.HIGH if base == InformationGain.MEDIUM else InformationGain.MEDIUM + return base + + +_PRIO_ORDER = {RequestPriority.HIGH: 0, RequestPriority.MEDIUM: 1, RequestPriority.LOW: 2} +_GAIN_ORDER = {InformationGain.HIGH: 0, InformationGain.MEDIUM: 1, InformationGain.LOW: 2} + + +def assess_transition( + context: TransitionContext, + target_requirements: Optional[List[TargetRequirement]] = None, + company_profile: Optional[CompanyCapabilityProfile] = None, +) -> TransitionAssessment: + reqs = EMPTY_REQUIREMENTS if target_requirements is None else target_requirements + have = _have(company_profile) if company_profile is not None else {} + + coverage: List[CapabilityCoverage] = [] + requests: List[TransitionQuestionRequest] = [] + buckets: Dict[CoverageStatus, List[str]] = {s: [] for s in CoverageStatus} + + for req in reqs: + status = _classify(req, have) + coverage.append( + CapabilityCoverage( + capability_id=req.capability_id, + status=status, + have_status=have[req.capability_id].value if req.capability_id in have else None, + ) + ) + buckets[status].append(req.capability_id) + if status in _REQUESTABLE: + reason, prio = _REQUESTABLE[status] + requests.append( + TransitionQuestionRequest( + capability_id=req.capability_id, + control_id=req.source_control_id, + reason=reason, + question_intent=req.question_intent, + expected_evidence=req.expected_evidence, + priority=prio, + information_gain=_gain(status, len(req.supports_obligations)), + ) + ) + + requests.sort(key=lambda r: (_PRIO_ORDER[r.priority], _GAIN_ORDER[r.information_gain], r.capability_id)) + + summary = TransitionSummary( + headline=( + "%d zu klären, %d bereits abgedeckt, %d vermutlich vorhanden, %d fehlt, %d n/a, %d nicht im Korpus." + % (len(requests), len(buckets[CoverageStatus.ALREADY_COVERED]), + len(buckets[CoverageStatus.PROBABLY_COVERED]), len(buckets[CoverageStatus.MISSING]), + len(buckets[CoverageStatus.NOT_APPLICABLE]), len(buckets[CoverageStatus.UNSUPPORTED])) + ), + what_to_clarify=[r.capability_id for r in requests], + already_covered=buckets[CoverageStatus.ALREADY_COVERED], + probably_covered=buckets[CoverageStatus.PROBABLY_COVERED], + missing=buckets[CoverageStatus.MISSING], + not_applicable=buckets[CoverageStatus.NOT_APPLICABLE], + unsupported=buckets[CoverageStatus.UNSUPPORTED], + ) + return TransitionAssessment( + target_id=context.target.target_id, coverage=coverage, question_requests=requests, summary=summary + ) diff --git a/backend-compliance/compliance/transition_reasoning/schemas.py b/backend-compliance/compliance/transition_reasoning/schemas.py new file mode 100644 index 00000000..c4618292 --- /dev/null +++ b/backend-compliance/compliance/transition_reasoning/schemas.py @@ -0,0 +1,110 @@ +"""Transition Reasoning v0 (RS-005) — domain objects. + +The **Transition Planning Engine**: it answers „Was muss ich noch wissen, um vom +Ausgangszustand in den regulatorischen Zielzustand zu kommen?" — NOT „wie frage ich +das?". It therefore owns the **information gaps** (`TransitionQuestionRequest`), never +the rendered question text. Rendering (intent + subject -> sentence) is a separate, +swappable layer (RS-005.1 Question Generator) and is NOT part of this engine. + +v0 consumes the Company Capability Profile (Phase 2A) as the „have" state and an +INJECTED list of `TargetRequirement` as the Execution-owned „required" side (no +required-capability data in product code — same discipline as 2A's EMPTY_MAPPING). + +Welt-1-Grenze: a probable coverage (from a certification) is a hint, never „erfüllt"; +it produces a confirmation request, not a verdict. Application/reasoning types, NOT +meta-model classes (freeze v1.0 untouched). Python 3.9 compatible (no `|` unions). +""" + +from __future__ import annotations + +from enum import Enum +from typing import List, Optional + +from pydantic import BaseModel, Field + + +class TargetType(str, Enum): + REGULATION = "regulation" + CERTIFICATION = "certification" + FRAMEWORK = "framework" + + +class CoverageStatus(str, Enum): + ALREADY_COVERED = "already_covered" # confirmed in the company profile + PROBABLY_COVERED = "probably_covered" # inferred (e.g. from a certification) + NEEDS_CONFIRMATION = "needs_confirmation" # only declared / weak signal + MISSING = "missing" # no signal + NOT_APPLICABLE = "not_applicable" + UNSUPPORTED = "unsupported" # domain not yet in the corpus (future_corpus_needed) + + +class RequestPriority(str, Enum): + HIGH = "high" + MEDIUM = "medium" + LOW = "low" + + +class InformationGain(str, Enum): + HIGH = "high" + MEDIUM = "medium" + LOW = "low" + + +class TransitionGoal(BaseModel): + target_id: str # e.g. "CRA", "TISAX" + target_type: TargetType = TargetType.REGULATION + label: str = "" + + +class TransitionContext(BaseModel): + company_id: str = "" + known_certifications: List[str] = Field(default_factory=list) + known_regulations: List[str] = Field(default_factory=list) + target: TransitionGoal + + +# ── INJECTED (Execution-owned): what the target requires ─────────────────── +class TargetRequirement(BaseModel): + """One required capability for the target. In v0 injected; later resolved from + `Obligation -> Control -> Required Capability` + `Control -> question_intent`.""" + + capability_id: str # MCAP-... + question_intent: str = "verify_existence" # passed through to the request, not rendered + expected_evidence: List[str] = Field(default_factory=list) + source_control_id: Optional[str] = None + supports_obligations: List[str] = Field(default_factory=list) + unsupported: bool = False # domain not yet in the corpus + + +# ── the OWNED output: an information gap, NOT a question ─────────────────── +class TransitionQuestionRequest(BaseModel): + capability_id: str + control_id: Optional[str] = None + reason: str + question_intent: str # verify_existence / determine_duration / ... (rendered later) + expected_evidence: List[str] = Field(default_factory=list) + priority: RequestPriority + information_gain: InformationGain + + +class CapabilityCoverage(BaseModel): + capability_id: str + status: CoverageStatus + have_status: Optional[str] = None # the 2A VerificationStatus that drove it + + +class TransitionSummary(BaseModel): + headline: str = "" # counts, NO percentage + what_to_clarify: List[str] = Field(default_factory=list) # capability_ids with a request + already_covered: List[str] = Field(default_factory=list) + probably_covered: List[str] = Field(default_factory=list) + missing: List[str] = Field(default_factory=list) + not_applicable: List[str] = Field(default_factory=list) + unsupported: List[str] = Field(default_factory=list) + + +class TransitionAssessment(BaseModel): + target_id: str + coverage: List[CapabilityCoverage] = Field(default_factory=list) + question_requests: List[TransitionQuestionRequest] = Field(default_factory=list) # ranked + summary: TransitionSummary diff --git a/backend-compliance/tests/test_transition_reasoning.py b/backend-compliance/tests/test_transition_reasoning.py new file mode 100644 index 00000000..eaf9965a --- /dev/null +++ b/backend-compliance/tests/test_transition_reasoning.py @@ -0,0 +1,133 @@ +"""Tests for Transition Reasoning v0 (RS-005) — the Transition Planning Engine. + +Acceptance: from a TransitionGoal + the Company Capability Profile (2A, „have") + +INJECTED TargetRequirements (Execution-owned „required"), the engine emits ranked +`TransitionQuestionRequest`s (information gaps) — NOT rendered questions. A +certification-derived capability is „probably_covered" (Welt 1), never „already_covered". + +The cert->capability mapping below is a MOCK (Execution-owned in reality), only here. +""" + +from __future__ import annotations + +from compliance.company import ( + CapabilityMappingEntry, Certification, CompanyContext, Declaration, + ExistingEvidence, build_company_profile, +) +from compliance.reasoning.enums import Confidence +from compliance.transition_reasoning import ( + CoverageStatus, InformationGain, RequestPriority, TargetRequirement, TargetType, + TransitionContext, TransitionGoal, assess_transition, +) + +ISO_MAP = {"ISO27001": CapabilityMappingEntry( + capability_ids=["cap_incident_response", "cap_supplier_management"], confidence=Confidence.MEDIUM)} + + +def _profile(): + ctx = CompanyContext( + company_id="kunde", + certifications=[Certification(certification_id="ISO27001")], + declarations=[Declaration(capability_id="cap_asset_management")], + evidence=[ExistingEvidence(evidence_id="patch.pdf", evidence_type="policy", proves_capability_id="cap_patch_management")], + ) + return build_company_profile(ctx, ISO_MAP) + + +def _ctx(): + return TransitionContext(company_id="kunde", known_certifications=["ISO27001"], + target=TransitionGoal(target_id="CRA", target_type=TargetType.REGULATION)) + + +# CRA-Required (injiziert; in echt: Obligation->Control->Required Capability, Execution) +def _reqs(): + return [ + TargetRequirement(capability_id="cap_patch_management", expected_evidence=["policy"]), # confirmed + TargetRequirement(capability_id="cap_incident_response"), # inferred (ISO) + TargetRequirement(capability_id="cap_asset_management"), # declared + TargetRequirement(capability_id="cap_sbom", question_intent="verify_existence", expected_evidence=["sbom"]), # missing + TargetRequirement(capability_id="cap_vuln_handling", supports_obligations=["CRA.1", "CRA.2"]), # missing, 2 obligations + TargetRequirement(capability_id="cap_wastewater", unsupported=True), # not in corpus + ] + + +def _req_ids(a): + return [r.capability_id for r in a.question_requests] + + +def _cov(a, cap): + return [c for c in a.coverage if c.capability_id == cap][0] + + +# The engine emits REQUESTS (information gaps), not rendered questions. +def test_emits_requests_not_questions(): + a = assess_transition(_ctx(), _reqs(), _profile()) + r = a.question_requests[0] + assert r.capability_id and r.question_intent and r.priority + # NO rendered question text anywhere — rendering is RS-005.1, not this engine + assert not hasattr(r, "question") + assert "question" not in type(r).model_fields and "rendered_text" not in type(r).model_fields + + +# Confirmed capability -> already_covered, NO request. +def test_confirmed_already_covered_no_request(): + a = assess_transition(_ctx(), _reqs(), _profile()) + assert _cov(a, "cap_patch_management").status == CoverageStatus.ALREADY_COVERED + assert "cap_patch_management" not in _req_ids(a) + + +# A certification-inferred capability is PROBABLY_COVERED (Welt 1), not already_covered; +# it produces a confirmation request, never a verdict. +def test_certification_inferred_is_probable_with_confirm_request(): + a = assess_transition(_ctx(), _reqs(), _profile()) + c = _cov(a, "cap_incident_response") + assert c.status == CoverageStatus.PROBABLY_COVERED + assert c.status != CoverageStatus.ALREADY_COVERED # cert alone never „erfüllt" + req = [r for r in a.question_requests if r.capability_id == "cap_incident_response"][0] + assert req.priority == RequestPriority.MEDIUM + + +# A missing required capability -> high-priority request. +def test_missing_high_priority_request(): + a = assess_transition(_ctx(), _reqs(), _profile()) + assert _cov(a, "cap_sbom").status == CoverageStatus.MISSING + req = [r for r in a.question_requests if r.capability_id == "cap_sbom"][0] + assert req.priority == RequestPriority.HIGH and req.information_gain == InformationGain.HIGH + + +# An unsupported domain -> no request (future corpus, honest). +def test_unsupported_no_request(): + a = assess_transition(_ctx(), _reqs(), _profile()) + assert _cov(a, "cap_wastewater").status == CoverageStatus.UNSUPPORTED + assert "cap_wastewater" not in _req_ids(a) + + +# Requests are ranked: HIGH (missing) before MEDIUM (probable/declared). +def test_requests_ranked_high_before_medium(): + a = assess_transition(_ctx(), _reqs(), _profile()) + prios = [r.priority for r in a.question_requests] + assert prios == sorted(prios, key=lambda p: {RequestPriority.HIGH: 0, RequestPriority.MEDIUM: 1, RequestPriority.LOW: 2}[p]) + # the two missing caps come first + assert set(_req_ids(a)[:2]) == {"cap_sbom", "cap_vuln_handling"} + + +# The funnel: certs reduce the open questions (only 4 of 6 requirements need clarifying). +def test_funnel_reduces_open_questions(): + a = assess_transition(_ctx(), _reqs(), _profile()) + # already_covered (patch) + unsupported (wastewater) drop out -> 4 requests + assert len(a.question_requests) == 4 + assert "%" not in a.summary.headline + + +# Deterministic + activates 2A: same inputs -> same result. +def test_deterministic(): + p = _profile() + a1 = assess_transition(_ctx(), _reqs(), p) + a2 = assess_transition(_ctx(), _reqs(), p) + assert _req_ids(a1) == _req_ids(a2) and a1.summary.headline == a2.summary.headline + + +# No requirements / no profile -> empty assessment (no Execution data in product code). +def test_empty(): + a = assess_transition(_ctx()) + assert a.question_requests == [] and a.coverage == []