Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 45s
CI/CD / test-python-document-crawler (push) Has been cancelled
CI/CD / test-python-dsms-gateway (push) Has been cancelled
CI/CD / validate-canonical-controls (push) Has been cancelled
CI/CD / deploy-hetzner (push) Has been cancelled
CI/CD / test-python-backend-compliance (push) Has been cancelled
Implements the Control Generator Pipeline that systematically generates canonical security controls from 150k+ RAG chunks across all compliance collections (BSI, NIST, OWASP, ENISA, EU laws, German laws). Three license rules enforced throughout: - Rule 1 (free_use): Laws/Public Domain — original text preserved - Rule 2 (citation_required): CC-BY/CC-BY-SA — text with citation - Rule 3 (restricted): BSI/ISO — full reformulation, no source traces New files: - Migration 046: job tracking, chunk tracking, blocked sources tables - control_generator.py: 7-stage pipeline (scan→classify→structure/reform→harmonize→anchor→store→mark) - anchor_finder.py: RAG + DuckDuckGo open-source reference search - control_generator_routes.py: REST API (generate, review, stats, blocked-sources) - test_control_generator.py: license mapping, rule enforcement, anchor filtering tests Modified: - __init__.py: register control_generator_router - route.ts: proxy generator/review/stats endpoints - page.tsx: Generator modal, stats panel, state filter, review queue, license badges Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
343 lines
13 KiB
Python
343 lines
13 KiB
Python
"""Tests for Control Generator Pipeline."""
|
|
|
|
import json
|
|
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from compliance.services.control_generator import (
|
|
_classify_regulation,
|
|
_detect_domain,
|
|
_parse_llm_json,
|
|
GeneratorConfig,
|
|
GeneratedControl,
|
|
ControlGeneratorPipeline,
|
|
REGULATION_LICENSE_MAP,
|
|
)
|
|
from compliance.services.anchor_finder import AnchorFinder, OpenAnchor
|
|
from compliance.services.rag_client import RAGSearchResult
|
|
|
|
|
|
# =============================================================================
|
|
# License Mapping Tests
|
|
# =============================================================================
|
|
|
|
class TestLicenseMapping:
    """Checks which license rule `_classify_regulation` assigns to a regulation code."""

    def test_rule1_eu_law(self):
        """The EU regulation code is rule 1 and is named DSGVO."""
        classified = _classify_regulation("eu_2016_679")
        assert classified["rule"] == 1
        assert classified["name"] == "DSGVO"

    def test_rule1_nist(self):
        """A NIST code is rule 1 and its name mentions NIST."""
        classified = _classify_regulation("nist_sp_800_53")
        assert classified["rule"] == 1
        assert "NIST" in classified["name"]

    def test_rule1_german_law(self):
        """The BDSG code is rule 1 and is named BDSG."""
        classified = _classify_regulation("bdsg")
        assert classified["rule"] == 1
        assert classified["name"] == "BDSG"

    def test_rule2_owasp(self):
        """An OWASP code is rule 2 and carries an attribution entry."""
        classified = _classify_regulation("owasp_asvs")
        assert classified["rule"] == 2
        assert "OWASP" in classified["name"]
        assert "attribution" in classified

    def test_rule2_enisa_prefix(self):
        """A code starting with ``enisa_`` is rule 2 and its name mentions ENISA."""
        classified = _classify_regulation("enisa_iot_security")
        assert classified["rule"] == 2
        assert "ENISA" in classified["name"]

    def test_rule3_bsi_prefix(self):
        """A code starting with ``bsi_`` is rule 3 with the placeholder name."""
        classified = _classify_regulation("bsi_tr03161")
        assert classified["rule"] == 3
        assert classified["name"] == "INTERNAL_ONLY"

    def test_rule3_iso_prefix(self):
        """A code starting with ``iso_`` is rule 3."""
        assert _classify_regulation("iso_27001")["rule"] == 3

    def test_rule3_etsi_prefix(self):
        """A code starting with ``etsi_`` is rule 3."""
        assert _classify_regulation("etsi_en_303_645")["rule"] == 3

    def test_unknown_defaults_to_rule3(self):
        """A code this test expects to be unmapped is classified as rule 3."""
        assert _classify_regulation("some_unknown_source")["rule"] == 3

    def test_case_insensitive(self):
        """An upper-case spelling of a rule-1 code is still rule 1."""
        assert _classify_regulation("EU_2016_679")["rule"] == 1

    def test_all_mapped_regulations_have_valid_rules(self):
        """Every entry of REGULATION_LICENSE_MAP uses rule 1, 2 or 3."""
        # `code` and `info` are referenced by the failure message below.
        for code, info in REGULATION_LICENSE_MAP.items():
            assert info["rule"] in (1, 2, 3), f"{code} has invalid rule {info['rule']}"

    def test_rule3_never_exposes_names(self):
        """Sample bsi_/iso_/etsi_ codes all report the placeholder name."""
        # `prefix` and `info` are referenced by the failure message below.
        for prefix in ("bsi_test", "iso_test", "etsi_test"):
            info = _classify_regulation(prefix)
            assert info["name"] == "INTERNAL_ONLY", f"{prefix} exposes name: {info['name']}"
