"""Tests for Control Composer — Phase 6 of Multi-Layer Control Architecture.

Validates:
- ComposedControl dataclass and serialization
- Pattern-guided composition (Tier 1)
- Template-only fallback (when LLM fails)
- Fallback composition (no pattern)
- License rule handling (Rules 1, 2, 3)
- Prompt building
- Field validation and fixing
- Batch composition
- Edge cases: empty inputs, missing data, malformed LLM responses
"""

import json
from unittest.mock import AsyncMock, patch

import pytest

from compliance.services.control_composer import (
    ComposedControl,
    ControlComposer,
    _anchors_from_pattern,
    _build_compose_prompt,
    _build_fallback_prompt,
    _compose_system_prompt,
    _ensure_list,
    _obligation_section,
    _pattern_section,
    _severity_to_risk,
    _validate_control,
)
from compliance.services.obligation_extractor import ObligationMatch
from compliance.services.pattern_matcher import ControlPattern, PatternMatchResult


# =============================================================================
# Fixtures
# =============================================================================


def _make_obligation(
    obligation_id="DSGVO-OBL-001",
    title="Verarbeitungsverzeichnis fuehren",
    text="Fuehrung eines Verzeichnisses aller Verarbeitungstaetigkeiten.",
    method="exact_match",
    confidence=1.0,
    regulation_id="dsgvo",
) -> ObligationMatch:
    """Build an ``ObligationMatch`` for tests; every field can be overridden."""
    match_fields = {
        "obligation_id": obligation_id,
        "obligation_title": title,
        "obligation_text": text,
        "method": method,
        "confidence": confidence,
        "regulation_id": regulation_id,
    }
    return ObligationMatch(**match_fields)


def _make_pattern(
    pattern_id="CP-COMP-001",
    name="compliance_governance",
    name_de="Compliance-Governance",
    domain="COMP",
    category="compliance",
) -> ControlPattern:
    """Build a fully populated ``ControlPattern`` with two open anchor refs."""
    requirement_lines = [
        "Compliance-Verantwortlichkeiten definieren",
        "Regelmaessige Compliance-Bewertungen durchfuehren",
        "Dokumentationspflichten einhalten",
    ]
    test_steps = [
        "Pruefung der Compliance-Organisation",
        "Stichproben der Dokumentation",
    ]
    evidence_items = [
        "Compliance-Handbuch",
        "Pruefberichte",
    ]
    anchor_refs = [
        {"framework": "ISO 27001", "ref": "A.18"},
        {"framework": "NIST CSF", "ref": "GV.OC"},
    ]
    return ControlPattern(
        id=pattern_id,
        name=name,
        name_de=name_de,
        domain=domain,
        category=category,
        description="Compliance management and governance framework",
        objective_template="Sicherstellen, dass ein wirksames Compliance-Management existiert.",
        rationale_template="Ohne Governance fehlt die Grundlage fuer Compliance.",
        requirements_template=requirement_lines,
        test_procedure_template=test_steps,
        evidence_template=evidence_items,
        severity_default="high",
        implementation_effort_default="l",
        obligation_match_keywords=["compliance", "governance", "konformitaet"],
        tags=["compliance", "governance"],
        composable_with=["CP-COMP-002"],
        open_anchor_refs=anchor_refs,
    )


def _make_pattern_result(pattern=None, confidence=0.85, method="keyword") -> PatternMatchResult:
    """Wrap a pattern (the default one when omitted) in a ``PatternMatchResult``."""
    matched = _make_pattern() if pattern is None else pattern
    return PatternMatchResult(
        pattern=matched,
        pattern_id=matched.id,
        method=method,
        confidence=confidence,
        keyword_hits=4,
        total_keywords=7,
    )


