Files
breakpilot-compliance/backend-compliance/tests/test_framework_decomposition.py
Benjamin Admin 48ca0a6bef feat: Framework Decomposition Engine + Composite Detection for Pass 0b
Adds a routing layer between Pass 0a and Pass 0b that classifies obligations
into atomic/compound/framework_container. Framework-container obligations
(e.g. "CCM-Praktiken fuer AIS") are decomposed into concrete sub-obligations
via an internal framework registry before Pass 0b composition.

- New: framework_decomposition.py with routing, matching, decomposition
- New: Framework registry (NIST SP 800-53, OWASP ASVS, CSA CCM) as JSON
- New: Composite detection flags on atomic controls (is_composite, atomicity)
- New: gen_meta fields: framework_ref, framework_domain, decomposition_source
- Integration: _route_and_compose() in run_pass0b() deterministic path
- 248 tests (198 decomposition + 50 framework), all passing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-23 12:11:55 +01:00

454 lines
16 KiB
Python

"""Tests for Framework Decomposition Engine.
Covers:
- Registry loading
- Routing classification (atomic / compound / framework_container)
- Framework + domain matching
- Subcontrol selection
- Decomposition into sub-obligations
- Quality rules (warnings, errors)
- Inference helpers
"""
import pytest
from compliance.services.framework_decomposition import (
classify_routing,
decompose_framework_container,
get_registry,
registry_stats,
reload_registry,
DecomposedObligation,
FrameworkDecompositionResult,
RoutingResult,
_detect_framework,
_has_framework_keywords,
_infer_action,
_infer_object,
_is_compound_obligation,
_match_domain,
_select_subcontrols,
)
# ---------------------------------------------------------------------------
# REGISTRY TESTS
# ---------------------------------------------------------------------------
class TestRegistryLoading:
    """Sanity checks on the framework registry returned by the service."""

    def test_registry_loads_successfully(self):
        """The registry contains at least three entries."""
        assert len(get_registry()) >= 3

    def test_nist_in_registry(self):
        """NIST SP 800-53 is registered under its canonical key."""
        assert "NIST_SP800_53" in get_registry()

    def test_owasp_asvs_in_registry(self):
        """OWASP ASVS is registered under its canonical key."""
        assert "OWASP_ASVS" in get_registry()

    def test_csa_ccm_in_registry(self):
        """CSA CCM is registered under its canonical key."""
        assert "CSA_CCM" in get_registry()

    def test_nist_has_domains(self):
        """The NIST entry lists at least five domains."""
        nist = get_registry()["NIST_SP800_53"]
        assert len(nist["domains"]) >= 5

    def test_nist_ac_has_subcontrols(self):
        """The NIST "AC" domain exists and has at least five subcontrols."""
        nist = get_registry()["NIST_SP800_53"]
        # Use a default so that a missing "AC" domain is reported as a clear
        # assertion failure instead of a bare StopIteration from next().
        ac = next(
            (d for d in nist["domains"] if d["domain_id"] == "AC"),
            None,
        )
        assert ac is not None, "NIST_SP800_53 registry has no 'AC' domain"
        assert len(ac["subcontrols"]) >= 5

    def test_registry_stats(self):
        """registry_stats() reports minimum framework/domain/subcontrol counts."""
        stats = registry_stats()
        assert stats["frameworks"] >= 3
        assert stats["total_domains"] >= 10
        assert stats["total_subcontrols"] >= 30

    def test_reload_registry(self):
        """reload_registry() returns a registry with at least three entries."""
        assert len(reload_registry()) >= 3