|
|
|
|
|
|
# =============================================================================
|
|
# Domain Detection Tests
|
|
# =============================================================================
|
|
|
|
class TestDomainDetection:
    """Checks the domain code `_detect_domain` returns for sample texts."""

    def test_auth_domain(self):
        """Authentication and password wording yields AUTH."""
        detected = _detect_domain("Multi-factor authentication and password policy")
        assert detected == "AUTH"

    def test_crypto_domain(self):
        """TLS, encryption and certificate wording yields CRYPT."""
        detected = _detect_domain("TLS 1.3 encryption and certificate management")
        assert detected == "CRYPT"

    def test_network_domain(self):
        """Firewall and network segmentation wording yields NET."""
        detected = _detect_domain("Firewall rules and network segmentation")
        assert detected == "NET"

    def test_data_domain(self):
        """German data-protection wording yields DATA."""
        detected = _detect_domain("DSGVO personenbezogene Daten Datenschutz")
        assert detected == "DATA"

    def test_default_domain(self):
        """Text the test treats as unrelated yields SEC."""
        detected = _detect_domain("random unrelated text xyz")
        assert detected == "SEC"
|
|
|
|
|
|
# =============================================================================
|
|
# JSON Parsing Tests
|
|
# =============================================================================
|
|
|
|
class TestJsonParsing:
    """Checks how `_parse_llm_json` handles several shapes of LLM output."""

    def test_parse_plain_json(self):
        """A bare JSON object string is parsed into a dict."""
        parsed = _parse_llm_json('{"title": "Test", "objective": "Test obj"}')
        assert parsed["title"] == "Test"

    def test_parse_markdown_fenced_json(self):
        """A JSON object inside a ```json fence is parsed."""
        fenced = '```json\n{"title": "Test"}\n```'
        assert _parse_llm_json(fenced)["title"] == "Test"

    def test_parse_json_with_preamble(self):
        """A JSON object preceded by a line of free text is parsed."""
        with_preamble = 'Here is the result:\n{"title": "Test"}'
        assert _parse_llm_json(with_preamble)["title"] == "Test"

    def test_parse_invalid_json(self):
        """Input without any JSON yields an empty dict."""
        assert _parse_llm_json("not json at all") == {}
|
|
|
|
|
|
# =============================================================================
|
|
# GeneratedControl Rule Tests
|
|
# =============================================================================
|
|
|
|
class TestGeneratedControlRules:
    """Spells out the field combinations expected for each of the three license rules.

    NOTE(review): every test here assigns the attributes itself and then reads
    them back, so these tests document the intended shape of a control rather
    than exercise pipeline behaviour.
    """

    def test_rule1_has_original_text(self):
        """Rule 1: original text and citation are set, control is customer-visible."""
        control = GeneratedControl(license_rule=1)
        control.source_original_text = "Original EU law text"
        control.source_citation = {"source": "DSGVO Art. 35", "license": "EU_LAW"}
        control.customer_visible = True

        assert control.source_original_text is not None
        assert control.source_citation is not None
        assert control.customer_visible is True

    def test_rule2_has_citation(self):
        """Rule 2: a citation with a CC-BY-SA license string is set."""
        control = GeneratedControl(license_rule=2)
        control.source_citation = {"source": "OWASP ASVS V2.1", "license": "CC-BY-SA-4.0"}
        control.customer_visible = True

        assert control.source_citation is not None
        assert "CC-BY-SA" in control.source_citation["license"]

    def test_rule3_no_original_no_citation(self):
        """Rule 3: no original text, no citation, hidden, and no source names in metadata."""
        control = GeneratedControl(license_rule=3)
        control.source_original_text = None
        control.source_citation = None
        control.customer_visible = False
        control.generation_metadata = {"processing_path": "llm_reform", "license_rule": 3}

        assert control.source_original_text is None
        assert control.source_citation is None
        assert control.customer_visible is False

        # The serialized metadata must be free of restricted source names.
        serialized = json.dumps(control.generation_metadata)
        lowered = serialized.lower()
        assert "bsi" not in lowered
        assert "iso" not in lowered
        assert "TR-03161" not in serialized