def _llm_success_response() -> str:
    """Return a JSON string shaped like a complete, well-formed LLM answer."""
    payload = {
        "title": "Compliance-Governance fuer Verarbeitungstaetigkeiten",
        "objective": "Sicherstellen, dass alle Verarbeitungstaetigkeiten dokumentiert und ueberwacht werden.",
        "rationale": "Die DSGVO verlangt ein Verarbeitungsverzeichnis als Grundlage der Rechenschaftspflicht.",
        "requirements": [
            "Verarbeitungsverzeichnis gemaess Art. 30 DSGVO fuehren",
            "Regelmaessige Aktualisierung bei Aenderungen",
            "Verantwortlichkeiten fuer die Pflege zuweisen",
        ],
        "test_procedure": [
            "Vollstaendigkeit des Verzeichnisses pruefen",
            "Aktualitaet der Eintraege verifizieren",
        ],
        "evidence": [
            "Verarbeitungsverzeichnis",
            "Aenderungsprotokoll",
        ],
        "severity": "high",
        "implementation_effort": "m",
        "category": "compliance",
        "tags": ["dsgvo", "verarbeitungsverzeichnis", "governance"],
        "target_audience": ["unternehmen", "behoerden"],
        "verification_method": "document",
    }
    return json.dumps(payload)


# =============================================================================
# Tests: ComposedControl
# =============================================================================


class TestComposedControl:
    """Defaults and dict serialization of the ``ComposedControl`` dataclass."""

    def test_defaults(self):
        """A bare instance carries the expected default field values."""
        control = ComposedControl()

        expected_by_equality = {
            "control_id": "",
            "title": "",
            "severity": "medium",
            "risk_score": 5.0,
            "implementation_effort": "m",
            "release_state": "draft",
            "obligation_ids": [],
            "composition_method": "pattern_guided",
        }
        for attr, expected in expected_by_equality.items():
            assert getattr(control, attr) == expected, attr

        # Singleton-valued defaults are compared by identity.
        assert control.license_rule is None
        assert control.pattern_id is None
        assert control.customer_visible is True

    def test_to_dict_keys(self):
        """``to_dict`` exposes exactly the expected set of keys."""
        serialized = ComposedControl().to_dict()
        expected_keys = {
            "control_id",
            "title",
            "objective",
            "rationale",
            "scope",
            "requirements",
            "test_procedure",
            "evidence",
            "severity",
            "risk_score",
            "implementation_effort",
            "open_anchors",
            "release_state",
            "tags",
            "license_rule",
            "source_original_text",
            "source_citation",
            "customer_visible",
            "verification_method",
            "category",
            "target_audience",
            "pattern_id",
            "obligation_ids",
            "generation_metadata",
            "composition_method",
        }
        assert set(serialized.keys()) == expected_keys

    def test_to_dict_values(self):
        """Values passed to the constructor come back unchanged from ``to_dict``."""
        given = {
            "title": "Test Control",
            "pattern_id": "CP-AUTH-001",
            "obligation_ids": ["DSGVO-OBL-001"],
            "severity": "high",
            "license_rule": 1,
        }
        serialized = ComposedControl(**given).to_dict()

        assert serialized["title"] == "Test Control"
        assert serialized["pattern_id"] == "CP-AUTH-001"
        assert serialized["obligation_ids"] == ["DSGVO-OBL-001"]
        assert serialized["severity"] == "high"
        assert serialized["license_rule"] == 1


# =============================================================================
# Tests: _ensure_list
# =============================================================================


class TestEnsureList:
    """Normalization of arbitrary values into a list of strings."""

    def test_list_passthrough(self):
        result = _ensure_list(["a", "b"])
        assert result == ["a", "b"]

    def test_string_to_list(self):
        result = _ensure_list("hello")
        assert result == ["hello"]

    def test_none_to_empty(self):
        result = _ensure_list(None)
        assert result == []

    def test_empty_list(self):
        result = _ensure_list([])
        assert result == []

    def test_filters_empty_values(self):
        result = _ensure_list(["a", "", "b"])
        assert result == ["a", "b"]

    def test_converts_to_strings(self):
        result = _ensure_list([1, 2, 3])
        assert result == ["1", "2", "3"]


# =============================================================================
# Tests: _anchors_from_pattern
# =============================================================================


class TestAnchorsFromPattern:
    """Conversion of a pattern's open anchor refs into control anchors."""

    def test_converts_anchors(self):
        anchors = _anchors_from_pattern(_make_pattern())

        assert len(anchors) == 2
        first = anchors[0]
        assert first["framework"] == "ISO 27001"
        assert first["control_id"] == "A.18"
        assert first["alignment_score"] == 0.8

    def test_empty_anchors(self):
        bare_pattern = _make_pattern()
        bare_pattern.open_anchor_refs = []

        assert _anchors_from_pattern(bare_pattern) == []


# =============================================================================
# Tests: _severity_to_risk
# =============================================================================


