Files
breakpilot-compliance/backend-compliance/tests/test_control_generator.py
Benjamin Admin de19ef0684
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 45s
CI/CD / test-python-document-crawler (push) Has been cancelled
CI/CD / test-python-dsms-gateway (push) Has been cancelled
CI/CD / validate-canonical-controls (push) Has been cancelled
CI/CD / deploy-hetzner (push) Has been cancelled
CI/CD / test-python-backend-compliance (push) Has been cancelled
feat(control-generator): 7-stage pipeline for RAG→LLM→Controls generation
Implements the Control Generator Pipeline that systematically generates
canonical security controls from 150k+ RAG chunks across all compliance
collections (BSI, NIST, OWASP, ENISA, EU laws, German laws).

Three license rules enforced throughout:
- Rule 1 (free_use): Laws/Public Domain — original text preserved
- Rule 2 (citation_required): CC-BY/CC-BY-SA — text with citation
- Rule 3 (restricted): BSI/ISO — full reformulation, no source traces

New files:
- Migration 046: job tracking, chunk tracking, blocked sources tables
- control_generator.py: 7-stage pipeline (scan→classify→structure/reform→harmonize→anchor→store→mark)
- anchor_finder.py: RAG + DuckDuckGo open-source reference search
- control_generator_routes.py: REST API (generate, review, stats, blocked-sources)
- test_control_generator.py: license mapping, rule enforcement, anchor filtering tests

Modified:
- __init__.py: register control_generator_router
- route.ts: proxy generator/review/stats endpoints
- page.tsx: Generator modal, stats panel, state filter, review queue, license badges

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 09:03:37 +01:00

343 lines
13 KiB
Python

