feat(pipeline): pipeline_version v2, migration 062, docs + 71 tests

- Add PIPELINE_VERSION=2 constant and pipeline_version column to
  canonical_controls and canonical_processed_chunks (migration 062)
- Anthropic API decides chunk relevance via null-returns (skip_prefilter)
- Annex/appendix chunks explicitly protected in prompts
- Fix 6 failing tests (CRYP domain, _process_batch tuple return)
- Add TestPipelineVersion + TestRegulationFilter test classes (10 new tests)
- Add MkDocs page: control-generator-pipeline.md (541 lines)
- Update canonical-control-library.md with v2 pipeline diagram
- Update testing.md with 71-test breakdown table

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-17 17:31:11 +01:00
parent 653aad57e3
commit a9e0869205
7 changed files with 815 additions and 48 deletions

View File

@@ -53,6 +53,11 @@ LLM_TIMEOUT = float(os.getenv("CONTROL_GEN_LLM_TIMEOUT", "180"))
HARMONIZATION_THRESHOLD = 0.85 # Cosine similarity above this = duplicate
# Pipeline version — increment when generation rules change materially.
# v1: Original (local LLM prefilter, old prompt)
# v2: Anthropic decides relevance, null for non-requirement chunks, annexes protected
PIPELINE_VERSION = 2
ALL_COLLECTIONS = [
"bp_compliance_ce",
"bp_compliance_gesetze",
@@ -1663,7 +1668,7 @@ Kategorien: {CATEGORY_LIST_STR}"""
license_rule, source_original_text, source_citation,
customer_visible, generation_metadata,
verification_method, category, generation_strategy,
target_audience
target_audience, pipeline_version
) VALUES (
:framework_id, :control_id, :title, :objective, :rationale,
:scope, :requirements, :test_procedure, :evidence,
@@ -1672,7 +1677,7 @@ Kategorien: {CATEGORY_LIST_STR}"""
:license_rule, :source_original_text, :source_citation,
:customer_visible, :generation_metadata,
:verification_method, :category, :generation_strategy,
:target_audience
:target_audience, :pipeline_version
)
ON CONFLICT (framework_id, control_id) DO NOTHING
RETURNING id
@@ -1702,6 +1707,7 @@ Kategorien: {CATEGORY_LIST_STR}"""
"category": control.category,
"generation_strategy": control.generation_strategy,
"target_audience": json.dumps(control.target_audience) if control.target_audience else None,
"pipeline_version": PIPELINE_VERSION,
},
)
self.db.commit()
@@ -1728,11 +1734,13 @@ Kategorien: {CATEGORY_LIST_STR}"""
INSERT INTO canonical_processed_chunks (
chunk_hash, collection, regulation_code,
document_version, source_license, license_rule,
processing_path, generated_control_ids, job_id
processing_path, generated_control_ids, job_id,
pipeline_version
) VALUES (
:hash, :collection, :regulation_code,
:doc_version, :license, :rule,
:path, :control_ids, CAST(:job_id AS uuid)
:path, :control_ids, CAST(:job_id AS uuid),
:pipeline_version
)
ON CONFLICT (chunk_hash, collection, document_version) DO NOTHING
"""),
@@ -1746,6 +1754,7 @@ Kategorien: {CATEGORY_LIST_STR}"""
"path": processing_path,
"control_ids": json.dumps(control_ids),
"job_id": job_id,
"pipeline_version": PIPELINE_VERSION,
},
)
self.db.commit()

View File