class TestSeverityToRisk:
    """Mapping from severity labels to numeric risk scores."""

    def test_critical(self):
        score = _severity_to_risk("critical")
        assert score == 9.0

    def test_high(self):
        score = _severity_to_risk("high")
        assert score == 7.0

    def test_medium(self):
        score = _severity_to_risk("medium")
        assert score == 5.0

    def test_low(self):
        score = _severity_to_risk("low")
        assert score == 3.0

    def test_unknown(self):
        # An unrecognized label yields the same score as "medium".
        score = _severity_to_risk("xyz")
        assert score == 5.0


# =============================================================================
# Tests: _validate_control
# =============================================================================


class TestValidateControl:
    """Field repair performed in place by ``_validate_control``."""

    def test_fixes_invalid_severity(self):
        control = ComposedControl(severity="extreme")
        _validate_control(control)
        assert control.severity == "medium"

    def test_keeps_valid_severity(self):
        control = ComposedControl(severity="critical")
        _validate_control(control)
        assert control.severity == "critical"

    def test_fixes_invalid_effort(self):
        control = ComposedControl(implementation_effort="xxl")
        _validate_control(control)
        assert control.implementation_effort == "m"

    def test_fixes_invalid_verification(self):
        control = ComposedControl(verification_method="magic")
        _validate_control(control)
        assert control.verification_method is None

    def test_keeps_valid_verification(self):
        control = ComposedControl(verification_method="code_review")
        _validate_control(control)
        assert control.verification_method == "code_review"

    def test_fixes_risk_score_out_of_range(self):
        control = ComposedControl(risk_score=15.0, severity="high")
        _validate_control(control)
        # The out-of-range score is replaced by the one for severity "high".
        assert control.risk_score == 7.0

    def test_truncates_long_title(self):
        overlong_title = "A" * 300
        control = ComposedControl(title=overlong_title)
        _validate_control(control)
        assert len(control.title) <= 255

    def test_ensures_minimum_content(self):
        control = ComposedControl(
            title="Test",
            objective="",
            rationale="",
            requirements=[],
            test_procedure=[],
            evidence=[],
        )
        _validate_control(control)

        # An empty objective is filled from the title.
        assert control.objective == "Test"
        assert control.rationale != ""
        assert len(control.requirements) >= 1
        assert len(control.test_procedure) >= 1
        assert len(control.evidence) >= 1


# =============================================================================
# Tests: Prompt builders
# =============================================================================


class TestPromptBuilders:
    """Content checks for the system/user prompt building helpers."""

    def test_compose_system_prompt_rule1(self):
        system_prompt = _compose_system_prompt(1)
        assert "praxisorientiertes" in system_prompt
        assert "KOPIERE KEINE" not in system_prompt

    def test_compose_system_prompt_rule3(self):
        system_prompt = _compose_system_prompt(3)
        assert "KOPIERE KEINE" in system_prompt
        assert "NENNE NICHT die Quelle" in system_prompt

    def test_obligation_section_full(self):
        section = _obligation_section(_make_obligation())
        assert "PFLICHT" in section
        assert "Verarbeitungsverzeichnis" in section
        assert "DSGVO-OBL-001" in section
        assert "dsgvo" in section

    def test_obligation_section_minimal(self):
        section = _obligation_section(ObligationMatch())
        assert "Keine spezifische Pflicht" in section

    def test_pattern_section(self):
        section = _pattern_section(_make_pattern())
        assert "MUSTER" in section
        assert "Compliance-Governance" in section
        assert "CP-COMP-001" in section
        assert "Compliance-Verantwortlichkeiten" in section

    def test_build_compose_prompt_rule1(self):
        obligation = _make_obligation()
        pattern = _make_pattern()
        user_prompt = _build_compose_prompt(obligation, pattern, "Original text here", 1)
        assert "PFLICHT" in user_prompt
        assert "MUSTER" in user_prompt
        assert "KONTEXT (Originaltext)" in user_prompt
        assert "Original text here" in user_prompt

    def test_build_compose_prompt_rule3(self):
        obligation = _make_obligation()
        pattern = _make_pattern()
        user_prompt = _build_compose_prompt(obligation, pattern, "Secret text", 3)
        assert "Intern analysiert" in user_prompt
        # Under rule 3 the chunk text must not appear in the prompt.
        assert "Secret text" not in user_prompt

    def test_build_fallback_prompt(self):
        user_prompt = _build_fallback_prompt(_make_obligation(), "Chunk text", 1)
        assert "PFLICHT" in user_prompt
        assert "KONTEXT (Originaltext)" in user_prompt

    def test_build_fallback_prompt_no_chunk(self):
        user_prompt = _build_fallback_prompt(_make_obligation(), None, 1)
        assert "Kein Originaltext" in user_prompt


