fix: Pass 0b — Duplicate Guard, Severity-Kalibrierung, Title-Truncation
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 55s
CI/CD / test-python-backend-compliance (push) Successful in 36s
CI/CD / test-python-document-crawler (push) Successful in 23s
CI/CD / test-python-dsms-gateway (push) Successful in 20s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Successful in 4s

1. Duplicate Guard: merge_hint-Lookup vor INSERT in _write_atomic_control()
   verhindert semantisch identische Controls unter demselben Parent.
2. Severity-Kalibrierung: action_type-basiert statt blind vom Parent.
   define/review/test → max medium, implement/monitor → max high.
3. Title-Truncation: Schnitt am Wortende statt mitten im Wort.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-27 08:38:33 +01:00
parent f39e5a71af
commit fb2cf29b34
2 changed files with 187 additions and 6 deletions

View File

@@ -1418,6 +1418,18 @@ _OBJECT_SYNONYMS: dict[str, str] = {
} }
def _truncate_title(title: str, max_len: int = 80) -> str:
"""Truncate title at word boundary to avoid mid-word cuts."""
if len(title) <= max_len:
return title
truncated = title[:max_len]
# Cut at last space to avoid mid-word truncation
last_space = truncated.rfind(" ")
if last_space > max_len // 2:
return truncated[:last_space]
return truncated
def _normalize_object(object_raw: str) -> str: def _normalize_object(object_raw: str) -> str:
"""Normalize object text to a snake_case key for merge hints. """Normalize object text to a snake_case key for merge hints.
@@ -1613,11 +1625,11 @@ def _compose_deterministic(
# ── Title: "{Object} {Zustand}" ─────────────────────────── # ── Title: "{Object} {Zustand}" ───────────────────────────
state = _ACTION_STATE_SUFFIX.get(action_type, "umgesetzt") state = _ACTION_STATE_SUFFIX.get(action_type, "umgesetzt")
if object_: if object_:
title = f"{object_.strip()} {state}"[:80] title = _truncate_title(f"{object_.strip()} {state}")
elif action: elif action:
title = f"{action.strip().capitalize()} {state}"[:80] title = _truncate_title(f"{action.strip().capitalize()} {state}")
else: else:
title = f"{parent_title} {state}"[:80] title = _truncate_title(f"{parent_title} {state}")
# ── Objective = obligation text (the normative statement) ─ # ── Objective = obligation text (the normative statement) ─
objective = obligation_text.strip()[:2000] objective = obligation_text.strip()[:2000]
@@ -1678,7 +1690,7 @@ def _compose_deterministic(
requirements=requirements, requirements=requirements,
test_procedure=test_procedure, test_procedure=test_procedure,
evidence=evidence, evidence=evidence,
severity=_normalize_severity(parent_severity), severity=_calibrate_severity(parent_severity, action_type),
category=parent_category or "governance", category=parent_category or "governance",
) )
# Attach extra metadata (stored in generation_metadata) # Attach extra metadata (stored in generation_metadata)
@@ -2877,10 +2889,31 @@ class DecompositionPass:
"""Insert an atomic control and create parent link. """Insert an atomic control and create parent link.
Returns the UUID of the newly created control, or None on failure. Returns the UUID of the newly created control, or None on failure.
Checks merge_hint to prevent duplicate controls under the same parent.
""" """
parent_uuid = obl["parent_uuid"] parent_uuid = obl["parent_uuid"]
candidate_id = obl["candidate_id"] candidate_id = obl["candidate_id"]
# ── Duplicate Guard: skip if same merge_hint already exists ──
merge_hint = getattr(atomic, "source_regulation", "") or ""
if merge_hint:
existing = self.db.execute(
text("""
SELECT id::text FROM canonical_controls
WHERE parent_control_uuid = CAST(:parent AS uuid)
AND generation_metadata->>'merge_group_hint' = :hint
AND release_state != 'rejected'
LIMIT 1
"""),
{"parent": parent_uuid, "hint": merge_hint},
).fetchone()
if existing:
logger.debug(
"Duplicate guard: skipping %s — merge_hint %s already exists as %s",
candidate_id, merge_hint, existing[0],
)
return existing[0]
result = self.db.execute( result = self.db.execute(
text(""" text("""
INSERT INTO canonical_controls ( INSERT INTO canonical_controls (
@@ -3475,4 +3508,39 @@ def _normalize_severity(val: str) -> str:
return "medium" return "medium"
# Action-type-based severity calibration: not every atomic control
# inherits the parent's severity. Definition and review controls are
# typically medium, while implementation controls stay high.
_ACTION_SEVERITY_CAP: dict[str, str] = {
"define": "medium",
"review": "medium",
"document": "medium",
"report": "medium",
"test": "medium",
"implement": "high",
"configure": "high",
"monitor": "high",
"enforce": "high",
}
# Severity ordering for cap comparison
_SEVERITY_ORDER = {"low": 0, "medium": 1, "high": 2, "critical": 3}
def _calibrate_severity(parent_severity: str, action_type: str) -> str:
"""Calibrate severity based on action type.
Implementation/enforcement inherits parent severity.
Definition/review/test/documentation caps at medium.
"""
parent = _normalize_severity(parent_severity)
cap = _ACTION_SEVERITY_CAP.get(action_type)
if not cap:
return parent
# Return the lower of parent severity and action-type cap
if _SEVERITY_ORDER.get(parent, 1) <= _SEVERITY_ORDER.get(cap, 1):
return parent
return cap
# _template_fallback removed — replaced by _compose_deterministic engine # _template_fallback removed — replaced by _compose_deterministic engine

View File

@@ -40,6 +40,8 @@ from compliance.services.decomposition_pass import (
_format_citation, _format_citation,
_compute_extraction_confidence, _compute_extraction_confidence,
_normalize_severity, _normalize_severity,
_calibrate_severity,
_truncate_title,
_compose_deterministic, _compose_deterministic,
_classify_action, _classify_action,
_classify_object, _classify_object,
@@ -704,7 +706,8 @@ class TestComposeDeterministic:
# Object placeholder should use parent_title # Object placeholder should use parent_title
assert "System Security" in ac.test_procedure[0] assert "System Security" in ac.test_procedure[0]
def test_severity_inherited(self): def test_severity_calibrated(self):
# implement caps at high — critical is reserved for parent-level controls
ac = _compose_deterministic( ac = _compose_deterministic(
obligation_text="Kritische Pflicht", obligation_text="Kritische Pflicht",
action="implementieren", action="implementieren",
@@ -715,7 +718,7 @@ class TestComposeDeterministic:
is_test=False, is_test=False,
is_reporting=False, is_reporting=False,
) )
assert ac.severity == "critical" assert ac.severity == "high"
def test_category_inherited(self): def test_category_inherited(self):
ac = _compose_deterministic( ac = _compose_deterministic(
@@ -2431,3 +2434,113 @@ class TestPass0bWithEnrichment:
# Invalid JSON # Invalid JSON
assert _parse_citation("not json") == {} assert _parse_citation("not json") == {}
# ---------------------------------------------------------------------------
# TRUNCATE TITLE TESTS
# ---------------------------------------------------------------------------
class TestTruncateTitle:
"""Tests for _truncate_title — word-boundary truncation."""
def test_short_title_unchanged(self):
assert _truncate_title("Rate-Limiting umgesetzt") == "Rate-Limiting umgesetzt"
def test_exactly_80_unchanged(self):
title = "A" * 80
assert _truncate_title(title) == title
def test_long_title_cuts_at_word_boundary(self):
title = "Maximale Payload-Groessen fuer API-Anfragen und API-Antworten definiert und technisch durchgesetzt"
result = _truncate_title(title)
assert len(result) <= 80
assert not result.endswith(" ")
# Should not cut mid-word
assert result[-1].isalpha() or result[-1] in ("-", ")")
def test_no_mid_word_cut(self):
# "definieren" would be cut to "defin" with naive [:80]
title = "x" * 75 + " definieren"
result = _truncate_title(title)
assert "defin" not in result or "definieren" in result
def test_custom_max_len(self):
result = _truncate_title("Rate-Limiting fuer alle Endpunkte", max_len=20)
assert len(result) <= 20
# ---------------------------------------------------------------------------
# SEVERITY CALIBRATION TESTS
# ---------------------------------------------------------------------------
class TestCalibrateSeverity:
"""Tests for _calibrate_severity — action-type-based severity."""
def test_implement_keeps_high(self):
assert _calibrate_severity("high", "implement") == "high"
def test_define_caps_to_medium(self):
assert _calibrate_severity("high", "define") == "medium"
def test_review_caps_to_medium(self):
assert _calibrate_severity("high", "review") == "medium"
def test_test_caps_to_medium(self):
assert _calibrate_severity("high", "test") == "medium"
def test_document_caps_to_medium(self):
assert _calibrate_severity("high", "document") == "medium"
def test_monitor_keeps_high(self):
assert _calibrate_severity("high", "monitor") == "high"
def test_low_parent_stays_low(self):
# Even for implement, if parent is low, stays low
assert _calibrate_severity("low", "implement") == "low"
def test_medium_parent_define_stays_medium(self):
assert _calibrate_severity("medium", "define") == "medium"
def test_unknown_action_inherits_parent(self):
assert _calibrate_severity("high", "unknown_action") == "high"
def test_critical_implement_caps_to_high(self):
# implement caps at high — critical is reserved for parent-level controls
assert _calibrate_severity("critical", "implement") == "high"
def test_critical_define_caps_to_medium(self):
assert _calibrate_severity("critical", "define") == "medium"
# ---------------------------------------------------------------------------
# COMPOSE DETERMINISTIC — SEVERITY CALIBRATION INTEGRATION
# ---------------------------------------------------------------------------
class TestComposeDeterministicSeverity:
"""Verify _compose_deterministic uses calibrated severity."""
def test_define_action_gets_medium(self):
atomic = _compose_deterministic(
obligation_text="Payload-Grenzen sind verbindlich festzulegen.",
action="definieren",
object_="Payload-Grenzen",
parent_title="API Ressourcen",
parent_severity="high",
parent_category="security",
is_test=False,
is_reporting=False,
)
assert atomic.severity == "medium"
def test_implement_action_keeps_high(self):
atomic = _compose_deterministic(
obligation_text="Rate-Limiting muss technisch umgesetzt werden.",
action="implementieren",
object_="Rate-Limiting",
parent_title="API Ressourcen",
parent_severity="high",
parent_category="security",
is_test=False,
is_reporting=False,
)
assert atomic.severity == "high"