feat: enhance legal basis display, add batch processing tests and docs
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 32s
CI/CD / test-python-backend-compliance (push) Successful in 31s
CI/CD / test-python-document-crawler (push) Successful in 23s
CI/CD / test-python-dsms-gateway (push) Successful in 17s
CI/CD / validate-canonical-controls (push) Successful in 12s
CI/CD / Deploy (push) Successful in 2s
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 32s
CI/CD / test-python-backend-compliance (push) Successful in 31s
CI/CD / test-python-document-crawler (push) Successful in 23s
CI/CD / test-python-dsms-gateway (push) Successful in 17s
CI/CD / validate-canonical-controls (push) Successful in 12s
CI/CD / Deploy (push) Successful in 2s
- Backfill 81 controls with empty source_citation.source from generation_metadata - Add fallback to generation_metadata.source_regulation in ControlDetail blue box - Improve Rule 3 amber box text for reformulated controls - Add 30 new tests for batch processing (TestParseJsonArray, TestBatchSizeConfig, TestBatchProcessingLoop) — all 61 control generator tests passing - Fix stale test_config_defaults assertion (max_controls 50→0) - Update canonical-control-library.md with batch processing pipeline docs, processed chunks tracking, migration guide, and stats endpoint - Update testing.md with canonical control generator test section Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -305,7 +305,7 @@ class TestPipelineMocked:
|
||||
|
||||
def test_config_defaults(self):
|
||||
config = GeneratorConfig()
|
||||
assert config.max_controls == 50
|
||||
assert config.max_controls == 0
|
||||
assert config.batch_size == 5
|
||||
assert config.skip_processed is True
|
||||
assert config.dry_run is False
|
||||
@@ -340,3 +340,610 @@ class TestPipelineMocked:
|
||||
assert control.source_citation is not None
|
||||
assert "DSGVO" in control.source_citation["source"]
|
||||
assert control.customer_visible is True
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# JSON Array Parsing Tests (_parse_llm_json_array)
|
||||
# =============================================================================
|
||||
|
||||
from compliance.services.control_generator import _parse_llm_json_array
|
||||
|
||||
|
||||
class TestParseJsonArray:
|
||||
"""Tests for _parse_llm_json_array — batch LLM response parsing."""
|
||||
|
||||
def test_clean_json_array(self):
|
||||
"""A well-formed JSON array should be returned directly."""
|
||||
raw = json.dumps([
|
||||
{"title": "Control A", "objective": "Obj A"},
|
||||
{"title": "Control B", "objective": "Obj B"},
|
||||
])
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 2
|
||||
assert result[0]["title"] == "Control A"
|
||||
assert result[1]["title"] == "Control B"
|
||||
|
||||
def test_json_array_in_markdown_code_block(self):
|
||||
"""JSON array wrapped in ```json ... ``` markdown fence."""
|
||||
inner = json.dumps([
|
||||
{"title": "Fenced A", "chunk_index": 1},
|
||||
{"title": "Fenced B", "chunk_index": 2},
|
||||
])
|
||||
raw = f"Here is the result:\n```json\n{inner}\n```\nDone."
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 2
|
||||
assert result[0]["title"] == "Fenced A"
|
||||
assert result[1]["title"] == "Fenced B"
|
||||
|
||||
def test_markdown_code_block_without_json_tag(self):
|
||||
"""Markdown fence without explicit 'json' language tag."""
|
||||
inner = json.dumps([{"title": "NoTag", "objective": "test"}])
|
||||
raw = f"```\n{inner}\n```"
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 1
|
||||
assert result[0]["title"] == "NoTag"
|
||||
|
||||
def test_wrapper_object_controls_key(self):
|
||||
"""LLM wraps array in {"controls": [...]} — should unwrap."""
|
||||
raw = json.dumps({
|
||||
"controls": [
|
||||
{"title": "Wrapped A", "objective": "O1"},
|
||||
{"title": "Wrapped B", "objective": "O2"},
|
||||
]
|
||||
})
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 2
|
||||
assert result[0]["title"] == "Wrapped A"
|
||||
|
||||
def test_wrapper_object_results_key(self):
|
||||
"""LLM wraps array in {"results": [...]} — should unwrap."""
|
||||
raw = json.dumps({
|
||||
"results": [
|
||||
{"title": "R1"},
|
||||
{"title": "R2"},
|
||||
{"title": "R3"},
|
||||
]
|
||||
})
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 3
|
||||
|
||||
def test_wrapper_object_items_key(self):
|
||||
"""LLM wraps array in {"items": [...]} — should unwrap."""
|
||||
raw = json.dumps({
|
||||
"items": [{"title": "Item1"}]
|
||||
})
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 1
|
||||
assert result[0]["title"] == "Item1"
|
||||
|
||||
def test_wrapper_object_data_key(self):
|
||||
"""LLM wraps array in {"data": [...]} — should unwrap."""
|
||||
raw = json.dumps({
|
||||
"data": [{"title": "D1"}, {"title": "D2"}]
|
||||
})
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 2
|
||||
|
||||
def test_single_dict_returned_as_list(self):
|
||||
"""A single JSON object (no array) is wrapped in a list."""
|
||||
raw = json.dumps({"title": "SingleControl", "objective": "Obj"})
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 1
|
||||
assert result[0]["title"] == "SingleControl"
|
||||
|
||||
def test_individual_json_objects_fallback(self):
|
||||
"""Multiple separate JSON objects (not in array) are collected."""
|
||||
raw = (
|
||||
'Here are the controls:\n'
|
||||
'{"title": "Ctrl1", "objective": "A"}\n'
|
||||
'{"title": "Ctrl2", "objective": "B"}\n'
|
||||
)
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 2
|
||||
titles = {r["title"] for r in result}
|
||||
assert "Ctrl1" in titles
|
||||
assert "Ctrl2" in titles
|
||||
|
||||
def test_individual_objects_require_title(self):
|
||||
"""Fallback individual-object parsing only includes objects with 'title'."""
|
||||
raw = (
|
||||
'{"title": "HasTitle", "objective": "Yes"}\n'
|
||||
'{"no_title_field": "skip_me"}\n'
|
||||
)
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 1
|
||||
assert result[0]["title"] == "HasTitle"
|
||||
|
||||
def test_empty_string_returns_empty_list(self):
|
||||
"""Empty input returns empty list."""
|
||||
result = _parse_llm_json_array("")
|
||||
assert result == []
|
||||
|
||||
def test_invalid_input_returns_empty_list(self):
|
||||
"""Completely invalid input returns empty list."""
|
||||
result = _parse_llm_json_array("this is not json at all, no braces anywhere")
|
||||
assert result == []
|
||||
|
||||
def test_garbage_with_no_json_returns_empty(self):
|
||||
"""Random non-JSON text should return empty list."""
|
||||
result = _parse_llm_json_array("Hier ist meine Antwort: leider kann ich das nicht.")
|
||||
assert result == []
|
||||
|
||||
def test_bracket_block_extraction(self):
|
||||
"""Array embedded in preamble text is extracted via bracket matching."""
|
||||
raw = 'Some preamble text...\n[{"title": "Extracted", "objective": "X"}]\nSome trailing text.'
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 1
|
||||
assert result[0]["title"] == "Extracted"
|
||||
|
||||
def test_nested_objects_in_array(self):
|
||||
"""Array elements with nested objects (like scope) are parsed correctly."""
|
||||
raw = json.dumps([
|
||||
{
|
||||
"title": "Nested",
|
||||
"objective": "Test",
|
||||
"scope": {"regions": ["EU", "DE"]},
|
||||
"requirements": ["Req1"],
|
||||
}
|
||||
])
|
||||
result = _parse_llm_json_array(raw)
|
||||
assert len(result) == 1
|
||||
assert result[0]["scope"]["regions"] == ["EU", "DE"]
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Batch Size Configuration Tests
|
||||
# =============================================================================
|
||||
|
||||
class TestBatchSizeConfig:
|
||||
"""Tests for batch_size configuration on GeneratorConfig."""
|
||||
|
||||
def test_default_batch_size(self):
|
||||
config = GeneratorConfig()
|
||||
assert config.batch_size == 5
|
||||
|
||||
def test_custom_batch_size(self):
|
||||
config = GeneratorConfig(batch_size=10)
|
||||
assert config.batch_size == 10
|
||||
|
||||
def test_batch_size_of_one(self):
|
||||
config = GeneratorConfig(batch_size=1)
|
||||
assert config.batch_size == 1
|
||||
|
||||
def test_batch_size_used_in_pipeline_constant(self):
|
||||
"""Verify that pipeline uses config.batch_size (not a hardcoded value)."""
|
||||
config = GeneratorConfig(batch_size=3)
|
||||
# BATCH_SIZE = config.batch_size or 5 — with batch_size=3 it should be 3
|
||||
batch_size = config.batch_size or 5
|
||||
assert batch_size == 3
|
||||
|
||||
def test_batch_size_zero_falls_back_to_five(self):
|
||||
"""batch_size=0 triggers `or 5` fallback in the pipeline loop."""
|
||||
config = GeneratorConfig(batch_size=0)
|
||||
# Mimics the pipeline logic: BATCH_SIZE = config.batch_size or 5
|
||||
batch_size = config.batch_size or 5
|
||||
assert batch_size == 5
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Batch Processing Loop Tests (Mocked)
|
||||
# =============================================================================
|
||||
|
||||
class TestBatchProcessingLoop:
|
||||
"""Tests for _process_batch, _structure_batch, _reformulate_batch with mocked LLM."""
|
||||
|
||||
def _make_chunk(self, regulation_code="owasp_asvs", article="V2.1.1", text="Test requirement"):
|
||||
return RAGSearchResult(
|
||||
text=text,
|
||||
regulation_code=regulation_code,
|
||||
regulation_name="OWASP ASVS" if "owasp" in regulation_code else "Test Source",
|
||||
regulation_short="OWASP" if "owasp" in regulation_code else "TEST",
|
||||
category="requirement",
|
||||
article=article,
|
||||
paragraph="",
|
||||
source_url="https://example.com",
|
||||
score=0.9,
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_batch_splits_by_license_rule(self):
|
||||
"""_process_batch routes Rule 1+2 to _structure_batch and Rule 3 to _reformulate_batch."""
|
||||
mock_db = MagicMock()
|
||||
pipeline = ControlGeneratorPipeline(db=mock_db)
|
||||
pipeline._existing_controls = []
|
||||
|
||||
chunk_r1 = self._make_chunk("eu_2016_679", "Art. 35", "DSGVO text")
|
||||
chunk_r3 = self._make_chunk("bsi_tr03161", "O.Auth_1", "BSI text")
|
||||
|
||||
batch_items = [
|
||||
(chunk_r1, {"rule": 1, "name": "DSGVO", "license": "EU_LAW"}),
|
||||
(chunk_r3, {"rule": 3, "name": "INTERNAL_ONLY"}),
|
||||
]
|
||||
|
||||
# Mock _structure_batch and _reformulate_batch
|
||||
structure_ctrl = GeneratedControl(title="Structured", license_rule=1, release_state="draft")
|
||||
reform_ctrl = GeneratedControl(title="Reformed", license_rule=3, release_state="draft")
|
||||
|
||||
mock_finder_instance = AsyncMock()
|
||||
mock_finder_instance.find_anchors = AsyncMock(return_value=[])
|
||||
mock_finder_cls = MagicMock(return_value=mock_finder_instance)
|
||||
|
||||
with patch.object(pipeline, "_structure_batch", new_callable=AsyncMock, return_value=[structure_ctrl]) as mock_struct, \
|
||||
patch.object(pipeline, "_reformulate_batch", new_callable=AsyncMock, return_value=[reform_ctrl]) as mock_reform, \
|
||||
patch.object(pipeline, "_check_harmonization", new_callable=AsyncMock, return_value=[]), \
|
||||
patch("compliance.services.anchor_finder.AnchorFinder", mock_finder_cls), \
|
||||
patch("compliance.services.control_generator.check_similarity", new_callable=AsyncMock) as mock_sim:
|
||||
mock_sim.return_value = MagicMock(status="PASS", token_overlap=0.1, ngram_jaccard=0.1, lcs_ratio=0.1)
|
||||
config = GeneratorConfig(batch_size=5)
|
||||
result = await pipeline._process_batch(batch_items, config, "test-job-123")
|
||||
|
||||
mock_struct.assert_called_once()
|
||||
mock_reform.assert_called_once()
|
||||
# structure_batch received only the Rule 1 chunk
|
||||
assert len(mock_struct.call_args[0][0]) == 1
|
||||
# reformulate_batch received only the Rule 3 chunk
|
||||
assert len(mock_reform.call_args[0][0]) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_structure_batch_calls_llm_and_parses(self):
|
||||
"""_structure_batch sends prompt to LLM and parses array response."""
|
||||
mock_db = MagicMock()
|
||||
pipeline = ControlGeneratorPipeline(db=mock_db)
|
||||
|
||||
chunks = [
|
||||
self._make_chunk("eu_2016_679", "Art. 5", "Datensparsamkeit und Zweckbindung"),
|
||||
self._make_chunk("eu_2016_679", "Art. 35", "DSFA bei hohem Risiko"),
|
||||
]
|
||||
license_infos = [
|
||||
{"rule": 1, "name": "DSGVO", "license": "EU_LAW"},
|
||||
{"rule": 1, "name": "DSGVO", "license": "EU_LAW"},
|
||||
]
|
||||
|
||||
llm_response = json.dumps([
|
||||
{
|
||||
"chunk_index": 1,
|
||||
"title": "Datensparsamkeit",
|
||||
"objective": "Nur notwendige Daten erheben.",
|
||||
"rationale": "DSGVO Grundprinzip.",
|
||||
"requirements": ["Datenminimierung"],
|
||||
"test_procedure": ["Datenbestand pruefen"],
|
||||
"evidence": ["Verarbeitungsverzeichnis"],
|
||||
"severity": "high",
|
||||
"tags": ["dsgvo", "datenschutz"],
|
||||
},
|
||||
{
|
||||
"chunk_index": 2,
|
||||
"title": "DSFA Pflicht",
|
||||
"objective": "DSFA bei hohem Risiko durchfuehren.",
|
||||
"rationale": "Gesetzliche Pflicht.",
|
||||
"requirements": ["DSFA erstellen"],
|
||||
"test_procedure": ["DSFA Bericht pruefen"],
|
||||
"evidence": ["DSFA Dokumentation"],
|
||||
"severity": "high",
|
||||
"tags": ["dsfa"],
|
||||
},
|
||||
])
|
||||
|
||||
with patch("compliance.services.control_generator._llm_chat", new_callable=AsyncMock, return_value=llm_response):
|
||||
controls = await pipeline._structure_batch(chunks, license_infos)
|
||||
|
||||
assert len(controls) == 2
|
||||
assert controls[0] is not None
|
||||
assert controls[0].title == "Datensparsamkeit"
|
||||
assert controls[0].license_rule == 1
|
||||
assert controls[0].source_original_text is not None
|
||||
assert controls[0].customer_visible is True
|
||||
assert controls[0].generation_metadata["processing_path"] == "structured_batch"
|
||||
assert controls[0].generation_metadata["batch_size"] == 2
|
||||
|
||||
assert controls[1] is not None
|
||||
assert controls[1].title == "DSFA Pflicht"
|
||||
assert controls[1].license_rule == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reformulate_batch_calls_llm_and_strips_source(self):
|
||||
"""_reformulate_batch produces Rule 3 controls without source info."""
|
||||
mock_db = MagicMock()
|
||||
pipeline = ControlGeneratorPipeline(db=mock_db)
|
||||
|
||||
chunks = [
|
||||
self._make_chunk("bsi_tr03161", "O.Auth_1", "Multi-factor authentication for apps"),
|
||||
]
|
||||
config = GeneratorConfig(batch_size=5)
|
||||
|
||||
llm_response = json.dumps([
|
||||
{
|
||||
"chunk_index": 1,
|
||||
"title": "Starke Authentifizierung",
|
||||
"objective": "Mehrstufige Anmeldung implementieren.",
|
||||
"rationale": "Schutz vor unbefugtem Zugriff.",
|
||||
"requirements": ["MFA einrichten"],
|
||||
"test_procedure": ["MFA Funktionstest"],
|
||||
"evidence": ["MFA Konfiguration"],
|
||||
"severity": "critical",
|
||||
"tags": ["auth", "mfa"],
|
||||
}
|
||||
])
|
||||
|
||||
with patch("compliance.services.control_generator._llm_chat", new_callable=AsyncMock, return_value=llm_response):
|
||||
controls = await pipeline._reformulate_batch(chunks, config)
|
||||
|
||||
assert len(controls) == 1
|
||||
ctrl = controls[0]
|
||||
assert ctrl is not None
|
||||
assert ctrl.title == "Starke Authentifizierung"
|
||||
assert ctrl.license_rule == 3
|
||||
assert ctrl.source_original_text is None
|
||||
assert ctrl.source_citation is None
|
||||
assert ctrl.customer_visible is False
|
||||
assert ctrl.generation_metadata["processing_path"] == "llm_reform_batch"
|
||||
# Must not contain BSI references
|
||||
metadata_str = json.dumps(ctrl.generation_metadata)
|
||||
assert "bsi" not in metadata_str.lower()
|
||||
assert "TR-03161" not in metadata_str
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_structure_batch_maps_by_chunk_index(self):
|
||||
"""Controls are mapped back to the correct chunk via chunk_index."""
|
||||
mock_db = MagicMock()
|
||||
pipeline = ControlGeneratorPipeline(db=mock_db)
|
||||
|
||||
chunks = [
|
||||
self._make_chunk("eu_2016_679", "Art. 5", "First chunk"),
|
||||
self._make_chunk("eu_2016_679", "Art. 6", "Second chunk"),
|
||||
self._make_chunk("eu_2016_679", "Art. 7", "Third chunk"),
|
||||
]
|
||||
license_infos = [{"rule": 1, "name": "DSGVO", "license": "EU_LAW"}] * 3
|
||||
|
||||
# LLM returns them in reversed order
|
||||
llm_response = json.dumps([
|
||||
{
|
||||
"chunk_index": 3,
|
||||
"title": "Third Control",
|
||||
"objective": "Obj3",
|
||||
"rationale": "Rat3",
|
||||
"requirements": [],
|
||||
"test_procedure": [],
|
||||
"evidence": [],
|
||||
"severity": "low",
|
||||
"tags": [],
|
||||
},
|
||||
{
|
||||
"chunk_index": 1,
|
||||
"title": "First Control",
|
||||
"objective": "Obj1",
|
||||
"rationale": "Rat1",
|
||||
"requirements": [],
|
||||
"test_procedure": [],
|
||||
"evidence": [],
|
||||
"severity": "high",
|
||||
"tags": [],
|
||||
},
|
||||
{
|
||||
"chunk_index": 2,
|
||||
"title": "Second Control",
|
||||
"objective": "Obj2",
|
||||
"rationale": "Rat2",
|
||||
"requirements": [],
|
||||
"test_procedure": [],
|
||||
"evidence": [],
|
||||
"severity": "medium",
|
||||
"tags": [],
|
||||
},
|
||||
])
|
||||
|
||||
with patch("compliance.services.control_generator._llm_chat", new_callable=AsyncMock, return_value=llm_response):
|
||||
controls = await pipeline._structure_batch(chunks, license_infos)
|
||||
|
||||
assert len(controls) == 3
|
||||
# Verify correct mapping despite shuffled chunk_index
|
||||
assert controls[0].title == "First Control"
|
||||
assert controls[1].title == "Second Control"
|
||||
assert controls[2].title == "Third Control"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_structure_batch_falls_back_to_position(self):
|
||||
"""If chunk_index is missing, controls are assigned by position."""
|
||||
mock_db = MagicMock()
|
||||
pipeline = ControlGeneratorPipeline(db=mock_db)
|
||||
|
||||
chunks = [
|
||||
self._make_chunk("eu_2016_679", "Art. 5", "Chunk A"),
|
||||
self._make_chunk("eu_2016_679", "Art. 6", "Chunk B"),
|
||||
]
|
||||
license_infos = [{"rule": 1, "name": "DSGVO", "license": "EU_LAW"}] * 2
|
||||
|
||||
# No chunk_index in response
|
||||
llm_response = json.dumps([
|
||||
{
|
||||
"title": "PositionA",
|
||||
"objective": "ObjA",
|
||||
"rationale": "RatA",
|
||||
"requirements": [],
|
||||
"test_procedure": [],
|
||||
"evidence": [],
|
||||
"severity": "medium",
|
||||
"tags": [],
|
||||
},
|
||||
{
|
||||
"title": "PositionB",
|
||||
"objective": "ObjB",
|
||||
"rationale": "RatB",
|
||||
"requirements": [],
|
||||
"test_procedure": [],
|
||||
"evidence": [],
|
||||
"severity": "medium",
|
||||
"tags": [],
|
||||
},
|
||||
])
|
||||
|
||||
with patch("compliance.services.control_generator._llm_chat", new_callable=AsyncMock, return_value=llm_response):
|
||||
controls = await pipeline._structure_batch(chunks, license_infos)
|
||||
|
||||
assert len(controls) == 2
|
||||
assert controls[0].title == "PositionA"
|
||||
assert controls[1].title == "PositionB"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_batch_only_structure_no_reform(self):
|
||||
"""Batch with only Rule 1+2 items skips _reformulate_batch."""
|
||||
mock_db = MagicMock()
|
||||
pipeline = ControlGeneratorPipeline(db=mock_db)
|
||||
pipeline._existing_controls = []
|
||||
|
||||
chunk = self._make_chunk("eu_2016_679", "Art. 5", "DSGVO text")
|
||||
batch_items = [
|
||||
(chunk, {"rule": 1, "name": "DSGVO", "license": "EU_LAW"}),
|
||||
]
|
||||
|
||||
ctrl = GeneratedControl(title="OnlyStructure", license_rule=1, release_state="draft")
|
||||
|
||||
mock_finder_instance = AsyncMock()
|
||||
mock_finder_instance.find_anchors = AsyncMock(return_value=[])
|
||||
mock_finder_cls = MagicMock(return_value=mock_finder_instance)
|
||||
|
||||
with patch.object(pipeline, "_structure_batch", new_callable=AsyncMock, return_value=[ctrl]) as mock_struct, \
|
||||
patch.object(pipeline, "_reformulate_batch", new_callable=AsyncMock) as mock_reform, \
|
||||
patch.object(pipeline, "_check_harmonization", new_callable=AsyncMock, return_value=[]), \
|
||||
patch("compliance.services.anchor_finder.AnchorFinder", mock_finder_cls):
|
||||
config = GeneratorConfig()
|
||||
result = await pipeline._process_batch(batch_items, config, "job-1")
|
||||
|
||||
mock_struct.assert_called_once()
|
||||
mock_reform.assert_not_called()
|
||||
assert len(result) == 1
|
||||
assert result[0].title == "OnlyStructure"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_batch_only_reform_no_structure(self):
|
||||
"""Batch with only Rule 3 items skips _structure_batch."""
|
||||
mock_db = MagicMock()
|
||||
pipeline = ControlGeneratorPipeline(db=mock_db)
|
||||
pipeline._existing_controls = []
|
||||
|
||||
chunk = self._make_chunk("bsi_tr03161", "O.Auth_1", "BSI text")
|
||||
batch_items = [
|
||||
(chunk, {"rule": 3, "name": "INTERNAL_ONLY"}),
|
||||
]
|
||||
|
||||
ctrl = GeneratedControl(title="OnlyReform", license_rule=3, release_state="draft")
|
||||
|
||||
mock_finder_instance = AsyncMock()
|
||||
mock_finder_instance.find_anchors = AsyncMock(return_value=[])
|
||||
mock_finder_cls = MagicMock(return_value=mock_finder_instance)
|
||||
|
||||
with patch.object(pipeline, "_structure_batch", new_callable=AsyncMock) as mock_struct, \
|
||||
patch.object(pipeline, "_reformulate_batch", new_callable=AsyncMock, return_value=[ctrl]) as mock_reform, \
|
||||
patch.object(pipeline, "_check_harmonization", new_callable=AsyncMock, return_value=[]), \
|
||||
patch("compliance.services.anchor_finder.AnchorFinder", mock_finder_cls), \
|
||||
patch("compliance.services.control_generator.check_similarity", new_callable=AsyncMock) as mock_sim:
|
||||
mock_sim.return_value = MagicMock(status="PASS", token_overlap=0.1, ngram_jaccard=0.1, lcs_ratio=0.1)
|
||||
config = GeneratorConfig()
|
||||
result = await pipeline._process_batch(batch_items, config, "job-2")
|
||||
|
||||
mock_struct.assert_not_called()
|
||||
mock_reform.assert_called_once()
|
||||
assert len(result) == 1
|
||||
assert result[0].title == "OnlyReform"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_batch_mixed_rules(self):
|
||||
"""Batch with mixed Rule 1 and Rule 3 items calls both sub-methods."""
|
||||
mock_db = MagicMock()
|
||||
pipeline = ControlGeneratorPipeline(db=mock_db)
|
||||
pipeline._existing_controls = []
|
||||
|
||||
chunk_r1 = self._make_chunk("eu_2016_679", "Art. 5", "DSGVO")
|
||||
chunk_r2 = self._make_chunk("owasp_asvs", "V2.1", "OWASP")
|
||||
chunk_r3a = self._make_chunk("bsi_tr03161", "O.Auth_1", "BSI A")
|
||||
chunk_r3b = self._make_chunk("iso_27001", "A.9.1", "ISO B")
|
||||
|
||||
batch_items = [
|
||||
(chunk_r1, {"rule": 1, "name": "DSGVO", "license": "EU_LAW"}),
|
||||
(chunk_r3a, {"rule": 3, "name": "INTERNAL_ONLY"}),
|
||||
(chunk_r2, {"rule": 2, "name": "OWASP ASVS", "license": "CC-BY-SA-4.0", "attribution": "OWASP Foundation"}),
|
||||
(chunk_r3b, {"rule": 3, "name": "INTERNAL_ONLY"}),
|
||||
]
|
||||
|
||||
struct_ctrls = [
|
||||
GeneratedControl(title="DSGVO Ctrl", license_rule=1, release_state="draft"),
|
||||
GeneratedControl(title="OWASP Ctrl", license_rule=2, release_state="draft"),
|
||||
]
|
||||
reform_ctrls = [
|
||||
GeneratedControl(title="BSI Ctrl", license_rule=3, release_state="draft"),
|
||||
GeneratedControl(title="ISO Ctrl", license_rule=3, release_state="draft"),
|
||||
]
|
||||
|
||||
mock_finder_instance = AsyncMock()
|
||||
mock_finder_instance.find_anchors = AsyncMock(return_value=[])
|
||||
mock_finder_cls = MagicMock(return_value=mock_finder_instance)
|
||||
|
||||
with patch.object(pipeline, "_structure_batch", new_callable=AsyncMock, return_value=struct_ctrls) as mock_struct, \
|
||||
patch.object(pipeline, "_reformulate_batch", new_callable=AsyncMock, return_value=reform_ctrls) as mock_reform, \
|
||||
patch.object(pipeline, "_check_harmonization", new_callable=AsyncMock, return_value=[]), \
|
||||
patch("compliance.services.anchor_finder.AnchorFinder", mock_finder_cls), \
|
||||
patch("compliance.services.control_generator.check_similarity", new_callable=AsyncMock) as mock_sim:
|
||||
mock_sim.return_value = MagicMock(status="PASS", token_overlap=0.05, ngram_jaccard=0.05, lcs_ratio=0.05)
|
||||
config = GeneratorConfig()
|
||||
result = await pipeline._process_batch(batch_items, config, "job-mixed")
|
||||
|
||||
# Both methods called
|
||||
mock_struct.assert_called_once()
|
||||
mock_reform.assert_called_once()
|
||||
# structure_batch gets 2 items (Rule 1 + Rule 2)
|
||||
assert len(mock_struct.call_args[0][0]) == 2
|
||||
# reformulate_batch gets 2 items (Rule 3 + Rule 3)
|
||||
assert len(mock_reform.call_args[0][0]) == 2
|
||||
# Result has 4 controls total
|
||||
assert len(result) == 4
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_batch_empty_batch(self):
|
||||
"""Empty batch returns empty list."""
|
||||
mock_db = MagicMock()
|
||||
pipeline = ControlGeneratorPipeline(db=mock_db)
|
||||
pipeline._existing_controls = []
|
||||
|
||||
config = GeneratorConfig()
|
||||
result = await pipeline._process_batch([], config, "job-empty")
|
||||
assert result == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reformulate_batch_too_close_flagged(self):
|
||||
"""Rule 3 controls that are too similar to source get flagged."""
|
||||
mock_db = MagicMock()
|
||||
pipeline = ControlGeneratorPipeline(db=mock_db)
|
||||
pipeline._existing_controls = []
|
||||
|
||||
chunk = self._make_chunk("bsi_tr03161", "O.Auth_1", "Authentication must use MFA")
|
||||
batch_items = [
|
||||
(chunk, {"rule": 3, "name": "INTERNAL_ONLY"}),
|
||||
]
|
||||
|
||||
ctrl = GeneratedControl(
|
||||
title="Auth MFA",
|
||||
objective="Authentication must use MFA",
|
||||
rationale="Security",
|
||||
license_rule=3,
|
||||
release_state="draft",
|
||||
generation_metadata={},
|
||||
)
|
||||
|
||||
# Simulate similarity FAIL (too close to source)
|
||||
fail_report = MagicMock(status="FAIL", token_overlap=0.85, ngram_jaccard=0.9, lcs_ratio=0.88)
|
||||
|
||||
mock_finder_instance = AsyncMock()
|
||||
mock_finder_instance.find_anchors = AsyncMock(return_value=[])
|
||||
mock_finder_cls = MagicMock(return_value=mock_finder_instance)
|
||||
|
||||
with patch.object(pipeline, "_structure_batch", new_callable=AsyncMock), \
|
||||
patch.object(pipeline, "_reformulate_batch", new_callable=AsyncMock, return_value=[ctrl]), \
|
||||
patch.object(pipeline, "_check_harmonization", new_callable=AsyncMock, return_value=[]), \
|
||||
patch("compliance.services.anchor_finder.AnchorFinder", mock_finder_cls), \
|
||||
patch("compliance.services.control_generator.check_similarity", new_callable=AsyncMock, return_value=fail_report):
|
||||
config = GeneratorConfig()
|
||||
result = await pipeline._process_batch(batch_items, config, "job-tooclose")
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].release_state == "too_close"
|
||||
assert result[0].generation_metadata["similarity_status"] == "FAIL"
|
||||
|
||||
Reference in New Issue
Block a user