# Dotted path of the LLM call that the composition tests replace with a mock.
_LLM_PATCH_TARGET = "compliance.services.control_composer._llm_ollama"


def _mock_llm(response):
    """Return a patcher swapping the composer's LLM call for an ``AsyncMock``.

    The mock resolves to ``response`` when awaited. Use the result as a
    context manager around the ``compose`` / ``compose_batch`` call.
    """
    return patch(_LLM_PATCH_TARGET, new_callable=AsyncMock, return_value=response)


# =============================================================================
# Tests: ControlComposer — Pattern-guided composition
# =============================================================================


class TestComposeWithPattern:
    """Pattern-guided composition when the mocked LLM answers with valid JSON."""

    def setup_method(self):
        self.composer = ControlComposer()
        self.obligation = _make_obligation()
        self.pattern_result = _make_pattern_result()

    @pytest.mark.asyncio
    async def test_compose_success_rule1(self):
        """A valid LLM answer under rule 1 yields a pattern-guided control."""
        with _mock_llm(_llm_success_response()):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                chunk_text="Der Verantwortliche fuehrt ein Verzeichnis...",
                license_rule=1,
            )

        assert control.composition_method == "pattern_guided"
        assert control.title != ""
        assert "Verarbeitungstaetigkeiten" in control.objective
        assert len(control.requirements) >= 2
        assert len(control.test_procedure) >= 1
        assert len(control.evidence) >= 1
        assert control.severity == "high"
        assert control.category == "compliance"

    @pytest.mark.asyncio
    async def test_compose_sets_linkage(self):
        """The control is linked to its pattern ID and obligation ID."""
        with _mock_llm(_llm_success_response()):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                license_rule=1,
            )

        assert control.pattern_id == "CP-COMP-001"
        assert control.obligation_ids == ["DSGVO-OBL-001"]

    @pytest.mark.asyncio
    async def test_compose_sets_metadata(self):
        """Generation metadata records how the control was composed."""
        with _mock_llm(_llm_success_response()):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                license_rule=1,
                regulation_code="eu_2016_679",
            )

        metadata = control.generation_metadata
        assert metadata["composition_method"] == "pattern_guided"
        assert metadata["pattern_id"] == "CP-COMP-001"
        assert metadata["pattern_confidence"] == 0.85
        assert metadata["obligation_id"] == "DSGVO-OBL-001"
        assert metadata["license_rule"] == 1
        assert metadata["regulation_code"] == "eu_2016_679"

    @pytest.mark.asyncio
    async def test_compose_rule1_stores_original(self):
        """Rule 1 keeps the original chunk text on the control."""
        with _mock_llm(_llm_success_response()):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                chunk_text="Original DSGVO text",
                license_rule=1,
            )

        assert control.license_rule == 1
        assert control.source_original_text == "Original DSGVO text"
        assert control.customer_visible is True

    @pytest.mark.asyncio
    async def test_compose_rule2_stores_citation(self):
        """Rule 2 keeps both the original text and the supplied citation."""
        citation = {
            "source": "OWASP ASVS",
            "license": "CC-BY-SA-4.0",
            "license_notice": "OWASP Foundation",
        }
        with _mock_llm(_llm_success_response()):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                chunk_text="OWASP text",
                license_rule=2,
                source_citation=citation,
            )

        assert control.license_rule == 2
        assert control.source_original_text == "OWASP text"
        assert control.source_citation == citation
        assert control.customer_visible is True

    @pytest.mark.asyncio
    async def test_compose_rule3_no_original(self):
        """Rule 3 stores no source material and hides the control from customers."""
        with _mock_llm(_llm_success_response()):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                chunk_text="BSI restricted text",
                license_rule=3,
            )

        assert control.license_rule == 3
        assert control.source_original_text is None
        assert control.source_citation is None
        assert control.customer_visible is False


