diff --git a/backend-compliance/compliance/services/decomposition_pass.py b/backend-compliance/compliance/services/decomposition_pass.py index 975ab3d..3672159 100644 --- a/backend-compliance/compliance/services/decomposition_pass.py +++ b/backend-compliance/compliance/services/decomposition_pass.py @@ -1418,6 +1418,18 @@ _OBJECT_SYNONYMS: dict[str, str] = { } +def _truncate_title(title: str, max_len: int = 80) -> str: + """Truncate title at word boundary to avoid mid-word cuts.""" + if len(title) <= max_len: + return title + truncated = title[:max_len] + # Cut at last space to avoid mid-word truncation + last_space = truncated.rfind(" ") + if last_space > max_len // 2: + return truncated[:last_space] + return truncated + + def _normalize_object(object_raw: str) -> str: """Normalize object text to a snake_case key for merge hints. @@ -1613,11 +1625,11 @@ def _compose_deterministic( # ── Title: "{Object} {Zustand}" ─────────────────────────── state = _ACTION_STATE_SUFFIX.get(action_type, "umgesetzt") if object_: - title = f"{object_.strip()} {state}"[:80] + title = _truncate_title(f"{object_.strip()} {state}") elif action: - title = f"{action.strip().capitalize()} {state}"[:80] + title = _truncate_title(f"{action.strip().capitalize()} {state}") else: - title = f"{parent_title} {state}"[:80] + title = _truncate_title(f"{parent_title} {state}") # ── Objective = obligation text (the normative statement) ─ objective = obligation_text.strip()[:2000] @@ -1678,7 +1690,7 @@ def _compose_deterministic( requirements=requirements, test_procedure=test_procedure, evidence=evidence, - severity=_normalize_severity(parent_severity), + severity=_calibrate_severity(parent_severity, action_type), category=parent_category or "governance", ) # Attach extra metadata (stored in generation_metadata) @@ -2877,10 +2889,31 @@ class DecompositionPass: """Insert an atomic control and create parent link. Returns the UUID of the newly created control, or None on failure. 
# Action-type-based severity calibration: an atomic control does not simply
# inherit its parent's severity. Each known action type has a cap, and the
# calibrated severity is the lower of the parent severity and that cap.
# Definition, review, documentation, reporting and test controls are capped
# at "medium"; implementation-style controls are capped at "high", so a
# "critical" parent yields "high" for them.
_ACTION_SEVERITY_CAP: dict[str, str] = {
    "define": "medium",
    "review": "medium",
    "document": "medium",
    "report": "medium",
    "test": "medium",
    "implement": "high",
    "configure": "high",
    "monitor": "high",
    "enforce": "high",
}

# Rank of each severity level (higher number = more severe), used to compare
# a parent severity against an action-type cap.
_SEVERITY_ORDER: dict[str, int] = {
    "low": 0,
    "medium": 1,
    "high": 2,
    "critical": 3,
}


def _calibrate_severity(parent_severity: str, action_type: str) -> str:
    """Return the severity of an atomic control for the given action type.

    The parent severity is first passed through ``_normalize_severity``. If
    ``action_type`` has an entry in ``_ACTION_SEVERITY_CAP``, the result is
    the lower of the normalized parent severity and that cap; the severity is
    never raised above the parent's. Action types without an entry inherit
    the normalized parent severity unchanged.

    Args:
        parent_severity: Severity of the parent control.
        action_type: Classified action type of the atomic control.

    Returns:
        The calibrated severity string.
    """
    parent = _normalize_severity(parent_severity)
    cap = _ACTION_SEVERITY_CAP.get(action_type)
    if cap is None:
        # Unknown action type: no cap applies.
        return parent
    # min() returns the first argument on a tie, so the parent value wins
    # when both rank equally. Levels missing from the table rank as "medium".
    return min(parent, cap, key=lambda level: _SEVERITY_ORDER.get(level, 1))
