"""Transition Reasoning v0 (RS-005) — the Transition Planning Engine. `assess_transition(context, target_requirements, company_profile)`: computes, per required capability, the coverage from the company's „have" state (Phase 2A), and emits a ranked list of `TransitionQuestionRequest` (information gaps) — NOT questions. Deterministic; nothing is stored. A certification-derived „probably covered" is Welt 1 (a hint), so it produces a confirmation request, never „erfüllt". Python 3.9 compatible. """ from __future__ import annotations from typing import Dict, List, Optional from compliance.company import CompanyCapabilityProfile, VerificationStatus from .schemas import ( CapabilityCoverage, CoverageStatus, InformationGain, RequestPriority, TargetRequirement, TransitionAssessment, TransitionContext, TransitionQuestionRequest, TransitionSummary, ) EMPTY_REQUIREMENTS: List[TargetRequirement] = [] _STATUS_RANK = { # strongest „have" signal wins when a capability appears twice VerificationStatus.CONFIRMED: 3, VerificationStatus.INFERRED: 2, VerificationStatus.DECLARED: 1, VerificationStatus.UNKNOWN: 0, } def _have(profile: CompanyCapabilityProfile) -> Dict[str, VerificationStatus]: out: Dict[str, VerificationStatus] = {} for oc in profile.confirmed_capabilities: out[oc.capability_id] = VerificationStatus.CONFIRMED for c in profile.candidate_capabilities: cur = out.get(c.capability_id) if cur is None or _STATUS_RANK[c.verification_status] > _STATUS_RANK[cur]: out[c.capability_id] = c.verification_status return out def _classify(req: TargetRequirement, have: Dict[str, VerificationStatus]) -> CoverageStatus: if req.unsupported: return CoverageStatus.UNSUPPORTED status = have.get(req.capability_id) if status == VerificationStatus.CONFIRMED: return CoverageStatus.ALREADY_COVERED if status == VerificationStatus.INFERRED: return CoverageStatus.PROBABLY_COVERED if status == VerificationStatus.DECLARED: return CoverageStatus.NEEDS_CONFIRMATION return CoverageStatus.MISSING # coverage -> (request?, reason, base priority) _REQUESTABLE = { CoverageStatus.PROBABLY_COVERED: ("Vermutlich vorhanden (aus Zertifizierung) — mit Nachweis bestätigen.", RequestPriority.MEDIUM), CoverageStatus.NEEDS_CONFIRMATION: ("Selbst angegeben — Nachweis steht aus.", RequestPriority.MEDIUM), CoverageStatus.MISSING: ("Keine Anhaltspunkte im Unternehmensprofil — klären.", RequestPriority.HIGH), } def _gain(coverage: CoverageStatus, n_obligations: int) -> InformationGain: base = InformationGain.HIGH if coverage == CoverageStatus.MISSING else ( InformationGain.MEDIUM if coverage == CoverageStatus.NEEDS_CONFIRMATION else InformationGain.LOW ) if n_obligations >= 2 and base != InformationGain.HIGH: # more dependent obligations -> bump return InformationGain.HIGH if base == InformationGain.MEDIUM else InformationGain.MEDIUM return base _PRIO_ORDER = {RequestPriority.HIGH: 0, RequestPriority.MEDIUM: 1, RequestPriority.LOW: 2} _GAIN_ORDER = {InformationGain.HIGH: 0, InformationGain.MEDIUM: 1, InformationGain.LOW: 2} def assess_transition( context: TransitionContext, target_requirements: Optional[List[TargetRequirement]] = None, company_profile: Optional[CompanyCapabilityProfile] = None, ) -> TransitionAssessment: reqs = EMPTY_REQUIREMENTS if target_requirements is None else target_requirements have = _have(company_profile) if company_profile is not None else {} coverage: List[CapabilityCoverage] = [] requests: List[TransitionQuestionRequest] = [] buckets: Dict[CoverageStatus, List[str]] = {s: [] for s in CoverageStatus} for req in reqs: status = _classify(req, have) coverage.append( CapabilityCoverage( capability_id=req.capability_id, status=status, have_status=have[req.capability_id].value if req.capability_id in have else None, ) ) buckets[status].append(req.capability_id) if status in _REQUESTABLE: reason, prio = _REQUESTABLE[status] requests.append( TransitionQuestionRequest( capability_id=req.capability_id, control_id=req.source_control_id, reason=reason, question_intent=req.question_intent, expected_evidence=req.expected_evidence, priority=prio, information_gain=_gain(status, len(req.supports_obligations)), ) ) requests.sort(key=lambda r: (_PRIO_ORDER[r.priority], _GAIN_ORDER[r.information_gain], r.capability_id)) summary = TransitionSummary( headline=( "%d zu klären, %d bereits abgedeckt, %d vermutlich vorhanden, %d fehlt, %d n/a, %d nicht im Korpus." % (len(requests), len(buckets[CoverageStatus.ALREADY_COVERED]), len(buckets[CoverageStatus.PROBABLY_COVERED]), len(buckets[CoverageStatus.MISSING]), len(buckets[CoverageStatus.NOT_APPLICABLE]), len(buckets[CoverageStatus.UNSUPPORTED])) ), what_to_clarify=[r.capability_id for r in requests], already_covered=buckets[CoverageStatus.ALREADY_COVERED], probably_covered=buckets[CoverageStatus.PROBABLY_COVERED], missing=buckets[CoverageStatus.MISSING], not_applicable=buckets[CoverageStatus.NOT_APPLICABLE], unsupported=buckets[CoverageStatus.UNSUPPORTED], ) return TransitionAssessment( target_id=context.target.target_id, coverage=coverage, question_requests=requests, summary=summary )