# =============================================================================
# Tests: ControlComposer — Template-only fallback (LLM fails)
# =============================================================================


class TestTemplateOnlyFallback:
    """Composition from the pattern template when the LLM answer is unusable."""

    def setup_method(self):
        self.composer = ControlComposer()
        self.obligation = _make_obligation()
        self.pattern_result = _make_pattern_result()

    @pytest.mark.asyncio
    async def test_template_fallback_on_empty_llm(self):
        """An empty LLM answer makes the composer use the template directly."""
        with _mock_llm(""):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                license_rule=1,
            )

        assert control.composition_method == "template_only"
        assert "Compliance-Governance" in control.title
        # Severity and requirements are taken over from the pattern.
        assert control.severity == "high"
        assert len(control.requirements) >= 2

    @pytest.mark.asyncio
    async def test_template_fallback_on_invalid_json(self):
        """A non-JSON LLM answer also triggers the template path."""
        with _mock_llm("This is not JSON at all"):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                license_rule=1,
            )

        assert control.composition_method == "template_only"

    @pytest.mark.asyncio
    async def test_template_includes_obligation_title(self):
        """The template-built title contains the obligation title."""
        with _mock_llm(""):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                license_rule=1,
            )

        assert "Verarbeitungsverzeichnis" in control.title

    @pytest.mark.asyncio
    async def test_template_has_open_anchors(self):
        """The template-built control carries the pattern's anchors."""
        with _mock_llm(""):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                license_rule=1,
            )

        assert len(control.open_anchors) == 2
        anchor_frameworks = [anchor["framework"] for anchor in control.open_anchors]
        assert "ISO 27001" in anchor_frameworks


# =============================================================================
# Tests: ControlComposer — Fallback (no pattern)
# =============================================================================


class TestFallbackNoPattern:
    """Composition when the pattern match result carries no pattern."""

    def setup_method(self):
        self.composer = ControlComposer()
        self.obligation = _make_obligation()

    @pytest.mark.asyncio
    async def test_fallback_with_llm(self):
        """With a usable LLM answer the fallback control is flagged for review."""
        llm_answer = json.dumps({
            "title": "Verarbeitungsverzeichnis",
            "objective": "Verzeichnis fuehren",
            "rationale": "DSGVO Art. 30",
            "requirements": ["VVT anlegen"],
            "test_procedure": ["VVT pruefen"],
            "evidence": ["VVT Dokument"],
            "severity": "high",
        })
        # A default-constructed result stands for "no pattern matched".
        no_pattern = PatternMatchResult()
        with _mock_llm(llm_answer):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=no_pattern,
                license_rule=1,
            )

        assert control.composition_method == "fallback"
        assert control.pattern_id is None
        assert control.release_state == "needs_review"

    @pytest.mark.asyncio
    async def test_fallback_llm_fails(self):
        """An empty LLM answer still produces a minimally filled control."""
        with _mock_llm(""):
            control = await self.composer.compose(
                obligation=self.obligation,
                pattern_result=PatternMatchResult(),
                license_rule=1,
            )

        assert control.composition_method == "fallback"
        assert control.title != ""
        # Minimum content must be present even without LLM output.
        assert len(control.requirements) >= 1
        assert len(control.test_procedure) >= 1

    @pytest.mark.asyncio
    async def test_fallback_no_obligation_text(self):
        """A default-constructed obligation does not break the fallback."""
        blank_obligation = ObligationMatch()
        with _mock_llm(""):
            control = await self.composer.compose(
                obligation=blank_obligation,
                pattern_result=PatternMatchResult(),
                license_rule=3,
            )

        assert control.title != ""
        assert control.customer_visible is False


# =============================================================================
# Tests: ControlComposer — Batch composition
# =============================================================================


class TestComposeBatch:
    """Composition of several items through ``compose_batch``."""

    @pytest.mark.asyncio
    async def test_batch_returns_list(self):
        composer = ControlComposer()
        with_pattern = {
            "obligation": _make_obligation(),
            "pattern_result": _make_pattern_result(),
            "license_rule": 1,
        }
        without_pattern = {
            "obligation": _make_obligation(
                obligation_id="NIS2-OBL-001",
                title="Incident Meldepflicht",
                regulation_id="nis2",
            ),
            "pattern_result": PatternMatchResult(),
            "license_rule": 3,
        }
        items = [with_pattern, without_pattern]

        with _mock_llm(_llm_success_response()):
            results = await composer.compose_batch(items)

        assert len(results) == 2
        assert results[0].pattern_id == "CP-COMP-001"
        assert results[1].pattern_id is None

    @pytest.mark.asyncio
    async def test_batch_empty(self):
        composer = ControlComposer()
        results = await composer.compose_batch([])
        assert results == []


