feat(transition): Transition Reasoning v0 (RS-005) — Transition Planning Engine
Second reasoning mode, scope per user: the engine owns the INFORMATION GAPS, not the
questions. assess_transition(context, target_requirements, company_profile) emits
ranked TransitionQuestionRequest {capability, control, reason, question_intent,
expected_evidence, priority, information_gain} -- NOT rendered question text. Rendering
(intent+subject->sentence) is a separate swappable layer (RS-005.1), not here.
Consumes the Company Capability Profile (2A) as "have" + injected TargetRequirement
(Execution-owned placeholder) as "required" -- no required-capability data in product
code (EMPTY_REQUIREMENTS, mocks only in tests). A certification-derived capability is
probably_covered (Welt 1) -> a confirmation request, never already_covered/"erfuellt".
Deterministic, computed-not-stored, no percentages.
Activates 2A/2C/RCI (first consumer of the Company profile). Freeze-respecting: additive
package, no new graph/base class/meta-model class. 9 tests, mypy --strict clean, LOC ok.
No endpoint/UI/RAG; question rendering deliberately deferred to RS-005.1.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,42 @@
|
||||
"""Transition Reasoning v0 (RS-005) — the Transition Planning Engine.
|
||||
|
||||
Answers „Was muss ich noch wissen, um vom Ausgangs- in den regulatorischen
|
||||
Zielzustand zu kommen?". Owns the **information gaps** (`TransitionQuestionRequest`),
|
||||
NOT the rendered questions (rendering = separate RS-005.1 layer).
|
||||
|
||||
Consumes the Company Capability Profile (2A) as „have" + injected `TargetRequirement`
|
||||
(Execution-owned placeholder) as „required". Spec: docs-src/architecture/transition-reasoning-spec-v1.md.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from .engine import EMPTY_REQUIREMENTS, assess_transition
|
||||
from .schemas import (
|
||||
CapabilityCoverage,
|
||||
CoverageStatus,
|
||||
InformationGain,
|
||||
RequestPriority,
|
||||
TargetRequirement,
|
||||
TargetType,
|
||||
TransitionAssessment,
|
||||
TransitionContext,
|
||||
TransitionGoal,
|
||||
TransitionQuestionRequest,
|
||||
TransitionSummary,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"assess_transition",
|
||||
"EMPTY_REQUIREMENTS",
|
||||
"TransitionContext",
|
||||
"TransitionGoal",
|
||||
"TargetType",
|
||||
"TargetRequirement",
|
||||
"TransitionQuestionRequest",
|
||||
"CapabilityCoverage",
|
||||
"CoverageStatus",
|
||||
"RequestPriority",
|
||||
"InformationGain",
|
||||
"TransitionSummary",
|
||||
"TransitionAssessment",
|
||||
]
|
||||
@@ -0,0 +1,138 @@
|
||||
"""Transition Reasoning v0 (RS-005) — the Transition Planning Engine.
|
||||
|
||||
`assess_transition(context, target_requirements, company_profile)`: computes, per
|
||||
required capability, the coverage from the company's „have" state (Phase 2A), and
|
||||
emits a ranked list of `TransitionQuestionRequest` (information gaps) — NOT questions.
|
||||
|
||||
Deterministic; nothing is stored. A certification-derived „probably covered" is Welt 1
|
||||
(a hint), so it produces a confirmation request, never „erfüllt". Python 3.9 compatible.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from compliance.company import CompanyCapabilityProfile, VerificationStatus
|
||||
|
||||
from .schemas import (
|
||||
CapabilityCoverage,
|
||||
CoverageStatus,
|
||||
InformationGain,
|
||||
RequestPriority,
|
||||
TargetRequirement,
|
||||
TransitionAssessment,
|
||||
TransitionContext,
|
||||
TransitionQuestionRequest,
|
||||
TransitionSummary,
|
||||
)
|
||||
|
||||
EMPTY_REQUIREMENTS: List[TargetRequirement] = []
|
||||
|
||||
_STATUS_RANK = { # strongest „have" signal wins when a capability appears twice
|
||||
VerificationStatus.CONFIRMED: 3,
|
||||
VerificationStatus.INFERRED: 2,
|
||||
VerificationStatus.DECLARED: 1,
|
||||
VerificationStatus.UNKNOWN: 0,
|
||||
}
|
||||
|
||||
|
||||
def _have(profile: CompanyCapabilityProfile) -> Dict[str, VerificationStatus]:
|
||||
out: Dict[str, VerificationStatus] = {}
|
||||
for oc in profile.confirmed_capabilities:
|
||||
out[oc.capability_id] = VerificationStatus.CONFIRMED
|
||||
for c in profile.candidate_capabilities:
|
||||
cur = out.get(c.capability_id)
|
||||
if cur is None or _STATUS_RANK[c.verification_status] > _STATUS_RANK[cur]:
|
||||
out[c.capability_id] = c.verification_status
|
||||
return out
|
||||
|
||||
|
||||
def _classify(req: TargetRequirement, have: Dict[str, VerificationStatus]) -> CoverageStatus:
|
||||
if req.unsupported:
|
||||
return CoverageStatus.UNSUPPORTED
|
||||
status = have.get(req.capability_id)
|
||||
if status == VerificationStatus.CONFIRMED:
|
||||
return CoverageStatus.ALREADY_COVERED
|
||||
if status == VerificationStatus.INFERRED:
|
||||
return CoverageStatus.PROBABLY_COVERED
|
||||
if status == VerificationStatus.DECLARED:
|
||||
return CoverageStatus.NEEDS_CONFIRMATION
|
||||
return CoverageStatus.MISSING
|
||||
|
||||
|
||||
# coverage -> (request?, reason, base priority)
|
||||
_REQUESTABLE = {
|
||||
CoverageStatus.PROBABLY_COVERED: ("Vermutlich vorhanden (aus Zertifizierung) — mit Nachweis bestätigen.", RequestPriority.MEDIUM),
|
||||
CoverageStatus.NEEDS_CONFIRMATION: ("Selbst angegeben — Nachweis steht aus.", RequestPriority.MEDIUM),
|
||||
CoverageStatus.MISSING: ("Keine Anhaltspunkte im Unternehmensprofil — klären.", RequestPriority.HIGH),
|
||||
}
|
||||
|
||||
|
||||
def _gain(coverage: CoverageStatus, n_obligations: int) -> InformationGain:
|
||||
base = InformationGain.HIGH if coverage == CoverageStatus.MISSING else (
|
||||
InformationGain.MEDIUM if coverage == CoverageStatus.NEEDS_CONFIRMATION else InformationGain.LOW
|
||||
)
|
||||
if n_obligations >= 2 and base != InformationGain.HIGH: # more dependent obligations -> bump
|
||||
return InformationGain.HIGH if base == InformationGain.MEDIUM else InformationGain.MEDIUM
|
||||
return base
|
||||
|
||||
|
||||
_PRIO_ORDER = {RequestPriority.HIGH: 0, RequestPriority.MEDIUM: 1, RequestPriority.LOW: 2}
|
||||
_GAIN_ORDER = {InformationGain.HIGH: 0, InformationGain.MEDIUM: 1, InformationGain.LOW: 2}
|
||||
|
||||
|
||||
def assess_transition(
|
||||
context: TransitionContext,
|
||||
target_requirements: Optional[List[TargetRequirement]] = None,
|
||||
company_profile: Optional[CompanyCapabilityProfile] = None,
|
||||
) -> TransitionAssessment:
|
||||
reqs = EMPTY_REQUIREMENTS if target_requirements is None else target_requirements
|
||||
have = _have(company_profile) if company_profile is not None else {}
|
||||
|
||||
coverage: List[CapabilityCoverage] = []
|
||||
requests: List[TransitionQuestionRequest] = []
|
||||
buckets: Dict[CoverageStatus, List[str]] = {s: [] for s in CoverageStatus}
|
||||
|
||||
for req in reqs:
|
||||
status = _classify(req, have)
|
||||
coverage.append(
|
||||
CapabilityCoverage(
|
||||
capability_id=req.capability_id,
|
||||
status=status,
|
||||
have_status=have[req.capability_id].value if req.capability_id in have else None,
|
||||
)
|
||||
)
|
||||
buckets[status].append(req.capability_id)
|
||||
if status in _REQUESTABLE:
|
||||
reason, prio = _REQUESTABLE[status]
|
||||
requests.append(
|
||||
TransitionQuestionRequest(
|
||||
capability_id=req.capability_id,
|
||||
control_id=req.source_control_id,
|
||||
reason=reason,
|
||||
question_intent=req.question_intent,
|
||||
expected_evidence=req.expected_evidence,
|
||||
priority=prio,
|
||||
information_gain=_gain(status, len(req.supports_obligations)),
|
||||
)
|
||||
)
|
||||
|
||||
requests.sort(key=lambda r: (_PRIO_ORDER[r.priority], _GAIN_ORDER[r.information_gain], r.capability_id))
|
||||
|
||||
summary = TransitionSummary(
|
||||
headline=(
|
||||
"%d zu klären, %d bereits abgedeckt, %d vermutlich vorhanden, %d fehlt, %d n/a, %d nicht im Korpus."
|
||||
% (len(requests), len(buckets[CoverageStatus.ALREADY_COVERED]),
|
||||
len(buckets[CoverageStatus.PROBABLY_COVERED]), len(buckets[CoverageStatus.MISSING]),
|
||||
len(buckets[CoverageStatus.NOT_APPLICABLE]), len(buckets[CoverageStatus.UNSUPPORTED]))
|
||||
),
|
||||
what_to_clarify=[r.capability_id for r in requests],
|
||||
already_covered=buckets[CoverageStatus.ALREADY_COVERED],
|
||||
probably_covered=buckets[CoverageStatus.PROBABLY_COVERED],
|
||||
missing=buckets[CoverageStatus.MISSING],
|
||||
not_applicable=buckets[CoverageStatus.NOT_APPLICABLE],
|
||||
unsupported=buckets[CoverageStatus.UNSUPPORTED],
|
||||
)
|
||||
return TransitionAssessment(
|
||||
target_id=context.target.target_id, coverage=coverage, question_requests=requests, summary=summary
|
||||
)
|
||||
@@ -0,0 +1,110 @@
|
||||
"""Transition Reasoning v0 (RS-005) — domain objects.
|
||||
|
||||
The **Transition Planning Engine**: it answers „Was muss ich noch wissen, um vom
|
||||
Ausgangszustand in den regulatorischen Zielzustand zu kommen?" — NOT „wie frage ich
|
||||
das?". It therefore owns the **information gaps** (`TransitionQuestionRequest`), never
|
||||
the rendered question text. Rendering (intent + subject -> sentence) is a separate,
|
||||
swappable layer (RS-005.1 Question Generator) and is NOT part of this engine.
|
||||
|
||||
v0 consumes the Company Capability Profile (Phase 2A) as the „have" state and an
|
||||
INJECTED list of `TargetRequirement` as the Execution-owned „required" side (no
|
||||
required-capability data in product code — same discipline as 2A's EMPTY_MAPPING).
|
||||
|
||||
Welt-1-Grenze: a probable coverage (from a certification) is a hint, never „erfüllt";
|
||||
it produces a confirmation request, not a verdict. Application/reasoning types, NOT
|
||||
meta-model classes (freeze v1.0 untouched). Python 3.9 compatible (no `|` unions).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class TargetType(str, Enum):
|
||||
REGULATION = "regulation"
|
||||
CERTIFICATION = "certification"
|
||||
FRAMEWORK = "framework"
|
||||
|
||||
|
||||
class CoverageStatus(str, Enum):
|
||||
ALREADY_COVERED = "already_covered" # confirmed in the company profile
|
||||
PROBABLY_COVERED = "probably_covered" # inferred (e.g. from a certification)
|
||||
NEEDS_CONFIRMATION = "needs_confirmation" # only declared / weak signal
|
||||
MISSING = "missing" # no signal
|
||||
NOT_APPLICABLE = "not_applicable"
|
||||
UNSUPPORTED = "unsupported" # domain not yet in the corpus (future_corpus_needed)
|
||||
|
||||
|
||||
class RequestPriority(str, Enum):
|
||||
HIGH = "high"
|
||||
MEDIUM = "medium"
|
||||
LOW = "low"
|
||||
|
||||
|
||||
class InformationGain(str, Enum):
|
||||
HIGH = "high"
|
||||
MEDIUM = "medium"
|
||||
LOW = "low"
|
||||
|
||||
|
||||
class TransitionGoal(BaseModel):
|
||||
target_id: str # e.g. "CRA", "TISAX"
|
||||
target_type: TargetType = TargetType.REGULATION
|
||||
label: str = ""
|
||||
|
||||
|
||||
class TransitionContext(BaseModel):
|
||||
company_id: str = ""
|
||||
known_certifications: List[str] = Field(default_factory=list)
|
||||
known_regulations: List[str] = Field(default_factory=list)
|
||||
target: TransitionGoal
|
||||
|
||||
|
||||
# ── INJECTED (Execution-owned): what the target requires ───────────────────
|
||||
class TargetRequirement(BaseModel):
|
||||
"""One required capability for the target. In v0 injected; later resolved from
|
||||
`Obligation -> Control -> Required Capability` + `Control -> question_intent`."""
|
||||
|
||||
capability_id: str # MCAP-...
|
||||
question_intent: str = "verify_existence" # passed through to the request, not rendered
|
||||
expected_evidence: List[str] = Field(default_factory=list)
|
||||
source_control_id: Optional[str] = None
|
||||
supports_obligations: List[str] = Field(default_factory=list)
|
||||
unsupported: bool = False # domain not yet in the corpus
|
||||
|
||||
|
||||
# ── the OWNED output: an information gap, NOT a question ───────────────────
|
||||
class TransitionQuestionRequest(BaseModel):
|
||||
capability_id: str
|
||||
control_id: Optional[str] = None
|
||||
reason: str
|
||||
question_intent: str # verify_existence / determine_duration / ... (rendered later)
|
||||
expected_evidence: List[str] = Field(default_factory=list)
|
||||
priority: RequestPriority
|
||||
information_gain: InformationGain
|
||||
|
||||
|
||||
class CapabilityCoverage(BaseModel):
|
||||
capability_id: str
|
||||
status: CoverageStatus
|
||||
have_status: Optional[str] = None # the 2A VerificationStatus that drove it
|
||||
|
||||
|
||||
class TransitionSummary(BaseModel):
|
||||
headline: str = "" # counts, NO percentage
|
||||
what_to_clarify: List[str] = Field(default_factory=list) # capability_ids with a request
|
||||
already_covered: List[str] = Field(default_factory=list)
|
||||
probably_covered: List[str] = Field(default_factory=list)
|
||||
missing: List[str] = Field(default_factory=list)
|
||||
not_applicable: List[str] = Field(default_factory=list)
|
||||
unsupported: List[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class TransitionAssessment(BaseModel):
|
||||
target_id: str
|
||||
coverage: List[CapabilityCoverage] = Field(default_factory=list)
|
||||
question_requests: List[TransitionQuestionRequest] = Field(default_factory=list) # ranked
|
||||
summary: TransitionSummary
|
||||
@@ -0,0 +1,133 @@
|
||||
"""Tests for Transition Reasoning v0 (RS-005) — the Transition Planning Engine.
|
||||
|
||||
Acceptance: from a TransitionGoal + the Company Capability Profile (2A, „have") +
|
||||
INJECTED TargetRequirements (Execution-owned „required"), the engine emits ranked
|
||||
`TransitionQuestionRequest`s (information gaps) — NOT rendered questions. A
|
||||
certification-derived capability is „probably_covered" (Welt 1), never „already_covered".
|
||||
|
||||
The cert->capability mapping below is a MOCK (Execution-owned in reality), only here.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from compliance.company import (
|
||||
CapabilityMappingEntry, Certification, CompanyContext, Declaration,
|
||||
ExistingEvidence, build_company_profile,
|
||||
)
|
||||
from compliance.reasoning.enums import Confidence
|
||||
from compliance.transition_reasoning import (
|
||||
CoverageStatus, InformationGain, RequestPriority, TargetRequirement, TargetType,
|
||||
TransitionContext, TransitionGoal, assess_transition,
|
||||
)
|
||||
|
||||
ISO_MAP = {"ISO27001": CapabilityMappingEntry(
|
||||
capability_ids=["cap_incident_response", "cap_supplier_management"], confidence=Confidence.MEDIUM)}
|
||||
|
||||
|
||||
def _profile():
|
||||
ctx = CompanyContext(
|
||||
company_id="kunde",
|
||||
certifications=[Certification(certification_id="ISO27001")],
|
||||
declarations=[Declaration(capability_id="cap_asset_management")],
|
||||
evidence=[ExistingEvidence(evidence_id="patch.pdf", evidence_type="policy", proves_capability_id="cap_patch_management")],
|
||||
)
|
||||
return build_company_profile(ctx, ISO_MAP)
|
||||
|
||||
|
||||
def _ctx():
|
||||
return TransitionContext(company_id="kunde", known_certifications=["ISO27001"],
|
||||
target=TransitionGoal(target_id="CRA", target_type=TargetType.REGULATION))
|
||||
|
||||
|
||||
# CRA-Required (injiziert; in echt: Obligation->Control->Required Capability, Execution)
|
||||
def _reqs():
|
||||
return [
|
||||
TargetRequirement(capability_id="cap_patch_management", expected_evidence=["policy"]), # confirmed
|
||||
TargetRequirement(capability_id="cap_incident_response"), # inferred (ISO)
|
||||
TargetRequirement(capability_id="cap_asset_management"), # declared
|
||||
TargetRequirement(capability_id="cap_sbom", question_intent="verify_existence", expected_evidence=["sbom"]), # missing
|
||||
TargetRequirement(capability_id="cap_vuln_handling", supports_obligations=["CRA.1", "CRA.2"]), # missing, 2 obligations
|
||||
TargetRequirement(capability_id="cap_wastewater", unsupported=True), # not in corpus
|
||||
]
|
||||
|
||||
|
||||
def _req_ids(a):
|
||||
return [r.capability_id for r in a.question_requests]
|
||||
|
||||
|
||||
def _cov(a, cap):
|
||||
return [c for c in a.coverage if c.capability_id == cap][0]
|
||||
|
||||
|
||||
# The engine emits REQUESTS (information gaps), not rendered questions.
|
||||
def test_emits_requests_not_questions():
|
||||
a = assess_transition(_ctx(), _reqs(), _profile())
|
||||
r = a.question_requests[0]
|
||||
assert r.capability_id and r.question_intent and r.priority
|
||||
# NO rendered question text anywhere — rendering is RS-005.1, not this engine
|
||||
assert not hasattr(r, "question")
|
||||
assert "question" not in type(r).model_fields and "rendered_text" not in type(r).model_fields
|
||||
|
||||
|
||||
# Confirmed capability -> already_covered, NO request.
|
||||
def test_confirmed_already_covered_no_request():
|
||||
a = assess_transition(_ctx(), _reqs(), _profile())
|
||||
assert _cov(a, "cap_patch_management").status == CoverageStatus.ALREADY_COVERED
|
||||
assert "cap_patch_management" not in _req_ids(a)
|
||||
|
||||
|
||||
# A certification-inferred capability is PROBABLY_COVERED (Welt 1), not already_covered;
|
||||
# it produces a confirmation request, never a verdict.
|
||||
def test_certification_inferred_is_probable_with_confirm_request():
|
||||
a = assess_transition(_ctx(), _reqs(), _profile())
|
||||
c = _cov(a, "cap_incident_response")
|
||||
assert c.status == CoverageStatus.PROBABLY_COVERED
|
||||
assert c.status != CoverageStatus.ALREADY_COVERED # cert alone never „erfüllt"
|
||||
req = [r for r in a.question_requests if r.capability_id == "cap_incident_response"][0]
|
||||
assert req.priority == RequestPriority.MEDIUM
|
||||
|
||||
|
||||
# A missing required capability -> high-priority request.
|
||||
def test_missing_high_priority_request():
|
||||
a = assess_transition(_ctx(), _reqs(), _profile())
|
||||
assert _cov(a, "cap_sbom").status == CoverageStatus.MISSING
|
||||
req = [r for r in a.question_requests if r.capability_id == "cap_sbom"][0]
|
||||
assert req.priority == RequestPriority.HIGH and req.information_gain == InformationGain.HIGH
|
||||
|
||||
|
||||
# An unsupported domain -> no request (future corpus, honest).
|
||||
def test_unsupported_no_request():
|
||||
a = assess_transition(_ctx(), _reqs(), _profile())
|
||||
assert _cov(a, "cap_wastewater").status == CoverageStatus.UNSUPPORTED
|
||||
assert "cap_wastewater" not in _req_ids(a)
|
||||
|
||||
|
||||
# Requests are ranked: HIGH (missing) before MEDIUM (probable/declared).
|
||||
def test_requests_ranked_high_before_medium():
|
||||
a = assess_transition(_ctx(), _reqs(), _profile())
|
||||
prios = [r.priority for r in a.question_requests]
|
||||
assert prios == sorted(prios, key=lambda p: {RequestPriority.HIGH: 0, RequestPriority.MEDIUM: 1, RequestPriority.LOW: 2}[p])
|
||||
# the two missing caps come first
|
||||
assert set(_req_ids(a)[:2]) == {"cap_sbom", "cap_vuln_handling"}
|
||||
|
||||
|
||||
# The funnel: certs reduce the open questions (only 4 of 6 requirements need clarifying).
|
||||
def test_funnel_reduces_open_questions():
|
||||
a = assess_transition(_ctx(), _reqs(), _profile())
|
||||
# already_covered (patch) + unsupported (wastewater) drop out -> 4 requests
|
||||
assert len(a.question_requests) == 4
|
||||
assert "%" not in a.summary.headline
|
||||
|
||||
|
||||
# Deterministic + activates 2A: same inputs -> same result.
|
||||
def test_deterministic():
|
||||
p = _profile()
|
||||
a1 = assess_transition(_ctx(), _reqs(), p)
|
||||
a2 = assess_transition(_ctx(), _reqs(), p)
|
||||
assert _req_ids(a1) == _req_ids(a2) and a1.summary.headline == a2.summary.headline
|
||||
|
||||
|
||||
# No requirements / no profile -> empty assessment (no Execution data in product code).
|
||||
def test_empty():
|
||||
a = assess_transition(_ctx())
|
||||
assert a.question_requests == [] and a.coverage == []
|
||||
Reference in New Issue
Block a user