@@ -0,0 +1,22 @@
-- Migration 062: Add pipeline_version to track which generation rules produced each control/chunk
--
-- v1 = Original pipeline (local LLM prefilter, old prompt without null-skip)
-- v2 = Improved pipeline (skip_prefilter, Anthropic decides relevance, annexes protected)
--
-- This allows identifying controls that may need reprocessing when pipeline rules change.
-- NOTE: DEFAULT 1 deliberately backfills all pre-existing rows as v1, since everything
-- written before this migration was produced by the original pipeline. New inserts must
-- pass the current PIPELINE_VERSION explicitly (the application does not rely on the default).
-- IF NOT EXISTS on both ALTER and CREATE INDEX keeps the migration idempotent on re-run.
ALTER TABLE canonical_controls
ADD COLUMN IF NOT EXISTS pipeline_version smallint NOT NULL DEFAULT 1;
ALTER TABLE canonical_processed_chunks
ADD COLUMN IF NOT EXISTS pipeline_version smallint NOT NULL DEFAULT 1;
-- Index for efficient querying by version (e.g. "find all v1 controls to reprocess")
CREATE INDEX IF NOT EXISTS idx_canonical_controls_pipeline_version
ON canonical_controls (pipeline_version);
CREATE INDEX IF NOT EXISTS idx_canonical_processed_chunks_pipeline_version
ON canonical_processed_chunks (pipeline_version);
COMMENT ON COLUMN canonical_controls.pipeline_version IS 'Generation pipeline version: 1=original (local prefilter), 2=improved (Anthropic decides relevance, annexes protected)';
COMMENT ON COLUMN canonical_processed_chunks.pipeline_version IS 'Pipeline version used when this chunk was processed';

View File

