Files
breakpilot-compliance/backend-compliance/tests/test_migration_060.py
Benjamin Admin 825e070ed9
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 47s
CI/CD / test-python-backend-compliance (push) Successful in 33s
CI/CD / test-python-document-crawler (push) Successful in 24s
CI/CD / test-python-dsms-gateway (push) Successful in 18s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Has been skipped
feat(multi-layer): complete Multi-Layer Control Architecture (Phases 1-8 + Pass 0)
Implements the full Multi-Layer Control Architecture for migrating ~25,000
Rich Controls into atomic, deduplicated Master Controls with full traceability.

Architecture: Legal Source → Obligation → Control Pattern → Master Control → Customer Instance

New services:
- ObligationExtractor: 3-tier extraction (exact → embedding → LLM)
- PatternMatcher: 2-tier matching (keyword + embedding + domain-bonus)
- ControlComposer: Pattern + Obligation → Master Control
- PipelineAdapter: Pipeline integration + Migration Passes 1-5
- DecompositionPass: Pass 0a/0b — Rich Control → atomic Controls
- CrosswalkRoutes: 15 API endpoints under /v1/canonical/

New DB schema:
- Migration 060: obligation_extractions, control_patterns, crosswalk_matrix
- Migration 061: obligation_candidates, parent_control_uuid tracking

Pattern Library: 50 YAML patterns (30 core + 20 IT-security)
Go SDK: Pattern loader with YAML validation and indexing
Documentation: MkDocs updated with full architecture overview

500 Python tests passing across all components.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 09:00:37 +01:00

429 lines
17 KiB
Python

"""Tests for Migration 060: Multi-Layer Control Architecture DB Schema.
Validates SQL syntax, table definitions, constraints, and indexes
defined in 060_crosswalk_matrix.sql.
The checks are static: the migration file is read as text and its structure is
validated with substring and regex matching. NOTE(review): no test visible in this
module connects to SQLite or PostgreSQL; confirm before assuming database-level coverage.
"""
import re
from pathlib import Path
import pytest
# Absolute path of the SQL migration inspected by these tests:
# <parent of this file's directory>/migrations/060_crosswalk_matrix.sql
MIGRATION_FILE = Path(__file__).resolve().parent.parent.joinpath(
    "migrations", "060_crosswalk_matrix.sql"
)
@pytest.fixture
def migration_sql():
    """Return the complete text of the migration file, decoded as UTF-8.

    Raises an assertion error when ``MIGRATION_FILE`` does not exist, so a
    missing file shows up with an explicit message.
    """
    assert MIGRATION_FILE.exists(), f"Migration file not found: {MIGRATION_FILE}"
    with MIGRATION_FILE.open(encoding="utf-8") as sql_file:
        return sql_file.read()