|
|
|
|
|
|
# =============================================================================
|
|
# Anchor Finder Tests
|
|
# =============================================================================
|
|
|
|
class TestAnchorFinder:
    """Checks AnchorFinder's filtering of RAG hits and its URL-to-framework lookup."""

    @pytest.mark.asyncio
    async def test_rag_anchor_search_filters_restricted(self):
        """Only Rule 1+2 sources are returned as anchors."""
        owasp_hit = RAGSearchResult(
            text="OWASP requirement",
            regulation_code="owasp_asvs",
            regulation_name="OWASP ASVS",
            regulation_short="OWASP",
            category="requirement",
            article="V2.1.1",
            paragraph="",
            source_url="https://owasp.org",
            score=0.9,
        )
        bsi_hit = RAGSearchResult(
            text="BSI requirement",
            regulation_code="bsi_tr03161",
            regulation_name="BSI TR-03161",
            regulation_short="BSI",
            category="requirement",
            article="O.Auth_1",
            paragraph="",
            source_url="",
            score=0.85,
        )
        rag_stub = AsyncMock()
        rag_stub.search.return_value = [owasp_hit, bsi_hit]

        anchor_finder = AnchorFinder(rag_client=rag_stub)
        auth_control = GeneratedControl(title="Test Auth Control", tags=["auth"])

        found = await anchor_finder.find_anchors(auth_control, skip_web=True)

        # The OWASP hit (Rule 2) survives; the BSI hit (Rule 3) is dropped.
        assert len(found) == 1
        assert found[0].framework == "OWASP ASVS"

    @pytest.mark.asyncio
    async def test_web_search_identifies_frameworks(self):
        """Known framework hosts map to their framework label, other hosts to None."""
        # NOTE(review): nothing is awaited here; the asyncio marker is kept
        # because AnchorFinder's constructor is not visible from this file.
        anchor_finder = AnchorFinder()
        identify = anchor_finder._identify_framework_from_url

        recognised = (
            ("https://owasp.org/asvs", "OWASP"),
            ("https://csrc.nist.gov/sp800-53", "NIST"),
            ("https://www.enisa.europa.eu/pub", "ENISA"),
        )
        for url, framework in recognised:
            assert identify(url) == framework
        assert identify("https://random-site.com") is None
