"""Tests for Control Generator Pipeline.""" import json import pytest from unittest.mock import AsyncMock, MagicMock, patch from compliance.services.control_generator import ( _classify_regulation, _detect_domain, _parse_llm_json, GeneratorConfig, GeneratedControl, ControlGeneratorPipeline, REGULATION_LICENSE_MAP, ) from compliance.services.anchor_finder import AnchorFinder, OpenAnchor from compliance.services.rag_client import RAGSearchResult # ============================================================================= # License Mapping Tests # ============================================================================= class TestLicenseMapping: """Tests for regulation_code → license rule classification.""" def test_rule1_eu_law(self): info = _classify_regulation("eu_2016_679") assert info["rule"] == 1 assert info["name"] == "DSGVO" def test_rule1_nist(self): info = _classify_regulation("nist_sp_800_53") assert info["rule"] == 1 assert "NIST" in info["name"] def test_rule1_german_law(self): info = _classify_regulation("bdsg") assert info["rule"] == 1 assert info["name"] == "BDSG" def test_rule2_owasp(self): info = _classify_regulation("owasp_asvs") assert info["rule"] == 2 assert "OWASP" in info["name"] assert "attribution" in info def test_rule2_enisa_prefix(self): info = _classify_regulation("enisa_iot_security") assert info["rule"] == 2 assert "ENISA" in info["name"] def test_rule3_bsi_prefix(self): info = _classify_regulation("bsi_tr03161") assert info["rule"] == 3 assert info["name"] == "INTERNAL_ONLY" def test_rule3_iso_prefix(self): info = _classify_regulation("iso_27001") assert info["rule"] == 3 def test_rule3_etsi_prefix(self): info = _classify_regulation("etsi_en_303_645") assert info["rule"] == 3 def test_unknown_defaults_to_rule3(self): info = _classify_regulation("some_unknown_source") assert info["rule"] == 3 def test_case_insensitive(self): info = _classify_regulation("EU_2016_679") assert info["rule"] == 1 def test_all_mapped_regulations_have_valid_rules(self): for code, info in REGULATION_LICENSE_MAP.items(): assert info["rule"] in (1, 2, 3), f"{code} has invalid rule {info['rule']}" def test_rule3_never_exposes_names(self): for prefix in ["bsi_test", "iso_test", "etsi_test"]: info = _classify_regulation(prefix) assert info["name"] == "INTERNAL_ONLY", f"{prefix} exposes name: {info['name']}" # ============================================================================= # Domain Detection Tests # ============================================================================= class TestDomainDetection: def test_auth_domain(self): assert _detect_domain("Multi-factor authentication and password policy") == "AUTH" def test_crypto_domain(self): assert _detect_domain("TLS 1.3 encryption and certificate management") == "CRYPT" def test_network_domain(self): assert _detect_domain("Firewall rules and network segmentation") == "NET" def test_data_domain(self): assert _detect_domain("DSGVO personenbezogene Daten Datenschutz") == "DATA" def test_default_domain(self): assert _detect_domain("random unrelated text xyz") == "SEC" # ============================================================================= # JSON Parsing Tests # ============================================================================= class TestJsonParsing: def test_parse_plain_json(self): result = _parse_llm_json('{"title": "Test", "objective": "Test obj"}') assert result["title"] == "Test" def test_parse_markdown_fenced_json(self): raw = '```json\n{"title": "Test"}\n```' result = _parse_llm_json(raw) assert result["title"] == "Test" def test_parse_json_with_preamble(self): raw = 'Here is the result:\n{"title": "Test"}' result = _parse_llm_json(raw) assert result["title"] == "Test" def test_parse_invalid_json(self): result = _parse_llm_json("not json at all") assert result == {} # ============================================================================= # GeneratedControl Rule Tests # ============================================================================= class TestGeneratedControlRules: """Tests that enforce the 3-rule licensing constraints.""" def test_rule1_has_original_text(self): ctrl = GeneratedControl(license_rule=1) ctrl.source_original_text = "Original EU law text" ctrl.source_citation = {"source": "DSGVO Art. 35", "license": "EU_LAW"} ctrl.customer_visible = True assert ctrl.source_original_text is not None assert ctrl.source_citation is not None assert ctrl.customer_visible is True def test_rule2_has_citation(self): ctrl = GeneratedControl(license_rule=2) ctrl.source_citation = {"source": "OWASP ASVS V2.1", "license": "CC-BY-SA-4.0"} ctrl.customer_visible = True assert ctrl.source_citation is not None assert "CC-BY-SA" in ctrl.source_citation["license"] def test_rule3_no_original_no_citation(self): ctrl = GeneratedControl(license_rule=3) ctrl.source_original_text = None ctrl.source_citation = None ctrl.customer_visible = False ctrl.generation_metadata = {"processing_path": "llm_reform", "license_rule": 3} assert ctrl.source_original_text is None assert ctrl.source_citation is None assert ctrl.customer_visible is False # generation_metadata must NOT contain source names metadata_str = json.dumps(ctrl.generation_metadata) assert "bsi" not in metadata_str.lower() assert "iso" not in metadata_str.lower() assert "TR-03161" not in metadata_str # ============================================================================= # Anchor Finder Tests # ============================================================================= class TestAnchorFinder: @pytest.mark.asyncio async def test_rag_anchor_search_filters_restricted(self): """Only Rule 1+2 sources are returned as anchors.""" mock_rag = AsyncMock() mock_rag.search.return_value = [ RAGSearchResult( text="OWASP requirement", regulation_code="owasp_asvs", regulation_name="OWASP ASVS", regulation_short="OWASP", category="requirement", article="V2.1.1", paragraph="", source_url="https://owasp.org", score=0.9, ), RAGSearchResult( text="BSI requirement", regulation_code="bsi_tr03161", regulation_name="BSI TR-03161", regulation_short="BSI", category="requirement", article="O.Auth_1", paragraph="", source_url="", score=0.85, ), ] finder = AnchorFinder(rag_client=mock_rag) control = GeneratedControl(title="Test Auth Control", tags=["auth"]) anchors = await finder.find_anchors(control, skip_web=True) # Only OWASP should be returned (Rule 2), BSI should be filtered out (Rule 3) assert len(anchors) == 1 assert anchors[0].framework == "OWASP ASVS" @pytest.mark.asyncio async def test_web_search_identifies_frameworks(self): finder = AnchorFinder() assert finder._identify_framework_from_url("https://owasp.org/asvs") == "OWASP" assert finder._identify_framework_from_url("https://csrc.nist.gov/sp800-53") == "NIST" assert finder._identify_framework_from_url("https://www.enisa.europa.eu/pub") == "ENISA" assert finder._identify_framework_from_url("https://random-site.com") is None # ============================================================================= # Pipeline Integration Tests (Mocked) # ============================================================================= class TestPipelineMocked: """Tests for the pipeline with mocked DB and external services.""" def _make_chunk(self, regulation_code: str = "owasp_asvs", article: str = "V2.1.1"): return RAGSearchResult( text="Applications must implement multi-factor authentication.", regulation_code=regulation_code, regulation_name="OWASP ASVS", regulation_short="OWASP", category="requirement", article=article, paragraph="", source_url="https://owasp.org", score=0.9, ) @pytest.mark.asyncio async def test_rule1_processing_path(self): """Rule 1 chunks produce controls with original text.""" chunk = self._make_chunk(regulation_code="eu_2016_679", article="Art. 35") chunk.text = "Die Datenschutz-Folgenabschaetzung ist durchzufuehren." chunk.regulation_name = "DSGVO" mock_db = MagicMock() mock_db.execute.return_value.fetchone.return_value = None pipeline = ControlGeneratorPipeline(db=mock_db) license_info = pipeline._classify_license(chunk) assert license_info["rule"] == 1 @pytest.mark.asyncio async def test_rule3_processing_blocks_source_info(self): """Rule 3 must never store original text or source names.""" mock_db = MagicMock() mock_rag = AsyncMock() pipeline = ControlGeneratorPipeline(db=mock_db, rag_client=mock_rag) # Simulate LLM response llm_response = json.dumps({ "title": "Secure Password Storage", "objective": "Passwords must be hashed with modern algorithms.", "rationale": "Prevents credential theft.", "requirements": ["Use bcrypt or argon2"], "test_procedure": ["Verify hash algorithm"], "evidence": ["Config review"], "severity": "high", "tags": ["auth", "password"], }) with patch("compliance.services.control_generator._llm_chat", return_value=llm_response): chunk = self._make_chunk(regulation_code="bsi_tr03161", article="O.Auth_1") config = GeneratorConfig(max_controls=1) control = await pipeline._llm_reformulate(chunk, config) assert control.license_rule == 3 assert control.source_original_text is None assert control.source_citation is None assert control.customer_visible is False # Verify no BSI references in metadata metadata_str = json.dumps(control.generation_metadata) assert "bsi" not in metadata_str.lower() assert "BSI" not in metadata_str assert "TR-03161" not in metadata_str @pytest.mark.asyncio async def test_chunk_hash_deduplication(self): """Same chunk text produces same hash — no double processing.""" import hashlib text = "Test requirement text" h1 = hashlib.sha256(text.encode()).hexdigest() h2 = hashlib.sha256(text.encode()).hexdigest() assert h1 == h2 def test_config_defaults(self): config = GeneratorConfig() assert config.max_controls == 50 assert config.batch_size == 5 assert config.skip_processed is True assert config.dry_run is False @pytest.mark.asyncio async def test_structure_free_use_produces_citation(self): """Rule 1 structuring includes source citation.""" mock_db = MagicMock() pipeline = ControlGeneratorPipeline(db=mock_db) llm_response = json.dumps({ "title": "DSFA Pflicht", "objective": "DSFA bei hohem Risiko durchfuehren.", "rationale": "Gesetzliche Pflicht nach DSGVO.", "requirements": ["DSFA durchfuehren"], "test_procedure": ["DSFA Bericht pruefen"], "evidence": ["DSFA Dokumentation"], "severity": "high", "tags": ["dsfa", "dsgvo"], }) chunk = self._make_chunk(regulation_code="eu_2016_679", article="Art. 35") chunk.text = "Art. 35 DSGVO: Datenschutz-Folgenabschaetzung" chunk.regulation_name = "DSGVO" license_info = _classify_regulation("eu_2016_679") with patch("compliance.services.control_generator._llm_chat", return_value=llm_response): control = await pipeline._structure_free_use(chunk, license_info) assert control.license_rule == 1 assert control.source_original_text is not None assert control.source_citation is not None assert "DSGVO" in control.source_citation["source"] assert control.customer_visible is True