class TestTruncateTitle:
    """Word-boundary truncation behavior of _truncate_title."""

    def test_short_title_unchanged(self):
        short = "Rate-Limiting umgesetzt"
        assert _truncate_title(short) == short

    def test_exactly_80_unchanged(self):
        exact = "A" * 80
        assert _truncate_title(exact) == exact

    def test_long_title_cuts_at_word_boundary(self):
        shortened = _truncate_title(
            "Maximale Payload-Groessen fuer API-Anfragen und API-Antworten definiert und technisch durchgesetzt"
        )
        assert len(shortened) <= 80
        assert not shortened.endswith(" ")
        # The final character must close a word, not sit in the middle of one.
        final_char = shortened[-1]
        assert final_char.isalpha() or final_char in ("-", ")")

    def test_no_mid_word_cut(self):
        # A naive [:80] slice would leave the fragment "defi" at the end.
        shortened = _truncate_title("x" * 75 + " definieren")
        assert "defin" not in shortened or "definieren" in shortened

    def test_custom_max_len(self):
        shortened = _truncate_title("Rate-Limiting fuer alle Endpunkte", max_len=20)
        assert len(shortened) <= 20


# ---------------------------------------------------------------------------
# SEVERITY CALIBRATION TESTS
# ---------------------------------------------------------------------------

class TestCalibrateSeverity:
    """Action-type-based severity capping performed by _calibrate_severity."""

    def test_implement_keeps_high(self):
        calibrated = _calibrate_severity("high", "implement")
        assert calibrated == "high"

    def test_define_caps_to_medium(self):
        calibrated = _calibrate_severity("high", "define")
        assert calibrated == "medium"

    def test_review_caps_to_medium(self):
        calibrated = _calibrate_severity("high", "review")
        assert calibrated == "medium"

    def test_test_caps_to_medium(self):
        calibrated = _calibrate_severity("high", "test")
        assert calibrated == "medium"

    def test_document_caps_to_medium(self):
        calibrated = _calibrate_severity("high", "document")
        assert calibrated == "medium"

    def test_monitor_keeps_high(self):
        calibrated = _calibrate_severity("high", "monitor")
        assert calibrated == "high"

    def test_low_parent_stays_low(self):
        # The cap only lowers severity; a low parent is never raised.
        calibrated = _calibrate_severity("low", "implement")
        assert calibrated == "low"

    def test_medium_parent_define_stays_medium(self):
        calibrated = _calibrate_severity("medium", "define")
        assert calibrated == "medium"

    def test_unknown_action_inherits_parent(self):
        calibrated = _calibrate_severity("high", "unknown_action")
        assert calibrated == "high"

    def test_critical_implement_caps_to_high(self):
        # "implement" is capped at high; critical is reserved for parent-level controls.
        calibrated = _calibrate_severity("critical", "implement")
        assert calibrated == "high"

    def test_critical_define_caps_to_medium(self):
        calibrated = _calibrate_severity("critical", "define")
        assert calibrated == "medium"


# ---------------------------------------------------------------------------
# COMPOSE DETERMINISTIC — SEVERITY CALIBRATION INTEGRATION
# ---------------------------------------------------------------------------

class TestComposeDeterministicSeverity:
    """_compose_deterministic must emit the calibrated severity."""

    def test_define_action_gets_medium(self):
        compose_args = {
            "obligation_text": "Payload-Grenzen sind verbindlich festzulegen.",
            "action": "definieren",
            "object_": "Payload-Grenzen",
            "parent_title": "API Ressourcen",
            "parent_severity": "high",
            "parent_category": "security",
            "is_test": False,
            "is_reporting": False,
        }
        atomic = _compose_deterministic(**compose_args)
        assert atomic.severity == "medium"

    def test_implement_action_keeps_high(self):
        compose_args = {
            "obligation_text": "Rate-Limiting muss technisch umgesetzt werden.",
            "action": "implementieren",
            "object_": "Rate-Limiting",
            "parent_title": "API Ressourcen",
            "parent_severity": "high",
            "parent_category": "security",
            "is_test": False,
            "is_reporting": False,
        }
        atomic = _compose_deterministic(**compose_args)
        assert atomic.severity == "high"