|
|
|
|
|
|
# =============================================================================
|
|
# Pipeline Integration Tests (Mocked)
|
|
# =============================================================================
|
|
|
|
class TestPipelineMocked:
    """Tests for the pipeline with mocked DB and external services."""

    def _make_chunk(
        self,
        regulation_code: str = "owasp_asvs",
        article: str = "V2.1.1",
        *,
        text: str = "Applications must implement multi-factor authentication.",
        regulation_name: str = "OWASP ASVS",
        regulation_short: str = "OWASP",
        source_url: str = "https://owasp.org",
    ) -> "RAGSearchResult":
        """Build a RAGSearchResult fixture.

        The defaults describe an OWASP ASVS chunk. The keyword-only arguments
        let a test label the chunk consistently with its regulation code, so a
        BSI or DSGVO chunk does not carry OWASP names.
        """
        return RAGSearchResult(
            text=text,
            regulation_code=regulation_code,
            regulation_name=regulation_name,
            regulation_short=regulation_short,
            category="requirement",
            article=article,
            paragraph="",
            source_url=source_url,
            score=0.9,
        )

    @pytest.mark.asyncio
    async def test_rule1_processing_path(self):
        """An EU-law chunk is classified as license rule 1.

        Only classification is checked here; no control is generated.
        """
        chunk = self._make_chunk(
            regulation_code="eu_2016_679",
            article="Art. 35",
            text="Die Datenschutz-Folgenabschaetzung ist durchzufuehren.",
            regulation_name="DSGVO",
        )

        mock_db = MagicMock()
        mock_db.execute.return_value.fetchone.return_value = None

        pipeline = ControlGeneratorPipeline(db=mock_db)
        license_info = pipeline._classify_license(chunk)

        assert license_info["rule"] == 1

    @pytest.mark.asyncio
    async def test_rule3_processing_blocks_source_info(self):
        """Rule 3 must never store original text or source names."""
        pipeline = ControlGeneratorPipeline(db=MagicMock(), rag_client=AsyncMock())

        # Simulated LLM response; it contains no source reference itself.
        llm_response = json.dumps({
            "title": "Secure Password Storage",
            "objective": "Passwords must be hashed with modern algorithms.",
            "rationale": "Prevents credential theft.",
            "requirements": ["Use bcrypt or argon2"],
            "test_procedure": ["Verify hash algorithm"],
            "evidence": ["Config review"],
            "severity": "high",
            "tags": ["auth", "password"],
        })

        # Label the chunk as BSI in every source field. With the OWASP
        # defaults, a leaked regulation name could never trip the checks below.
        chunk = self._make_chunk(
            regulation_code="bsi_tr03161",
            article="O.Auth_1",
            regulation_name="BSI TR-03161",
            regulation_short="BSI",
            source_url="",
        )
        config = GeneratorConfig(max_controls=1)

        with patch("compliance.services.control_generator._llm_chat", return_value=llm_response):
            control = await pipeline._llm_reformulate(chunk, config)

        assert control.license_rule == 3
        assert control.source_original_text is None
        assert control.source_citation is None
        assert control.customer_visible is False

        # Compare case-insensitively so "BSI", "bsi" and "Bsi" are all caught.
        metadata_lower = json.dumps(control.generation_metadata).lower()
        assert "bsi" not in metadata_lower
        assert "tr-03161" not in metadata_lower
        assert "tr03161" not in metadata_lower

    def test_chunk_hash_deduplication(self):
        """Equal chunk text hashes equally; different text hashes differently.

        NOTE(review): this exercises hashlib directly, not the pipeline's own
        hashing; confirm the pipeline uses SHA-256 of the chunk text.
        """
        import hashlib

        # Two separately built but equal strings, not the same expression twice.
        first = "Test requirement text"
        second = " ".join(["Test", "requirement", "text"])
        other = "Another requirement text"

        def digest(value: str) -> str:
            return hashlib.sha256(value.encode()).hexdigest()

        assert digest(first) == digest(second)
        assert digest(first) != digest(other)

    def test_config_defaults(self):
        """GeneratorConfig() exposes the expected default values."""
        config = GeneratorConfig()
        assert config.max_controls == 50
        assert config.batch_size == 5
        assert config.skip_processed is True
        assert config.dry_run is False

    @pytest.mark.asyncio
    async def test_structure_free_use_produces_citation(self):
        """Rule 1 structuring includes source citation."""
        pipeline = ControlGeneratorPipeline(db=MagicMock())

        llm_response = json.dumps({
            "title": "DSFA Pflicht",
            "objective": "DSFA bei hohem Risiko durchfuehren.",
            "rationale": "Gesetzliche Pflicht nach DSGVO.",
            "requirements": ["DSFA durchfuehren"],
            "test_procedure": ["DSFA Bericht pruefen"],
            "evidence": ["DSFA Dokumentation"],
            "severity": "high",
            "tags": ["dsfa", "dsgvo"],
        })

        chunk = self._make_chunk(
            regulation_code="eu_2016_679",
            article="Art. 35",
            text="Art. 35 DSGVO: Datenschutz-Folgenabschaetzung",
            regulation_name="DSGVO",
        )
        license_info = _classify_regulation("eu_2016_679")

        with patch("compliance.services.control_generator._llm_chat", return_value=llm_response):
            control = await pipeline._structure_free_use(chunk, license_info)

        assert control.license_rule == 1
        assert control.source_original_text is not None
        assert control.source_citation is not None
        assert "DSGVO" in control.source_citation["source"]
        assert control.customer_visible is True
|