# =============================================================================
# Tests: Validation integration
# =============================================================================


class TestValidationIntegration:
    """Checks that invalid or missing LLM fields are repaired by ``compose``."""

    @pytest.mark.asyncio
    async def test_compose_validates_severity(self):
        """An invalid severity in the LLM answer ends up as a valid one."""
        llm_answer = json.dumps({
            "title": "Test",
            "objective": "Test",
            "severity": "EXTREME",
        })
        composer = ControlComposer()
        with _mock_llm(llm_answer):
            control = await composer.compose(
                obligation=_make_obligation(),
                pattern_result=_make_pattern_result(),
                license_rule=1,
            )

        assert control.severity in {"low", "medium", "high", "critical"}

    @pytest.mark.asyncio
    async def test_compose_ensures_minimum_content(self):
        """An empty requirements list in the LLM answer is filled in."""
        llm_answer = json.dumps({
            "title": "Test",
            "objective": "Test objective",
            "requirements": [],
        })
        composer = ControlComposer()
        with _mock_llm(llm_answer):
            control = await composer.compose(
                obligation=_make_obligation(),
                pattern_result=_make_pattern_result(),
                license_rule=1,
            )

        assert len(control.requirements) >= 1


# =============================================================================
# Tests: License rule edge cases
# =============================================================================


class TestLicenseRuleEdgeCases:
    """License rule handling when source text or citation is missing or surplus."""

    @pytest.mark.asyncio
    async def test_rule1_no_chunk_text(self):
        """Rule 1 without chunk text leaves the original text unset."""
        composer = ControlComposer()
        with _mock_llm(_llm_success_response()):
            control = await composer.compose(
                obligation=_make_obligation(),
                pattern_result=_make_pattern_result(),
                chunk_text=None,
                license_rule=1,
            )

        assert control.license_rule == 1
        assert control.source_original_text is None
        assert control.customer_visible is True

    @pytest.mark.asyncio
    async def test_rule2_no_citation(self):
        """Rule 2 without a citation leaves the citation unset."""
        composer = ControlComposer()
        with _mock_llm(_llm_success_response()):
            control = await composer.compose(
                obligation=_make_obligation(),
                pattern_result=_make_pattern_result(),
                chunk_text="Some text",
                license_rule=2,
                source_citation=None,
            )

        assert control.license_rule == 2
        assert control.source_citation is None

    @pytest.mark.asyncio
    async def test_rule3_overrides_chunk_and_citation(self):
        """Rule 3 drops supplied chunk text and citation."""
        composer = ControlComposer()
        with _mock_llm(_llm_success_response()):
            control = await composer.compose(
                obligation=_make_obligation(),
                pattern_result=_make_pattern_result(),
                chunk_text="This should be cleared",
                license_rule=3,
                source_citation={"source": "BSI"},
            )

        assert control.source_original_text is None
        assert control.source_citation is None
        assert control.customer_visible is False


# =============================================================================
# Tests: Obligation without ID
# =============================================================================


class TestObligationWithoutId:
    """Composition for obligations that carry no known obligation ID."""

    @pytest.mark.asyncio
    async def test_llm_extracted_obligation(self):
        """An obligation without ID or title still composes."""
        unnamed_obligation = ObligationMatch(
            obligation_id=None,
            obligation_title=None,
            obligation_text="Pflicht zur Meldung von Sicherheitsvorfaellen",
            method="llm_extracted",
            confidence=0.60,
            regulation_id="nis2",
        )
        composer = ControlComposer()
        with _mock_llm(_llm_success_response()):
            control = await composer.compose(
                obligation=unnamed_obligation,
                pattern_result=_make_pattern_result(),
                license_rule=1,
            )

        # With no obligation ID, the linkage list stays empty.
        assert control.obligation_ids == []
        assert control.pattern_id == "CP-COMP-001"
        assert control.generation_metadata["obligation_method"] == "llm_extracted"