807a7002b2
The advisor was structurally correct but unusable: every question showed a snake_case capability id plus a
single generic fallback reason ("Keine Anhaltspunkte im Unternehmensprofil — klären"). The expert text
already EXISTED in the transition patterns (why_asked / reviewable_claim) — the pipeline just dropped it.
- transition_reasoning: TargetRequirement gains `rationale`; assess_transition uses it as the request
reason when present, else the generic fallback (additive, backward-compatible for all consumers).
- onboarding_service._target carries the pattern's why_asked (delta) and reviewable_claim (likely_covered)
into the requirement rationale -> the question's `why`.
- knowledge/onboarding/capability_labels.yaml: curated DE labels (id -> human), reusable across targets;
labels_for() + response.capability_labels expose them; the frontend renders label || prettified id.
Now ISO27001->TISAX reads "Auftragsverarbeitung (Art. 28 DSGVO) — If a TISAX data label is in scope, you
must show Art. 28 GDPR processing-on-behalf controls; ISO 27001 does not establish these." instead of
"data_protection_processing_on_behalf — klären". why_asked text is still EN (existing knowledge; translation
is curation). 34 onboarding+transition tests pass, mypy --strict clean (13 modules), check-loc 0.
140 lines
5.8 KiB
Python
140 lines
5.8 KiB
Python
"""Transition Reasoning v0 (RS-005) — the Transition Planning Engine.
|
|
|
|
`assess_transition(context, target_requirements, company_profile)`: computes, per
|
|
required capability, the coverage from the company's „have" state (Phase 2A), and
|
|
emits a ranked list of `TransitionQuestionRequest` (information gaps) — NOT questions.
|
|
|
|
Deterministic; nothing is stored. A certification-derived „probably covered" is Welt 1
|
|
(a hint), so it produces a confirmation request, never „erfüllt". Python 3.9 compatible.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Dict, List, Optional
|
|
|
|
from compliance.company import CompanyCapabilityProfile, VerificationStatus
|
|
|
|
from .schemas import (
|
|
CapabilityCoverage,
|
|
CoverageStatus,
|
|
InformationGain,
|
|
RequestPriority,
|
|
TargetRequirement,
|
|
TransitionAssessment,
|
|
TransitionContext,
|
|
TransitionQuestionRequest,
|
|
TransitionSummary,
|
|
)
|
|
|
|
EMPTY_REQUIREMENTS: List[TargetRequirement] = []
|
|
|
|
_STATUS_RANK = { # strongest „have" signal wins when a capability appears twice
|
|
VerificationStatus.CONFIRMED: 3,
|
|
VerificationStatus.INFERRED: 2,
|
|
VerificationStatus.DECLARED: 1,
|
|
VerificationStatus.UNKNOWN: 0,
|
|
}
|
|
|
|
|
|
def _have(profile: CompanyCapabilityProfile) -> Dict[str, VerificationStatus]:
|
|
out: Dict[str, VerificationStatus] = {}
|
|
for oc in profile.confirmed_capabilities:
|
|
out[oc.capability_id] = VerificationStatus.CONFIRMED
|
|
for c in profile.candidate_capabilities:
|
|
cur = out.get(c.capability_id)
|
|
if cur is None or _STATUS_RANK[c.verification_status] > _STATUS_RANK[cur]:
|
|
out[c.capability_id] = c.verification_status
|
|
return out
|
|
|
|
|
|
def _classify(req: TargetRequirement, have: Dict[str, VerificationStatus]) -> CoverageStatus:
|
|
if req.unsupported:
|
|
return CoverageStatus.UNSUPPORTED
|
|
status = have.get(req.capability_id)
|
|
if status == VerificationStatus.CONFIRMED:
|
|
return CoverageStatus.ALREADY_COVERED
|
|
if status == VerificationStatus.INFERRED:
|
|
return CoverageStatus.PROBABLY_COVERED
|
|
if status == VerificationStatus.DECLARED:
|
|
return CoverageStatus.NEEDS_CONFIRMATION
|
|
return CoverageStatus.MISSING
|
|
|
|
|
|
# coverage -> (request?, reason, base priority)
|
|
_REQUESTABLE = {
|
|
CoverageStatus.PROBABLY_COVERED: ("Vermutlich vorhanden (aus Zertifizierung) — mit Nachweis bestätigen.", RequestPriority.MEDIUM),
|
|
CoverageStatus.NEEDS_CONFIRMATION: ("Selbst angegeben — Nachweis steht aus.", RequestPriority.MEDIUM),
|
|
CoverageStatus.MISSING: ("Keine Anhaltspunkte im Unternehmensprofil — klären.", RequestPriority.HIGH),
|
|
}
|
|
|
|
|
|
def _gain(coverage: CoverageStatus, n_obligations: int) -> InformationGain:
|
|
base = InformationGain.HIGH if coverage == CoverageStatus.MISSING else (
|
|
InformationGain.MEDIUM if coverage == CoverageStatus.NEEDS_CONFIRMATION else InformationGain.LOW
|
|
)
|
|
if n_obligations >= 2 and base != InformationGain.HIGH: # more dependent obligations -> bump
|
|
return InformationGain.HIGH if base == InformationGain.MEDIUM else InformationGain.MEDIUM
|
|
return base
|
|
|
|
|
|
_PRIO_ORDER = {RequestPriority.HIGH: 0, RequestPriority.MEDIUM: 1, RequestPriority.LOW: 2}
|
|
_GAIN_ORDER = {InformationGain.HIGH: 0, InformationGain.MEDIUM: 1, InformationGain.LOW: 2}
|
|
|
|
|
|
def assess_transition(
|
|
context: TransitionContext,
|
|
target_requirements: Optional[List[TargetRequirement]] = None,
|
|
company_profile: Optional[CompanyCapabilityProfile] = None,
|
|
) -> TransitionAssessment:
|
|
reqs = EMPTY_REQUIREMENTS if target_requirements is None else target_requirements
|
|
have = _have(company_profile) if company_profile is not None else {}
|
|
|
|
coverage: List[CapabilityCoverage] = []
|
|
requests: List[TransitionQuestionRequest] = []
|
|
buckets: Dict[CoverageStatus, List[str]] = {s: [] for s in CoverageStatus}
|
|
|
|
for req in reqs:
|
|
status = _classify(req, have)
|
|
coverage.append(
|
|
CapabilityCoverage(
|
|
capability_id=req.capability_id,
|
|
status=status,
|
|
have_status=have[req.capability_id].value if req.capability_id in have else None,
|
|
)
|
|
)
|
|
buckets[status].append(req.capability_id)
|
|
if status in _REQUESTABLE:
|
|
default_reason, prio = _REQUESTABLE[status]
|
|
reason = req.rationale or default_reason # curated human text wins over the generic fallback
|
|
requests.append(
|
|
TransitionQuestionRequest(
|
|
capability_id=req.capability_id,
|
|
control_id=req.source_control_id,
|
|
reason=reason,
|
|
question_intent=req.question_intent,
|
|
expected_evidence=req.expected_evidence,
|
|
priority=prio,
|
|
information_gain=_gain(status, len(req.supports_obligations)),
|
|
)
|
|
)
|
|
|
|
requests.sort(key=lambda r: (_PRIO_ORDER[r.priority], _GAIN_ORDER[r.information_gain], r.capability_id))
|
|
|
|
summary = TransitionSummary(
|
|
headline=(
|
|
"%d zu klären, %d bereits abgedeckt, %d vermutlich vorhanden, %d fehlt, %d n/a, %d nicht im Korpus."
|
|
% (len(requests), len(buckets[CoverageStatus.ALREADY_COVERED]),
|
|
len(buckets[CoverageStatus.PROBABLY_COVERED]), len(buckets[CoverageStatus.MISSING]),
|
|
len(buckets[CoverageStatus.NOT_APPLICABLE]), len(buckets[CoverageStatus.UNSUPPORTED]))
|
|
),
|
|
what_to_clarify=[r.capability_id for r in requests],
|
|
already_covered=buckets[CoverageStatus.ALREADY_COVERED],
|
|
probably_covered=buckets[CoverageStatus.PROBABLY_COVERED],
|
|
missing=buckets[CoverageStatus.MISSING],
|
|
not_applicable=buckets[CoverageStatus.NOT_APPLICABLE],
|
|
unsupported=buckets[CoverageStatus.UNSUPPORTED],
|
|
)
|
|
return TransitionAssessment(
|
|
target_id=context.target.target_id, coverage=coverage, question_requests=requests, summary=summary
|
|
)
|