# ---------------------------------------------------------------------------
# ROUTING TESTS
# ---------------------------------------------------------------------------
class TestClassifyRouting:
    """Routing classification via ``classify_routing``."""

    def test_atomic_simple_obligation(self):
        """A single-action MFA obligation is routed as atomic."""
        text = "Multi-Faktor-Authentifizierung muss implementiert werden"
        routing = classify_routing(
            obligation_text=text,
            action_raw="implementieren",
            object_raw="MFA",
        )
        assert routing.routing_type == "atomic"

    def test_framework_container_ccm_ais(self):
        """A CCM/AIS reference is routed as a framework container."""
        text = "Die CCM-Praktiken fuer Application and Interface Security (AIS) muessen implementiert werden"
        routing = classify_routing(
            obligation_text=text,
            action_raw="implementieren",
            object_raw="CCM-Praktiken fuer AIS",
        )
        assert routing.routing_type == "framework_container"
        assert routing.framework_ref == "CSA_CCM"
        assert routing.framework_domain == "AIS"

    def test_framework_container_nist_800_53(self):
        """A NIST SP 800-53 reference resolves to the NIST framework."""
        target = "Kontrollen gemaess NIST SP 800-53"
        routing = classify_routing(
            obligation_text="Kontrollen gemaess NIST SP 800-53 umsetzen",
            action_raw="umsetzen",
            object_raw=target,
        )
        assert routing.routing_type == "framework_container"
        assert routing.framework_ref == "NIST_SP800_53"

    def test_framework_container_owasp_asvs(self):
        """An OWASP ASVS reference resolves to the OWASP framework."""
        target = "OWASP ASVS Anforderungen"
        routing = classify_routing(
            obligation_text="OWASP ASVS Anforderungen muessen implementiert werden",
            action_raw="implementieren",
            object_raw=target,
        )
        assert routing.routing_type == "framework_container"
        assert routing.framework_ref == "OWASP_ASVS"

    def test_compound_obligation(self):
        """Two distinct actions joined by "und" are routed as compound."""
        routing = classify_routing(
            obligation_text="Richtlinie erstellen und Schulungen durchfuehren",
            action_raw="erstellen und durchfuehren",
            object_raw="Richtlinie",
        )
        assert routing.routing_type == "compound"

    def test_no_split_phrase_not_compound(self):
        """"dokumentieren und pflegen" is expected to remain atomic."""
        routing = classify_routing(
            obligation_text="Richtlinie dokumentieren und pflegen",
            action_raw="dokumentieren und pflegen",
            object_raw="Richtlinie",
        )
        assert routing.routing_type == "atomic"

    def test_framework_keywords_in_object(self):
        """Framework keywords appearing only in the object still trigger routing."""
        target = "Framework-Praktiken und Kontrollen"
        routing = classify_routing(
            obligation_text="Massnahmen umsetzen",
            action_raw="umsetzen",
            object_raw=target,
        )
        assert routing.routing_type == "framework_container"

    def test_bsi_grundschutz_detected(self):
        """A BSI IT-Grundschutz reference is routed as a framework container."""
        target = "BSI IT-Grundschutz Massnahmen"
        routing = classify_routing(
            obligation_text="BSI IT-Grundschutz Massnahmen umsetzen",
            action_raw="umsetzen",
            object_raw=target,
        )
        assert routing.routing_type == "framework_container"
# ---------------------------------------------------------------------------
# FRAMEWORK DETECTION TESTS
# ---------------------------------------------------------------------------
class TestFrameworkDetection:
    """Direct tests of the ``_detect_framework`` helper."""

    def test_detect_csa_ccm_with_domain(self):
        """CCM text mentioning AIS yields framework and domain."""
        text = "CCM-Praktiken fuer AIS implementieren"
        detected = _detect_framework(text, "CCM-Praktiken")
        assert detected.routing_type == "framework_container"
        assert detected.framework_ref == "CSA_CCM"
        assert detected.framework_domain == "AIS"

    def test_detect_nist_without_domain(self):
        """NIST text without a domain hint still resolves the framework."""
        text = "NIST SP 800-53 Kontrollen implementieren"
        detected = _detect_framework(text, "Kontrollen")
        assert detected.routing_type == "framework_container"
        assert detected.framework_ref == "NIST_SP800_53"

    def test_no_framework_in_simple_text(self):
        """Text without any framework reference is reported as atomic."""
        text = "Passwortrichtlinie dokumentieren"
        detected = _detect_framework(text, "Passwortrichtlinie")
        assert detected.routing_type == "atomic"

    def test_csa_ccm_iam_domain(self):
        """The spelled-out IAM domain title maps to the "IAM" domain id."""
        text = "CSA CCM Identity and Access Management Kontrollen"
        detected = _detect_framework(text, "IAM-Kontrollen")
        assert detected.routing_type == "framework_container"
        assert detected.framework_ref == "CSA_CCM"
        assert detected.framework_domain == "IAM"
