feat: control-pipeline Service aus Compliance-Repo migriert
Control-Pipeline (Pass 0a/0b, BatchDedup, Generator) als eigenstaendiger Service in Core, damit Compliance-Repo unabhaengig refakturiert werden kann. Schreibt weiterhin ins compliance-Schema der shared PostgreSQL. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
154
control-pipeline/services/control_status_machine.py
Normal file
154
control-pipeline/services/control_status_machine.py
Normal file
@@ -0,0 +1,154 @@
|
||||
"""
|
||||
Control Status Transition State Machine.
|
||||
|
||||
Enforces that controls cannot be set to "pass" without sufficient evidence.
|
||||
Prevents Compliance-Theater where controls claim compliance without real proof.
|
||||
|
||||
Transition rules:
|
||||
planned → in_progress : always allowed
|
||||
in_progress → pass : requires ≥1 evidence with confidence ≥ E2 and
|
||||
truth_status in (uploaded, observed, validated_internal)
|
||||
in_progress → partial : requires ≥1 evidence (any level)
|
||||
pass → fail : always allowed (degradation)
|
||||
any → n/a : requires status_justification
|
||||
any → planned : always allowed (reset)
|
||||
"""
|
||||
|
||||
from typing import Any, List, Optional, Tuple
|
||||
|
||||
# EvidenceDB is an ORM model from compliance — we only need duck-typed objects
|
||||
# with .confidence_level and .truth_status attributes.
|
||||
EvidenceDB = Any
|
||||
|
||||
|
||||
# Confidence level ordering for comparisons
|
||||
CONFIDENCE_ORDER = {"E0": 0, "E1": 1, "E2": 2, "E3": 3, "E4": 4}
|
||||
|
||||
# Truth statuses that qualify as "real" evidence for pass transitions
|
||||
VALID_TRUTH_STATUSES = {"uploaded", "observed", "validated_internal", "accepted_by_auditor", "provided_to_auditor"}
|
||||
|
||||
|
||||
def validate_transition(
|
||||
current_status: str,
|
||||
new_status: str,
|
||||
evidence_list: Optional[List[EvidenceDB]] = None,
|
||||
status_justification: Optional[str] = None,
|
||||
bypass_for_auto_updater: bool = False,
|
||||
) -> Tuple[bool, List[str]]:
|
||||
"""
|
||||
Validate whether a control status transition is allowed.
|
||||
|
||||
Args:
|
||||
current_status: Current control status value (e.g. "planned", "pass")
|
||||
new_status: Requested new status
|
||||
evidence_list: List of EvidenceDB objects linked to this control
|
||||
status_justification: Text justification (required for n/a transitions)
|
||||
bypass_for_auto_updater: If True, skip evidence checks (used by CI/CD auto-updater
|
||||
which creates evidence atomically with status change)
|
||||
|
||||
Returns:
|
||||
Tuple of (allowed: bool, violations: list[str])
|
||||
"""
|
||||
violations: List[str] = []
|
||||
evidence_list = evidence_list or []
|
||||
|
||||
# Same status → no-op, always allowed
|
||||
if current_status == new_status:
|
||||
return True, []
|
||||
|
||||
# Reset to planned is always allowed
|
||||
if new_status == "planned":
|
||||
return True, []
|
||||
|
||||
# n/a requires justification
|
||||
if new_status == "n/a":
|
||||
if not status_justification or not status_justification.strip():
|
||||
violations.append("Transition to 'n/a' requires a status_justification explaining why this control is not applicable.")
|
||||
return len(violations) == 0, violations
|
||||
|
||||
# Degradation: pass → fail is always allowed
|
||||
if current_status == "pass" and new_status == "fail":
|
||||
return True, []
|
||||
|
||||
# planned → in_progress: always allowed
|
||||
if current_status == "planned" and new_status == "in_progress":
|
||||
return True, []
|
||||
|
||||
# in_progress → partial: needs at least 1 evidence
|
||||
if new_status == "partial":
|
||||
if not bypass_for_auto_updater and len(evidence_list) == 0:
|
||||
violations.append("Transition to 'partial' requires at least 1 evidence record.")
|
||||
return len(violations) == 0, violations
|
||||
|
||||
# in_progress → pass: strict requirements
|
||||
if new_status == "pass":
|
||||
if bypass_for_auto_updater:
|
||||
return True, []
|
||||
|
||||
if len(evidence_list) == 0:
|
||||
violations.append("Transition to 'pass' requires at least 1 evidence record.")
|
||||
return False, violations
|
||||
|
||||
# Check for at least one qualifying evidence
|
||||
has_qualifying = False
|
||||
for e in evidence_list:
|
||||
conf = getattr(e, "confidence_level", None)
|
||||
truth = getattr(e, "truth_status", None)
|
||||
|
||||
# Get string values from enum or string
|
||||
conf_val = conf.value if hasattr(conf, "value") else str(conf) if conf else "E1"
|
||||
truth_val = truth.value if hasattr(truth, "value") else str(truth) if truth else "uploaded"
|
||||
|
||||
if CONFIDENCE_ORDER.get(conf_val, 1) >= CONFIDENCE_ORDER["E2"] and truth_val in VALID_TRUTH_STATUSES:
|
||||
has_qualifying = True
|
||||
break
|
||||
|
||||
if not has_qualifying:
|
||||
violations.append(
|
||||
"Transition to 'pass' requires at least 1 evidence with confidence >= E2 "
|
||||
"and truth_status in (uploaded, observed, validated_internal, accepted_by_auditor). "
|
||||
"Current evidence does not meet this threshold."
|
||||
)
|
||||
|
||||
return len(violations) == 0, violations
|
||||
|
||||
# in_progress → fail: always allowed
|
||||
if new_status == "fail":
|
||||
return True, []
|
||||
|
||||
# Any other transition from planned/fail to pass requires going through in_progress
|
||||
if current_status in ("planned", "fail") and new_status == "pass":
|
||||
if bypass_for_auto_updater:
|
||||
return True, []
|
||||
violations.append(
|
||||
f"Direct transition from '{current_status}' to 'pass' is not allowed. "
|
||||
f"Move to 'in_progress' first, then to 'pass' with qualifying evidence."
|
||||
)
|
||||
return False, violations
|
||||
|
||||
# Default: allow other transitions (e.g. fail → partial, partial → pass)
|
||||
# For partial → pass, apply the same evidence checks
|
||||
if current_status == "partial" and new_status == "pass":
|
||||
if bypass_for_auto_updater:
|
||||
return True, []
|
||||
|
||||
has_qualifying = False
|
||||
for e in evidence_list:
|
||||
conf = getattr(e, "confidence_level", None)
|
||||
truth = getattr(e, "truth_status", None)
|
||||
conf_val = conf.value if hasattr(conf, "value") else str(conf) if conf else "E1"
|
||||
truth_val = truth.value if hasattr(truth, "value") else str(truth) if truth else "uploaded"
|
||||
|
||||
if CONFIDENCE_ORDER.get(conf_val, 1) >= CONFIDENCE_ORDER["E2"] and truth_val in VALID_TRUTH_STATUSES:
|
||||
has_qualifying = True
|
||||
break
|
||||
|
||||
if not has_qualifying:
|
||||
violations.append(
|
||||
"Transition from 'partial' to 'pass' requires at least 1 evidence with confidence >= E2 "
|
||||
"and truth_status in (uploaded, observed, validated_internal, accepted_by_auditor)."
|
||||
)
|
||||
return len(violations) == 0, violations
|
||||
|
||||
# All other transitions allowed
|
||||
return True, []
|
||||
Reference in New Issue
Block a user