@@ -8,10 +8,12 @@ from compliance.services.control_generator import (
_classify_regulation,
_detect_domain,
_parse_llm_json,
_parse_llm_json_array,
GeneratorConfig,
GeneratedControl,
ControlGeneratorPipeline,
REGULATION_LICENSE_MAP,
PIPELINE_VERSION,
)
from compliance.services.anchor_finder import AnchorFinder, OpenAnchor
from compliance.services.rag_client import RAGSearchResult
@@ -91,7 +93,7 @@ class TestDomainDetection:
assert _detect_domain("Multi-factor authentication and password policy") == "AUTH"
def test_crypto_domain(self):
assert _detect_domain("TLS 1.3 encryption and certificate management") == "CRYPT"
assert _detect_domain("TLS 1.3 encryption and certificate management") == "CRYP"
def test_network_domain(self):
assert _detect_domain("Firewall rules and network segmentation") == "NET"
@@ -807,7 +809,7 @@ class TestBatchProcessingLoop:
patch.object(pipeline, "_check_harmonization", new_callable=AsyncMock, return_value=[]), \
patch("compliance.services.anchor_finder.AnchorFinder", mock_finder_cls):
config = GeneratorConfig()
result = await pipeline._process_batch(batch_items, config, "job-1")
result, qa_count = await pipeline._process_batch(batch_items, config, "job-1")
mock_struct.assert_called_once()
mock_reform.assert_not_called()
@@ -839,7 +841,7 @@ class TestBatchProcessingLoop:
patch("compliance.services.control_generator.check_similarity", new_callable=AsyncMock) as mock_sim:
mock_sim.return_value = MagicMock(status="PASS", token_overlap=0.1, ngram_jaccard=0.1, lcs_ratio=0.1)
config = GeneratorConfig()
result = await pipeline._process_batch(batch_items, config, "job-2")
result, qa_count = await pipeline._process_batch(batch_items, config, "job-2")
mock_struct.assert_not_called()
mock_reform.assert_called_once()
@@ -885,7 +887,7 @@ class TestBatchProcessingLoop:
patch("compliance.services.control_generator.check_similarity", new_callable=AsyncMock) as mock_sim:
mock_sim.return_value = MagicMock(status="PASS", token_overlap=0.05, ngram_jaccard=0.05, lcs_ratio=0.05)
config = GeneratorConfig()
result = await pipeline._process_batch(batch_items, config, "job-mixed")
result, qa_count = await pipeline._process_batch(batch_items, config, "job-mixed")
# Both methods called
mock_struct.assert_called_once()
@@ -905,8 +907,9 @@ class TestBatchProcessingLoop:
pipeline._existing_controls = []
config = GeneratorConfig()
result = await pipeline._process_batch([], config, "job-empty")
result, qa_count = await pipeline._process_batch([], config, "job-empty")
assert result == []
assert qa_count == 0
@pytest.mark.asyncio
async def test_reformulate_batch_too_close_flagged(self):
@@ -942,7 +945,7 @@ class TestBatchProcessingLoop:
patch("compliance.services.anchor_finder.AnchorFinder", mock_finder_cls), \
patch("compliance.services.control_generator.check_similarity", new_callable=AsyncMock, return_value=fail_report):
config = GeneratorConfig()
result = await pipeline._process_batch(batch_items, config, "job-tooclose")
result, qa_count = await pipeline._process_batch(batch_items, config, "job-tooclose")
assert len(result) == 1
assert result[0].release_state == "too_close"
@@ -1112,3 +1115,194 @@ class TestRegulationFilter:
results = await pipeline._scan_rag(config)
assert len(results) == 2
# =============================================================================
# Pipeline Version Tests
# =============================================================================
class TestPipelineVersion:
    """Tests for pipeline_version propagation in DB writes and null handling.

    Covers three concerns of the v2 pipeline:
    - the PIPELINE_VERSION constant itself,
    - that both DB write paths (_store_control, _mark_chunk_processed) stamp
      rows with the current PIPELINE_VERSION,
    - that batch LLM calls tolerate explicit JSON ``null`` entries (the v2
      "Anthropic decides relevance" contract) by yielding ``None`` per slot.
    """

    def test_pipeline_version_constant_is_2(self):
        # Guard assertion: must be bumped together with any future
        # PIPELINE_VERSION increment so the change is deliberate.
        assert PIPELINE_VERSION == 2

    def test_store_control_includes_pipeline_version(self):
        """_store_control must pass pipeline_version=PIPELINE_VERSION to the INSERT."""
        mock_db = MagicMock()
        # Framework lookup returns a UUID
        fw_row = MagicMock()
        fw_row.__getitem__ = lambda self, idx: "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
        mock_db.execute.return_value.fetchone.return_value = fw_row
        pipeline = ControlGeneratorPipeline(db=mock_db, rag_client=MagicMock())
        control = GeneratedControl(
            control_id="SEC-TEST-001",
            title="Test Control",
            objective="Test objective",
        )
        pipeline._store_control(control, job_id="00000000-0000-0000-0000-000000000001")
        # The second call to db.execute is the INSERT
        # (the first is the framework-ID lookup mocked above).
        calls = mock_db.execute.call_args_list
        assert len(calls) >= 2, f"Expected at least 2 db.execute calls, got {len(calls)}"
        insert_call = calls[1]
        params = insert_call[0][1]  # positional arg 1 = params dict
        assert "pipeline_version" in params
        assert params["pipeline_version"] == PIPELINE_VERSION

    def test_mark_chunk_processed_includes_pipeline_version(self):
        """_mark_chunk_processed must pass pipeline_version=PIPELINE_VERSION to the INSERT."""
        mock_db = MagicMock()
        pipeline = ControlGeneratorPipeline(db=mock_db, rag_client=MagicMock())
        chunk = MagicMock()
        # chunk.text is hashed to form the chunk_hash dedup key.
        chunk.text = "Some chunk text for hashing"
        chunk.collection = "bp_compliance_ce"
        chunk.regulation_code = "eu_2016_679"
        license_info = {"license": "CC0-1.0", "rule": 1}
        pipeline._mark_chunk_processed(
            chunk=chunk,
            license_info=license_info,
            processing_path="structured_batch",
            control_ids=["SEC-TEST-001"],
            job_id="00000000-0000-0000-0000-000000000001",
        )
        # Unlike _store_control there is no preceding lookup: the very first
        # db.execute call is the INSERT into canonical_processed_chunks.
        calls = mock_db.execute.call_args_list
        assert len(calls) >= 1
        insert_call = calls[0]
        params = insert_call[0][1]
        assert "pipeline_version" in params
        assert params["pipeline_version"] == PIPELINE_VERSION

    @pytest.mark.asyncio
    async def test_structure_batch_handles_null_results(self):
        """When _parse_llm_json_array returns [dict, None, dict], the null entries produce None."""
        mock_db = MagicMock()
        pipeline = ControlGeneratorPipeline(db=mock_db, rag_client=MagicMock())
        # Three chunks
        chunks = []
        license_infos = []
        for i in range(3):
            c = MagicMock()
            c.text = f"Chunk text number {i} with enough content for processing"
            c.regulation_name = "DSGVO"
            c.regulation_code = "eu_2016_679"
            c.article = f"Art. {i + 1}"
            c.paragraph = ""
            c.source_url = ""
            c.collection = "bp_compliance_ce"
            chunks.append(c)
            license_infos.append({"rule": 1, "name": "DSGVO", "license": "CC0-1.0"})
        # LLM returns a JSON array: valid, null, valid
        # (chunk_index is 1-based in the prompt contract, hence 1 and 3 here).
        llm_response = json.dumps([
            {
                "chunk_index": 1,
                "title": "Datenschutz-Kontrolle 1",
                "objective": "Schutz personenbezogener Daten",
                "rationale": "DSGVO-Konformitaet",
                "requirements": ["Req 1"],
                "test_procedure": ["Test 1"],
                "evidence": ["Nachweis 1"],
                "severity": "high",
                "tags": ["dsgvo"],
                "domain": "DATA",
                "category": "datenschutz",
                "target_audience": ["unternehmen"],
                "source_article": "Art. 1",
                "source_paragraph": "",
            },
            None,
            {
                "chunk_index": 3,
                "title": "Datenschutz-Kontrolle 3",
                "objective": "Transparenzpflicht",
                "rationale": "Information der Betroffenen",
                "requirements": ["Req 3"],
                "test_procedure": ["Test 3"],
                "evidence": ["Nachweis 3"],
                "severity": "medium",
                "tags": ["transparenz"],
                "domain": "DATA",
                "category": "datenschutz",
                "target_audience": ["unternehmen"],
                "source_article": "Art. 3",
                "source_paragraph": "",
            },
        ])
        with patch("compliance.services.control_generator._llm_chat", new_callable=AsyncMock) as mock_llm:
            mock_llm.return_value = llm_response
            controls = await pipeline._structure_batch(chunks, license_infos)
        # Positional alignment must be preserved: one result slot per chunk.
        assert len(controls) == 3
        assert controls[0] is not None
        assert controls[1] is None  # Null entry from LLM
        assert controls[2] is not None

    @pytest.mark.asyncio
    async def test_reformulate_batch_handles_null_results(self):
        """When _parse_llm_json_array returns [dict, None, dict], the null entries produce None."""
        mock_db = MagicMock()
        pipeline = ControlGeneratorPipeline(db=mock_db, rag_client=MagicMock())
        chunks = []
        for i in range(3):
            c = MagicMock()
            c.text = f"Restricted chunk text number {i} with BSI content"
            c.regulation_name = "BSI TR-03161"
            c.regulation_code = "bsi_tr03161"
            c.article = f"Section {i + 1}"
            c.paragraph = ""
            c.source_url = ""
            c.collection = "bp_compliance_ce"
            chunks.append(c)
        config = GeneratorConfig(domain="SEC")
        # Reformulation payloads omit source_article/source_paragraph — that
        # metadata comes from the chunk itself on the restricted-license path.
        llm_response = json.dumps([
            {
                "chunk_index": 1,
                "title": "Sicherheitskontrolle 1",
                "objective": "Authentifizierung absichern",
                "rationale": "Best Practice",
                "requirements": ["Req 1"],
                "test_procedure": ["Test 1"],
                "evidence": ["Nachweis 1"],
                "severity": "high",
                "tags": ["sicherheit"],
                "domain": "SEC",
                "category": "it-sicherheit",
                "target_audience": ["it-abteilung"],
            },
            None,
            {
                "chunk_index": 3,
                "title": "Sicherheitskontrolle 3",
                "objective": "Netzwerk segmentieren",
                "rationale": "Angriffsoberflaeche reduzieren",
                "requirements": ["Req 3"],
                "test_procedure": ["Test 3"],
                "evidence": ["Nachweis 3"],
                "severity": "medium",
                "tags": ["netzwerk"],
                "domain": "NET",
                "category": "netzwerksicherheit",
                "target_audience": ["it-abteilung"],
            },
        ])
        with patch("compliance.services.control_generator._llm_chat", new_callable=AsyncMock) as mock_llm:
            mock_llm.return_value = llm_response
            controls = await pipeline._reformulate_batch(chunks, config)
        # Same positional contract as _structure_batch.
        assert len(controls) == 3
        assert controls[0] is not None
        assert controls[1] is None  # Null entry from LLM
        assert controls[2] is not None