# ---------------------------------------------------------------------------
# DOMAIN MATCHING TESTS
# ---------------------------------------------------------------------------
class TestDomainMatching:
    """Tests of ``_match_domain`` against registry entries."""

    def test_match_ais_by_id(self):
        """The domain id appearing in the text selects that domain."""
        framework = get_registry()["CSA_CCM"]
        matched_id, _title = _match_domain(
            "AIS-Kontrollen implementieren", framework
        )
        assert matched_id == "AIS"

    def test_match_by_full_title(self):
        """The full English domain title selects the AIS domain."""
        framework = get_registry()["CSA_CCM"]
        text = "Application and Interface Security Massnahmen"
        matched_id, _title = _match_domain(text, framework)
        assert matched_id == "AIS"

    def test_match_nist_incident_response(self):
        """An incident-response phrase with "IR" selects the NIST IR domain."""
        framework = get_registry()["NIST_SP800_53"]
        text = "Vorfallreaktionsverfahren gemaess NIST IR"
        matched_id, _title = _match_domain(text, framework)
        assert matched_id == "IR"

    def test_no_match_generic_text(self):
        """Generic text yields no domain id."""
        framework = get_registry()["NIST_SP800_53"]
        matched_id, _title = _match_domain("etwas Allgemeines", framework)
        assert matched_id is None
# ---------------------------------------------------------------------------
# SUBCONTROL SELECTION TESTS
# ---------------------------------------------------------------------------
class TestSubcontrolSelection:
    """Tests of ``_select_subcontrols`` using hand-built subcontrol lists."""

    def test_keyword_based_selection(self):
        """Only the subcontrol whose keywords occur in the text is selected."""
        api_control = {"subcontrol_id": "SC-1", "title": "X", "keywords": ["api", "schnittstelle"], "object_hint": ""}
        backup_control = {"subcontrol_id": "SC-2", "title": "Y", "keywords": ["backup", "sicherung"], "object_hint": ""}
        candidates = [api_control, backup_control]

        chosen = _select_subcontrols("API-Schnittstellen schuetzen", candidates)

        assert len(chosen) == 1
        assert chosen[0]["subcontrol_id"] == "SC-1"

    def test_no_keyword_match_returns_empty(self):
        """No keyword overlap yields an empty selection."""
        candidates = [
            {"subcontrol_id": "SC-1", "keywords": ["backup"], "title": "Backup", "object_hint": ""},
        ]

        chosen = _select_subcontrols("Passwort aendern", candidates)

        assert chosen == []

    def test_title_match_boosts_score(self):
        """A subcontrol whose title occurs in the text is ranked first."""
        password_control = {"subcontrol_id": "SC-1", "title": "Password Security", "keywords": ["passwort"], "object_hint": ""}
        network_control = {"subcontrol_id": "SC-2", "title": "Network Security", "keywords": ["netzwerk"], "object_hint": ""}
        candidates = [password_control, network_control]

        chosen = _select_subcontrols(
            "Password Security muss implementiert werden", candidates
        )

        assert len(chosen) >= 1
        assert chosen[0]["subcontrol_id"] == "SC-1"