# =============================================================================
# SQL File Structure Tests
# =============================================================================
class TestMigrationFileStructure:
"""Validate the migration file exists and has correct structure."""
def test_file_exists(self):
assert MIGRATION_FILE.exists()
def test_file_not_empty(self, migration_sql):
assert len(migration_sql.strip()) > 100
def test_has_migration_header_comment(self, migration_sql):
assert "Migration 060" in migration_sql
assert "Multi-Layer Control Architecture" in migration_sql
def test_no_explicit_transaction_control(self, migration_sql):
"""Migration runner strips BEGIN/COMMIT — file should not contain them."""
lines = migration_sql.split("\n")
for line in lines:
stripped = line.strip().upper()
if stripped.startswith("--"):
continue
assert stripped != "BEGIN;", "Migration should not contain explicit BEGIN"
assert stripped != "COMMIT;", "Migration should not contain explicit COMMIT"
# =============================================================================
# Table Definition Tests
# =============================================================================
class TestObligationExtractionsTable:
"""Validate obligation_extractions table definition."""
def test_create_table_present(self, migration_sql):
assert "CREATE TABLE IF NOT EXISTS obligation_extractions" in migration_sql
def test_has_primary_key(self, migration_sql):
# Extract the CREATE TABLE block
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "id UUID PRIMARY KEY" in block
def test_has_chunk_hash_column(self, migration_sql):
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "chunk_hash VARCHAR(64) NOT NULL" in block
def test_has_collection_column(self, migration_sql):
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "collection VARCHAR(100) NOT NULL" in block
def test_has_regulation_code_column(self, migration_sql):
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "regulation_code VARCHAR(100) NOT NULL" in block
def test_has_obligation_id_column(self, migration_sql):
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "obligation_id VARCHAR(50)" in block
def test_has_confidence_column_with_check(self, migration_sql):
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "confidence NUMERIC(3,2)" in block
assert "confidence >= 0" in block
assert "confidence <= 1" in block
def test_extraction_method_check_constraint(self, migration_sql):
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "extraction_method VARCHAR(30) NOT NULL" in block
for method in ("exact_match", "embedding_match", "llm_extracted", "inferred"):
assert method in block, f"Missing extraction_method: {method}"
def test_has_pattern_id_column(self, migration_sql):
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "pattern_id VARCHAR(50)" in block
def test_has_pattern_match_score_with_check(self, migration_sql):
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "pattern_match_score NUMERIC(3,2)" in block
def test_has_control_uuid_fk(self, migration_sql):
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "control_uuid UUID REFERENCES canonical_controls(id)" in block
def test_has_job_id_fk(self, migration_sql):
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "job_id UUID REFERENCES canonical_generation_jobs(id)" in block
def test_has_created_at(self, migration_sql):
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "created_at TIMESTAMPTZ" in block
def test_indexes_created(self, migration_sql):
expected_indexes = [
"idx_oe_obligation",
"idx_oe_pattern",
"idx_oe_control",
"idx_oe_regulation",
"idx_oe_chunk",
"idx_oe_method",
]
for idx in expected_indexes:
assert idx in migration_sql, f"Missing index: {idx}"
class TestControlPatternsTable:
"""Validate control_patterns table definition."""
def test_create_table_present(self, migration_sql):
assert "CREATE TABLE IF NOT EXISTS control_patterns" in migration_sql
def test_has_primary_key(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "id UUID PRIMARY KEY" in block
def test_pattern_id_unique(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "pattern_id VARCHAR(50) UNIQUE NOT NULL" in block
def test_has_name_column(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "name VARCHAR(255) NOT NULL" in block
def test_has_name_de_column(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "name_de VARCHAR(255)" in block
def test_has_domain_column(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "domain VARCHAR(10) NOT NULL" in block
def test_has_category_column(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "category VARCHAR(50)" in block
def test_has_template_fields(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "template_objective TEXT" in block
assert "template_rationale TEXT" in block
assert "template_requirements JSONB" in block
assert "template_test_procedure JSONB" in block
assert "template_evidence JSONB" in block
def test_severity_check_constraint(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
for severity in ("low", "medium", "high", "critical"):
assert severity in block, f"Missing severity: {severity}"
def test_effort_check_constraint(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "implementation_effort_default" in block
def test_has_keyword_and_tag_fields(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "obligation_match_keywords JSONB" in block
assert "tags JSONB" in block
def test_has_anchor_refs(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "open_anchor_refs JSONB" in block
def test_has_composable_with(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "composable_with JSONB" in block
def test_has_version(self, migration_sql):
block = _extract_create_table(migration_sql, "control_patterns")
assert "version VARCHAR(10)" in block
def test_indexes_created(self, migration_sql):
expected_indexes = ["idx_cp_domain", "idx_cp_category", "idx_cp_pattern_id"]
for idx in expected_indexes:
assert idx in migration_sql, f"Missing index: {idx}"
class TestCrosswalkMatrixTable:
"""Validate crosswalk_matrix table definition."""
def test_create_table_present(self, migration_sql):
assert "CREATE TABLE IF NOT EXISTS crosswalk_matrix" in migration_sql
def test_has_primary_key(self, migration_sql):
block = _extract_create_table(migration_sql, "crosswalk_matrix")
assert "id UUID PRIMARY KEY" in block
def test_has_regulation_code(self, migration_sql):
block = _extract_create_table(migration_sql, "crosswalk_matrix")
assert "regulation_code VARCHAR(100) NOT NULL" in block
def test_has_article_paragraph(self, migration_sql):
block = _extract_create_table(migration_sql, "crosswalk_matrix")
assert "article VARCHAR(100)" in block
assert "paragraph VARCHAR(100)" in block
def test_has_obligation_id(self, migration_sql):
block = _extract_create_table(migration_sql, "crosswalk_matrix")
assert "obligation_id VARCHAR(50)" in block
def test_has_pattern_id(self, migration_sql):
block = _extract_create_table(migration_sql, "crosswalk_matrix")
assert "pattern_id VARCHAR(50)" in block
def test_has_master_control_fields(self, migration_sql):
block = _extract_create_table(migration_sql, "crosswalk_matrix")
assert "master_control_id VARCHAR(20)" in block
assert "master_control_uuid UUID REFERENCES canonical_controls(id)" in block
def test_has_tom_control_id(self, migration_sql):
block = _extract_create_table(migration_sql, "crosswalk_matrix")
assert "tom_control_id VARCHAR(30)" in block
def test_confidence_check(self, migration_sql):
block = _extract_create_table(migration_sql, "crosswalk_matrix")
assert "confidence NUMERIC(3,2)" in block
def test_source_check_constraint(self, migration_sql):
block = _extract_create_table(migration_sql, "crosswalk_matrix")
for source_val in ("manual", "auto", "migrated"):
assert source_val in block, f"Missing source value: {source_val}"
def test_indexes_created(self, migration_sql):
expected_indexes = [
"idx_cw_regulation",
"idx_cw_obligation",
"idx_cw_pattern",
"idx_cw_control",
"idx_cw_tom",
]
for idx in expected_indexes:
assert idx in migration_sql, f"Missing index: {idx}"
# =============================================================================
# ALTER TABLE Tests (canonical_controls extensions)
# =============================================================================
class TestCanonicalControlsExtension:
"""Validate ALTER TABLE additions to canonical_controls."""
def test_adds_pattern_id_column(self, migration_sql):
assert "ALTER TABLE canonical_controls" in migration_sql
assert "pattern_id VARCHAR(50)" in migration_sql
def test_adds_obligation_ids_column(self, migration_sql):
assert "obligation_ids JSONB" in migration_sql
def test_uses_if_not_exists(self, migration_sql):
alter_lines = [
line.strip()
for line in migration_sql.split("\n")
if "ALTER TABLE canonical_controls" in line
and "ADD COLUMN" in line
]
for line in alter_lines:
assert "IF NOT EXISTS" in line, (
f"ALTER TABLE missing IF NOT EXISTS: {line}"
)
def test_pattern_id_index(self, migration_sql):
assert "idx_cc_pattern" in migration_sql
# =============================================================================
# Cross-Cutting Concerns
# =============================================================================
class TestSQLSafety:
"""Validate SQL safety and idempotency."""
def test_all_tables_use_if_not_exists(self, migration_sql):
create_statements = re.findall(
r"CREATE TABLE\s+(?:IF NOT EXISTS\s+)?(\w+)", migration_sql
)
for match in re.finditer(r"CREATE TABLE\s+(\w+)", migration_sql):
table_name = match.group(1)
if table_name == "IF":
continue # This is part of "IF NOT EXISTS"
full_match = migration_sql[match.start() : match.start() + 60]
assert "IF NOT EXISTS" in full_match, (
f"CREATE TABLE {table_name} missing IF NOT EXISTS"
)
def test_all_indexes_use_if_not_exists(self, migration_sql):
for match in re.finditer(r"CREATE INDEX\s+(\w+)", migration_sql):
idx_name = match.group(1)
if idx_name == "IF":
continue
full_match = migration_sql[match.start() : match.start() + 80]
assert "IF NOT EXISTS" in full_match, (
f"CREATE INDEX {idx_name} missing IF NOT EXISTS"
)
def test_no_drop_statements(self, migration_sql):
"""Migration should only add, never drop."""
lines = [
l.strip()
for l in migration_sql.split("\n")
if not l.strip().startswith("--")
]
sql_content = "\n".join(lines)
assert "DROP TABLE" not in sql_content
assert "DROP INDEX" not in sql_content
assert "DROP COLUMN" not in sql_content
def test_no_truncate(self, migration_sql):
lines = [
l.strip()
for l in migration_sql.split("\n")
if not l.strip().startswith("--")
]
sql_content = "\n".join(lines)
assert "TRUNCATE" not in sql_content
def test_fk_references_existing_tables(self, migration_sql):
"""All REFERENCES must point to canonical_controls or canonical_generation_jobs."""
refs = re.findall(r"REFERENCES\s+(\w+)\(", migration_sql)
allowed_tables = {"canonical_controls", "canonical_generation_jobs"}
for ref in refs:
assert ref in allowed_tables, (
f"FK reference to unknown table: {ref}"
)
def test_consistent_varchar_sizes(self, migration_sql):
"""Key fields should use consistent sizes across tables."""
# obligation_id should be VARCHAR(50) everywhere
obligation_id_matches = re.findall(
r"obligation_id\s+VARCHAR\((\d+)\)", migration_sql
)
for size in obligation_id_matches:
assert size == "50", f"obligation_id should be VARCHAR(50), got {size}"
# pattern_id should be VARCHAR(50) everywhere
pattern_id_matches = re.findall(
r"pattern_id\s+VARCHAR\((\d+)\)", migration_sql
)
for size in pattern_id_matches:
assert size == "50", f"pattern_id should be VARCHAR(50), got {size}"
# regulation_code should be VARCHAR(100) everywhere
reg_code_matches = re.findall(
r"regulation_code\s+VARCHAR\((\d+)\)", migration_sql
)
for size in reg_code_matches:
assert size == "100", f"regulation_code should be VARCHAR(100), got {size}"
class TestTableComments:
"""Validate that all new tables have COMMENT ON TABLE."""
def test_obligation_extractions_comment(self, migration_sql):
assert "COMMENT ON TABLE obligation_extractions" in migration_sql
def test_control_patterns_comment(self, migration_sql):
assert "COMMENT ON TABLE control_patterns" in migration_sql
def test_crosswalk_matrix_comment(self, migration_sql):
assert "COMMENT ON TABLE crosswalk_matrix" in migration_sql
# =============================================================================
# Data Type Compatibility Tests
# =============================================================================
class TestDataTypeCompatibility:
"""Ensure data types are compatible with existing schema."""
def test_chunk_hash_matches_processed_chunks(self, migration_sql):
"""chunk_hash in obligation_extractions should match canonical_processed_chunks."""
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "chunk_hash VARCHAR(64)" in block
def test_collection_matches_processed_chunks(self, migration_sql):
"""collection size should match canonical_processed_chunks."""
block = _extract_create_table(migration_sql, "obligation_extractions")
assert "collection VARCHAR(100)" in block
def test_control_id_size_matches_canonical_controls(self, migration_sql):
"""master_control_id VARCHAR(20) should match canonical_controls.control_id VARCHAR(20)."""
block = _extract_create_table(migration_sql, "crosswalk_matrix")
assert "master_control_id VARCHAR(20)" in block
def test_pattern_id_format_documented(self, migration_sql):
"""Pattern ID format CP-{DOMAIN}-{NNN} should be documented."""
assert "CP-{DOMAIN}-{NNN}" in migration_sql or "CP-" in migration_sql
# =============================================================================
# Helpers
# =============================================================================
def _extract_create_table(sql: str, table_name: str) -> str:
"""Extract a CREATE TABLE block from SQL."""
pattern = rf"CREATE TABLE IF NOT EXISTS {table_name}\s*\((.*?)\);"
match = re.search(pattern, sql, re.DOTALL)
if not match:
pytest.fail(f"Could not find CREATE TABLE for {table_name}")
return match.group(1)