"""Tests for Control Generator Pipeline."""
import json
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from compliance.services.control_generator import (
_classify_regulation,
_detect_domain,
_parse_llm_json,
GeneratorConfig,
GeneratedControl,
ControlGeneratorPipeline,
REGULATION_LICENSE_MAP,
)
from compliance.services.anchor_finder import AnchorFinder, OpenAnchor
from compliance.services.rag_client import RAGSearchResult
# =============================================================================
# License Mapping Tests
# =============================================================================
class TestLicenseMapping:
"""Tests for regulation_code → license rule classification."""
def test_rule1_eu_law(self):
info = _classify_regulation("eu_2016_679")
assert info["rule"] == 1
assert info["name"] == "DSGVO"
def test_rule1_nist(self):
info = _classify_regulation("nist_sp_800_53")
assert info["rule"] == 1
assert "NIST" in info["name"]
def test_rule1_german_law(self):
info = _classify_regulation("bdsg")
assert info["rule"] == 1
assert info["name"] == "BDSG"
def test_rule2_owasp(self):
info = _classify_regulation("owasp_asvs")
assert info["rule"] == 2
assert "OWASP" in info["name"]
assert "attribution" in info
def test_rule2_enisa_prefix(self):
info = _classify_regulation("enisa_iot_security")
assert info["rule"] == 2
assert "ENISA" in info["name"]
def test_rule3_bsi_prefix(self):
info = _classify_regulation("bsi_tr03161")
assert info["rule"] == 3
assert info["name"] == "INTERNAL_ONLY"
def test_rule3_iso_prefix(self):
info = _classify_regulation("iso_27001")
assert info["rule"] == 3
def test_rule3_etsi_prefix(self):
info = _classify_regulation("etsi_en_303_645")
assert info["rule"] == 3
def test_unknown_defaults_to_rule3(self):
info = _classify_regulation("some_unknown_source")
assert info["rule"] == 3
def test_case_insensitive(self):
info = _classify_regulation("EU_2016_679")
assert info["rule"] == 1
def test_all_mapped_regulations_have_valid_rules(self):
for code, info in REGULATION_LICENSE_MAP.items():
assert info["rule"] in (1, 2, 3), f"{code} has invalid rule {info['rule']}"
def test_rule3_never_exposes_names(self):
for prefix in ["bsi_test", "iso_test", "etsi_test"]:
info = _classify_regulation(prefix)
assert info["name"] == "INTERNAL_ONLY", f"{prefix} exposes name: {info['name']}"
# =============================================================================
# Domain Detection Tests
# =============================================================================
class TestDomainDetection:
def test_auth_domain(self):
assert _detect_domain("Multi-factor authentication and password policy") == "AUTH"
def test_crypto_domain(self):
assert _detect_domain("TLS 1.3 encryption and certificate management") == "CRYPT"
def test_network_domain(self):
assert _detect_domain("Firewall rules and network segmentation") == "NET"
def test_data_domain(self):
assert _detect_domain("DSGVO personenbezogene Daten Datenschutz") == "DATA"
def test_default_domain(self):
assert _detect_domain("random unrelated text xyz") == "SEC"
# =============================================================================
# JSON Parsing Tests
# =============================================================================
class TestJsonParsing:
def test_parse_plain_json(self):
result = _parse_llm_json('{"title": "Test", "objective": "Test obj"}')
assert result["title"] == "Test"
def test_parse_markdown_fenced_json(self):
raw = '```json\n{"title": "Test"}\n```'
result = _parse_llm_json(raw)
assert result["title"] == "Test"
def test_parse_json_with_preamble(self):
raw = 'Here is the result:\n{"title": "Test"}'
result = _parse_llm_json(raw)
assert result["title"] == "Test"
def test_parse_invalid_json(self):
result = _parse_llm_json("not json at all")
assert result == {}
# =============================================================================
# GeneratedControl Rule Tests
# =============================================================================
class TestGeneratedControlRules:
"""Tests that enforce the 3-rule licensing constraints."""
def test_rule1_has_original_text(self):
ctrl = GeneratedControl(license_rule=1)
ctrl.source_original_text = "Original EU law text"
ctrl.source_citation = {"source": "DSGVO Art. 35", "license": "EU_LAW"}
ctrl.customer_visible = True
assert ctrl.source_original_text is not None
assert ctrl.source_citation is not None
assert ctrl.customer_visible is True
def test_rule2_has_citation(self):
ctrl = GeneratedControl(license_rule=2)
ctrl.source_citation = {"source": "OWASP ASVS V2.1", "license": "CC-BY-SA-4.0"}
ctrl.customer_visible = True
assert ctrl.source_citation is not None
assert "CC-BY-SA" in ctrl.source_citation["license"]
def test_rule3_no_original_no_citation(self):
ctrl = GeneratedControl(license_rule=3)
ctrl.source_original_text = None
ctrl.source_citation = None
ctrl.customer_visible = False
ctrl.generation_metadata = {"processing_path": "llm_reform", "license_rule": 3}
assert ctrl.source_original_text is None
assert ctrl.source_citation is None
assert ctrl.customer_visible is False
# generation_metadata must NOT contain source names
metadata_str = json.dumps(ctrl.generation_metadata)
assert "bsi" not in metadata_str.lower()
assert "iso" not in metadata_str.lower()
assert "TR-03161" not in metadata_str
# =============================================================================
# Anchor Finder Tests
# =============================================================================
class TestAnchorFinder:
@pytest.mark.asyncio
async def test_rag_anchor_search_filters_restricted(self):
"""Only Rule 1+2 sources are returned as anchors."""
mock_rag = AsyncMock()
mock_rag.search.return_value = [
RAGSearchResult(
text="OWASP requirement",
regulation_code="owasp_asvs",
regulation_name="OWASP ASVS",
regulation_short="OWASP",
category="requirement",
article="V2.1.1",
paragraph="",
source_url="https://owasp.org",
score=0.9,
),
RAGSearchResult(
text="BSI requirement",
regulation_code="bsi_tr03161",
regulation_name="BSI TR-03161",
regulation_short="BSI",
category="requirement",
article="O.Auth_1",
paragraph="",
source_url="",
score=0.85,
),
]
finder = AnchorFinder(rag_client=mock_rag)
control = GeneratedControl(title="Test Auth Control", tags=["auth"])
anchors = await finder.find_anchors(control, skip_web=True)
# Only OWASP should be returned (Rule 2), BSI should be filtered out (Rule 3)
assert len(anchors) == 1
assert anchors[0].framework == "OWASP ASVS"
@pytest.mark.asyncio
async def test_web_search_identifies_frameworks(self):
finder = AnchorFinder()
assert finder._identify_framework_from_url("https://owasp.org/asvs") == "OWASP"
assert finder._identify_framework_from_url("https://csrc.nist.gov/sp800-53") == "NIST"
assert finder._identify_framework_from_url("https://www.enisa.europa.eu/pub") == "ENISA"
assert finder._identify_framework_from_url("https://random-site.com") is None
# =============================================================================
# Pipeline Integration Tests (Mocked)
# =============================================================================
class TestPipelineMocked:
"""Tests for the pipeline with mocked DB and external services."""
def _make_chunk(self, regulation_code: str = "owasp_asvs", article: str = "V2.1.1"):
return RAGSearchResult(
text="Applications must implement multi-factor authentication.",
regulation_code=regulation_code,
regulation_name="OWASP ASVS",
regulation_short="OWASP",
category="requirement",
article=article,
paragraph="",
source_url="https://owasp.org",
score=0.9,
)
@pytest.mark.asyncio
async def test_rule1_processing_path(self):
"""Rule 1 chunks produce controls with original text."""
chunk = self._make_chunk(regulation_code="eu_2016_679", article="Art. 35")
chunk.text = "Die Datenschutz-Folgenabschaetzung ist durchzufuehren."
chunk.regulation_name = "DSGVO"
mock_db = MagicMock()
mock_db.execute.return_value.fetchone.return_value = None
pipeline = ControlGeneratorPipeline(db=mock_db)
license_info = pipeline._classify_license(chunk)
assert license_info["rule"] == 1
@pytest.mark.asyncio
async def test_rule3_processing_blocks_source_info(self):
"""Rule 3 must never store original text or source names."""
mock_db = MagicMock()
mock_rag = AsyncMock()
pipeline = ControlGeneratorPipeline(db=mock_db, rag_client=mock_rag)
# Simulate LLM response
llm_response = json.dumps({
"title": "Secure Password Storage",
"objective": "Passwords must be hashed with modern algorithms.",
"rationale": "Prevents credential theft.",
"requirements": ["Use bcrypt or argon2"],
"test_procedure": ["Verify hash algorithm"],
"evidence": ["Config review"],
"severity": "high",
"tags": ["auth", "password"],
})
with patch("compliance.services.control_generator._llm_chat", return_value=llm_response):
chunk = self._make_chunk(regulation_code="bsi_tr03161", article="O.Auth_1")
config = GeneratorConfig(max_controls=1)
control = await pipeline._llm_reformulate(chunk, config)
assert control.license_rule == 3
assert control.source_original_text is None
assert control.source_citation is None
assert control.customer_visible is False
# Verify no BSI references in metadata
metadata_str = json.dumps(control.generation_metadata)
assert "bsi" not in metadata_str.lower()
assert "BSI" not in metadata_str
assert "TR-03161" not in metadata_str
@pytest.mark.asyncio
async def test_chunk_hash_deduplication(self):
"""Same chunk text produces same hash — no double processing."""
import hashlib
text = "Test requirement text"
h1 = hashlib.sha256(text.encode()).hexdigest()
h2 = hashlib.sha256(text.encode()).hexdigest()
assert h1 == h2
def test_config_defaults(self):
config = GeneratorConfig()
assert config.max_controls == 50
assert config.batch_size == 5
assert config.skip_processed is True
assert config.dry_run is False
@pytest.mark.asyncio
async def test_structure_free_use_produces_citation(self):
"""Rule 1 structuring includes source citation."""
mock_db = MagicMock()
pipeline = ControlGeneratorPipeline(db=mock_db)
llm_response = json.dumps({
"title": "DSFA Pflicht",
"objective": "DSFA bei hohem Risiko durchfuehren.",
"rationale": "Gesetzliche Pflicht nach DSGVO.",
"requirements": ["DSFA durchfuehren"],
"test_procedure": ["DSFA Bericht pruefen"],
"evidence": ["DSFA Dokumentation"],
"severity": "high",
"tags": ["dsfa", "dsgvo"],
})
chunk = self._make_chunk(regulation_code="eu_2016_679", article="Art. 35")
chunk.text = "Art. 35 DSGVO: Datenschutz-Folgenabschaetzung"
chunk.regulation_name = "DSGVO"
license_info = _classify_regulation("eu_2016_679")
with patch("compliance.services.control_generator._llm_chat", return_value=llm_response):
control = await pipeline._structure_free_use(chunk, license_info)
assert control.license_rule == 1
assert control.source_original_text is not None
assert control.source_citation is not None
assert "DSGVO" in control.source_citation["source"]
assert control.customer_visible is True