# ---------------------------------------------------------------------------
# DECOMPOSITION TESTS
# ---------------------------------------------------------------------------
class TestDecomposeFrameworkContainer:
    """Tests of ``decompose_framework_container`` end to end."""

    def test_decompose_ccm_ais(self):
        """CSA CCM / AIS decomposes into at least three sub-obligations."""
        result = decompose_framework_container(
            obligation_candidate_id="OBL-001",
            parent_control_id="COMP-001",
            obligation_text="Die CCM-Praktiken fuer AIS muessen implementiert werden",
            framework_ref="CSA_CCM",
            framework_domain="AIS",
        )
        assert result.release_state == "decomposed"
        assert result.framework_ref == "CSA_CCM"
        assert result.framework_domain == "AIS"
        assert len(result.decomposed_obligations) >= 3
        assert len(result.matched_subcontrols) >= 3

    def test_decomposed_obligations_have_ids(self):
        """Each sub-obligation carries derived ids and provenance fields."""
        result = decompose_framework_container(
            obligation_candidate_id="OBL-001",
            parent_control_id="COMP-001",
            obligation_text="CCM-Praktiken fuer AIS",
            framework_ref="CSA_CCM",
            framework_domain="AIS",
        )
        # Guard against a vacuous pass: the per-item assertions below never
        # run if the decomposition comes back empty.
        assert result.decomposed_obligations, "no decomposed obligations returned"
        for d in result.decomposed_obligations:
            assert d.obligation_candidate_id.startswith("OBL-001-AIS-")
            assert d.parent_control_id == "COMP-001"
            assert d.source_ref_law == "Cloud Security Alliance CCM v4"
            assert d.routing_type == "atomic"
            assert d.release_state == "decomposed"

    def test_decomposed_have_action_and_object(self):
        """Each sub-obligation has a non-empty action and object."""
        result = decompose_framework_container(
            obligation_candidate_id="OBL-002",
            parent_control_id="COMP-002",
            obligation_text="CSA CCM AIS Massnahmen implementieren",
            framework_ref="CSA_CCM",
            framework_domain="AIS",
        )
        # Guard against a vacuous pass on an empty decomposition.
        assert result.decomposed_obligations, "no decomposed obligations returned"
        for d in result.decomposed_obligations:
            assert d.action_raw, f"{d.subcontrol_id} missing action_raw"
            assert d.object_raw, f"{d.subcontrol_id} missing object_raw"

    def test_unknown_framework_returns_unmatched(self):
        """An unregistered framework is reported as unmatched with an issue."""
        result = decompose_framework_container(
            obligation_candidate_id="OBL-003",
            parent_control_id="COMP-003",
            obligation_text="XYZ-Framework Controls",
            framework_ref="NONEXISTENT",
            framework_domain="ABC",
        )
        assert result.release_state == "unmatched"
        assert any("framework_not_matched" in i for i in result.issues)
        assert len(result.decomposed_obligations) == 0

    def test_unknown_domain_falls_back_to_full(self):
        """A known framework with no domain still returns a valid state."""
        result = decompose_framework_container(
            obligation_candidate_id="OBL-004",
            parent_control_id="COMP-004",
            obligation_text="CSA CCM Kontrollen implementieren",
            framework_ref="CSA_CCM",
            framework_domain=None,
        )
        # Should still decompose (falls back to keyword match or all domains).
        # NOTE(review): both outcomes are accepted here, so this only checks
        # that the call completes with one of the two states; tighten once the
        # intended fallback behavior is confirmed.
        assert result.release_state in ("decomposed", "unmatched")

    def test_nist_incident_response_decomposition(self):
        """NIST / IR decomposes into sub-obligations with IR-prefixed ids."""
        result = decompose_framework_container(
            obligation_candidate_id="OBL-010",
            parent_control_id="COMP-010",
            obligation_text="NIST SP 800-53 Vorfallreaktionsmassnahmen implementieren",
            framework_ref="NIST_SP800_53",
            framework_domain="IR",
        )
        assert result.release_state == "decomposed"
        assert len(result.decomposed_obligations) >= 3
        sc_ids = [d.subcontrol_id for d in result.decomposed_obligations]
        assert any("IR-" in sc for sc in sc_ids)

    def test_confidence_high_with_full_match(self):
        """Framework plus domain match gives a confidence of at least 0.7."""
        result = decompose_framework_container(
            obligation_candidate_id="OBL-005",
            parent_control_id="COMP-005",
            obligation_text="CSA CCM AIS",
            framework_ref="CSA_CCM",
            framework_domain="AIS",
        )
        assert result.decomposition_confidence >= 0.7

    def test_confidence_low_without_framework(self):
        """No framework and no domain gives a confidence of at most 0.3."""
        result = decompose_framework_container(
            obligation_candidate_id="OBL-006",
            parent_control_id="COMP-006",
            obligation_text="Unbekannte Massnahmen",
            framework_ref=None,
            framework_domain=None,
        )
        assert result.decomposition_confidence <= 0.3
