77de7e794c
Second reasoning mode, scope per user: the engine owns the INFORMATION GAPS, not the
questions. assess_transition(context, target_requirements, company_profile) emits
ranked TransitionQuestionRequest {capability, control, reason, question_intent,
expected_evidence, priority, information_gain} -- NOT rendered question text. Rendering
(intent+subject->sentence) is a separate swappable layer (RS-005.1), not here.
Consumes the Company Capability Profile (2A) as "have" + injected TargetRequirement
(Execution-owned placeholder) as "required" -- no required-capability data in product
code (EMPTY_REQUIREMENTS, mocks only in tests). A certification-derived capability is
probably_covered (Welt 1) -> a confirmation request, never already_covered/"erfuellt".
Deterministic, computed-not-stored, no percentages.
Activates 2A/2C/RCI (first consumer of the Company profile). Freeze-respecting: additive
package, no new graph/base class/meta-model class. 9 tests, mypy --strict clean, LOC ok.
No endpoint/UI/RAG; question rendering deliberately deferred to RS-005.1.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
139 lines
5.7 KiB
Python
139 lines
5.7 KiB
Python
"""Transition Reasoning v0 (RS-005) — the Transition Planning Engine.

`assess_transition(context, target_requirements, company_profile)`: computes, per
required capability, the coverage from the company's "have" state (Phase 2A), and
emits a ranked list of `TransitionQuestionRequest` (information gaps) — NOT questions.

Deterministic; nothing is stored. A certification-derived "probably covered" is Welt 1
(a hint), so it produces a confirmation request, never "erfüllt". Python 3.9 compatible.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Dict, List, Optional
|
|
|
|
from compliance.company import CompanyCapabilityProfile, VerificationStatus
|
|
|
|
from .schemas import (
|
|
CapabilityCoverage,
|
|
CoverageStatus,
|
|
InformationGain,
|
|
RequestPriority,
|
|
TargetRequirement,
|
|
TransitionAssessment,
|
|
TransitionContext,
|
|
TransitionQuestionRequest,
|
|
TransitionSummary,
|
|
)
|
|
|
|
# Default requirement list: assess_transition falls back to this when it is called
# without target_requirements. It is deliberately empty. The same list object is
# reused on every such call, so it must only be read, never mutated.
EMPTY_REQUIREMENTS: List[TargetRequirement] = []
|
|
|
|
# Strength of each "have" signal. When the same capability is reported more than
# once, the status with the larger rank is the one that is kept.
_STATUS_RANK: Dict[VerificationStatus, int] = {
    VerificationStatus.CONFIRMED: 3,
    VerificationStatus.INFERRED: 2,
    VerificationStatus.DECLARED: 1,
    VerificationStatus.UNKNOWN: 0,
}
|
|
|
|
|
|
def _have(profile: CompanyCapabilityProfile) -> Dict[str, VerificationStatus]:
    """Collapse a company profile into one verification status per capability id.

    Every entry of ``profile.confirmed_capabilities`` is recorded as CONFIRMED.
    The entries of ``profile.candidate_capabilities`` are then merged in: a
    candidate fills a capability that has no status yet, or replaces the recorded
    status when its own status ranks strictly higher in ``_STATUS_RANK``.

    Returns:
        Mapping of capability id to the strongest status seen for it.
    """
    strongest: Dict[str, VerificationStatus] = {
        confirmed.capability_id: VerificationStatus.CONFIRMED
        for confirmed in profile.confirmed_capabilities
    }
    for candidate in profile.candidate_capabilities:
        cap_id = candidate.capability_id
        recorded = strongest.get(cap_id)
        # Keep what we already have unless the candidate is strictly stronger.
        if (
            recorded is not None
            and _STATUS_RANK[candidate.verification_status] <= _STATUS_RANK[recorded]
        ):
            continue
        strongest[cap_id] = candidate.verification_status
    return strongest
|
|
|
|
|
|
def _classify(req: TargetRequirement, have: Dict[str, VerificationStatus]) -> CoverageStatus:
    """Map one requirement onto a coverage status given the company's "have" state.

    A requirement flagged ``unsupported`` is always UNSUPPORTED, regardless of what
    the company has. Otherwise the status recorded for ``req.capability_id`` decides:
    CONFIRMED -> ALREADY_COVERED, INFERRED -> PROBABLY_COVERED,
    DECLARED -> NEEDS_CONFIRMATION. Anything else (including UNKNOWN and a
    capability that is absent from ``have``) is MISSING.

    This function never returns NOT_APPLICABLE.
    """
    if req.unsupported:
        return CoverageStatus.UNSUPPORTED

    known = have.get(req.capability_id)
    # Checked top to bottom; the first matching "have" status decides.
    ladder = (
        (VerificationStatus.CONFIRMED, CoverageStatus.ALREADY_COVERED),
        (VerificationStatus.INFERRED, CoverageStatus.PROBABLY_COVERED),
        (VerificationStatus.DECLARED, CoverageStatus.NEEDS_CONFIRMATION),
    )
    for have_status, resulting_coverage in ladder:
        if known == have_status:
            return resulting_coverage
    return CoverageStatus.MISSING
|
|
|
|
|
|
# Coverage statuses that still need an answer from the company. Being a key of this
# mapping is what triggers a request; the value is (reason text, base priority).
# Statuses that are not listed here produce no request.
_REQUESTABLE = {
    CoverageStatus.PROBABLY_COVERED: (
        "Vermutlich vorhanden (aus Zertifizierung) — mit Nachweis bestätigen.",
        RequestPriority.MEDIUM,
    ),
    CoverageStatus.NEEDS_CONFIRMATION: (
        "Selbst angegeben — Nachweis steht aus.",
        RequestPriority.MEDIUM,
    ),
    CoverageStatus.MISSING: (
        "Keine Anhaltspunkte im Unternehmensprofil — klären.",
        RequestPriority.HIGH,
    ),
}
|
|
|
|
|
|
def _gain(coverage: CoverageStatus, n_obligations: int) -> InformationGain:
    """Rate how much an answer about one capability would tell us.

    The baseline comes from the coverage status: MISSING rates HIGH,
    NEEDS_CONFIRMATION rates MEDIUM, and every other status rates LOW. When at
    least two obligations depend on the capability (``n_obligations >= 2``), a
    baseline below HIGH is raised by one step.
    """
    many_dependents = n_obligations >= 2
    if coverage == CoverageStatus.MISSING:
        # Already the top rating, so there is nothing left to raise.
        return InformationGain.HIGH
    if coverage == CoverageStatus.NEEDS_CONFIRMATION:
        return InformationGain.HIGH if many_dependents else InformationGain.MEDIUM
    return InformationGain.MEDIUM if many_dependents else InformationGain.LOW
|
|
|
|
|
|
# Sort positions used when ranking requests: a smaller number sorts earlier, so HIGH
# comes first in both orderings.
_PRIO_ORDER: Dict[RequestPriority, int] = {
    RequestPriority.HIGH: 0,
    RequestPriority.MEDIUM: 1,
    RequestPriority.LOW: 2,
}
_GAIN_ORDER: Dict[InformationGain, int] = {
    InformationGain.HIGH: 0,
    InformationGain.MEDIUM: 1,
    InformationGain.LOW: 2,
}
|
|
|
|
|
|
def assess_transition(
    context: TransitionContext,
    target_requirements: Optional[List[TargetRequirement]] = None,
    company_profile: Optional[CompanyCapabilityProfile] = None,
) -> TransitionAssessment:
    """Assess how far the company's known capabilities cover the target's requirements.

    Each requirement is classified against the company's "have" state. Every
    classification is recorded; statuses listed in ``_REQUESTABLE`` additionally
    yield a ``TransitionQuestionRequest`` (an information gap, not question text).

    Args:
        context: Only ``context.target.target_id`` is read here.
        target_requirements: Required capabilities. ``None`` falls back to
            ``EMPTY_REQUIREMENTS``.
        company_profile: The company's capability profile. ``None`` is treated as
            "nothing known", so every requirement that is not flagged unsupported
            classifies as missing.

    Returns:
        A ``TransitionAssessment`` holding the per-requirement coverage in input
        order, the requests sorted by priority, then information gain, then
        capability id, and a summary that groups capability ids by status.
    """
    requirements = EMPTY_REQUIREMENTS if target_requirements is None else target_requirements
    have = {} if company_profile is None else _have(company_profile)

    coverage_rows: List[CapabilityCoverage] = []
    open_requests: List[TransitionQuestionRequest] = []
    # One (initially empty) id list per coverage status, so lookups below never miss.
    ids_by_status: Dict[CoverageStatus, List[str]] = {member: [] for member in CoverageStatus}

    for requirement in requirements:
        status = _classify(requirement, have)
        cap_id = requirement.capability_id
        have_status = have[cap_id].value if cap_id in have else None
        coverage_rows.append(
            CapabilityCoverage(capability_id=cap_id, status=status, have_status=have_status)
        )
        ids_by_status[status].append(cap_id)

        request_spec = _REQUESTABLE.get(status)
        if request_spec is None:
            continue  # this status needs no clarification
        reason, base_priority = request_spec
        open_requests.append(
            TransitionQuestionRequest(
                capability_id=cap_id,
                control_id=requirement.source_control_id,
                reason=reason,
                question_intent=requirement.question_intent,
                expected_evidence=requirement.expected_evidence,
                priority=base_priority,
                information_gain=_gain(status, len(requirement.supports_obligations)),
            )
        )

    # The capability id is the final tie-breaker, which keeps the ranking deterministic.
    ranked = sorted(
        open_requests,
        key=lambda r: (_PRIO_ORDER[r.priority], _GAIN_ORDER[r.information_gain], r.capability_id),
    )

    already = ids_by_status[CoverageStatus.ALREADY_COVERED]
    probably = ids_by_status[CoverageStatus.PROBABLY_COVERED]
    missing = ids_by_status[CoverageStatus.MISSING]
    not_applicable = ids_by_status[CoverageStatus.NOT_APPLICABLE]
    unsupported = ids_by_status[CoverageStatus.UNSUPPORTED]

    # NEEDS_CONFIRMATION ids get no summary field of their own; they surface only
    # through what_to_clarify and the request list.
    headline = (
        "%d zu klären, %d bereits abgedeckt, %d vermutlich vorhanden, %d fehlt, %d n/a, %d nicht im Korpus."
        % (
            len(ranked),
            len(already),
            len(probably),
            len(missing),
            len(not_applicable),
            len(unsupported),
        )
    )
    summary = TransitionSummary(
        headline=headline,
        what_to_clarify=[request.capability_id for request in ranked],
        already_covered=already,
        probably_covered=probably,
        missing=missing,
        not_applicable=not_applicable,
        unsupported=unsupported,
    )
    return TransitionAssessment(
        target_id=context.target.target_id,
        coverage=coverage_rows,
        question_requests=ranked,
        summary=summary,
    )
|