# ---------------------------------------------------------------------------
# COMPOUND DETECTION TESTS
# ---------------------------------------------------------------------------
class TestCompoundDetection:
    """Tests of the ``_is_compound_obligation`` helper."""

    def test_compound_verb(self):
        """Two separate verbs joined by "und" count as compound."""
        action = "erstellen und schulen"
        text = "Richtlinie erstellen und Schulungen durchfuehren"
        is_compound = _is_compound_obligation(action, text)
        assert is_compound

    def test_no_split_phrase(self):
        """"dokumentieren und pflegen" is not treated as compound."""
        action = "dokumentieren und pflegen"
        text = "Richtlinie dokumentieren und pflegen"
        is_compound = _is_compound_obligation(action, text)
        assert not is_compound

    def test_no_split_define_and_maintain(self):
        """"define and maintain" is not treated as compound."""
        action = "define and maintain"
        text = "Define and maintain a security policy"
        is_compound = _is_compound_obligation(action, text)
        assert not is_compound

    def test_single_verb_not_compound(self):
        """A single verb is never compound."""
        is_compound = _is_compound_obligation("implementieren", "MFA implementieren")
        assert not is_compound

    def test_empty_action_not_compound(self):
        """An empty action string is not compound."""
        is_compound = _is_compound_obligation("", "something")
        assert not is_compound
# ---------------------------------------------------------------------------
# FRAMEWORK KEYWORD TESTS
# ---------------------------------------------------------------------------
class TestFrameworkKeywords:
    """Tests of the ``_has_framework_keywords`` helper."""

    def test_two_keywords_detected(self):
        """Text with "Framework" and "Praktiken" is flagged."""
        flagged = _has_framework_keywords("Framework-Praktiken implementieren")
        assert flagged

    def test_single_keyword_not_enough(self):
        """Text with only "Praktiken" is not flagged."""
        flagged = _has_framework_keywords("Praktiken implementieren")
        assert not flagged

    def test_no_keywords(self):
        """Text without framework keywords is not flagged."""
        flagged = _has_framework_keywords("MFA einrichten")
        assert not flagged
# ---------------------------------------------------------------------------
# INFERENCE HELPER TESTS
# ---------------------------------------------------------------------------
class TestInferAction:
    """Tests of the ``_infer_action`` helper."""

    def test_infer_implementieren(self):
        """"implementiert" maps to the infinitive "implementieren"."""
        inferred = _infer_action("Massnahmen muessen implementiert werden")
        assert inferred == "implementieren"

    def test_infer_dokumentieren(self):
        """"dokumentiert" maps to "dokumentieren"."""
        inferred = _infer_action("Richtlinie muss dokumentiert werden")
        assert inferred == "dokumentieren"

    def test_infer_testen(self):
        """"getestet" maps to "testen"."""
        inferred = _infer_action("System wird getestet")
        assert inferred == "testen"

    def test_infer_ueberwachen(self):
        """"ueberwacht" maps to "ueberwachen"."""
        inferred = _infer_action("Logs werden ueberwacht")
        assert inferred == "ueberwachen"

    def test_infer_default(self):
        """Text with no recognised verb yields "implementieren"."""
        inferred = _infer_action("etwas passiert")
        assert inferred == "implementieren"
class TestInferObject:
    """Tests of the ``_infer_object`` helper."""

    def test_infer_from_muessen_pattern(self):
        """The inferred object contains either the verb or the noun."""
        inferred = _infer_object("Zugriffsrechte muessen ueberprueft werden")
        accepted_fragments = ("ueberprueft", "Zugriffsrechte")
        assert any(fragment in inferred for fragment in accepted_fragments)

    def test_infer_fallback(self):
        """A sentence without a modal verb still yields a non-empty object."""
        inferred = _infer_object("Einfacher Satz ohne Modalverb")
        assert len(inferred) > 0