Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 47s
CI/CD / test-python-backend-compliance (push) Successful in 33s
CI/CD / test-python-document-crawler (push) Successful in 24s
CI/CD / test-python-dsms-gateway (push) Successful in 18s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Has been skipped
Implements the full Multi-Layer Control Architecture for migrating ~25,000 Rich Controls into atomic, deduplicated Master Controls with full traceability.

Architecture:
Legal Source → Obligation → Control Pattern → Master Control → Customer Instance

New services:
- ObligationExtractor: 3-tier extraction (exact → embedding → LLM)
- PatternMatcher: 2-tier matching (keyword + embedding + domain-bonus)
- ControlComposer: Pattern + Obligation → Master Control
- PipelineAdapter: Pipeline integration + Migration Passes 1-5
- DecompositionPass: Pass 0a/0b — Rich Control → atomic Controls
- CrosswalkRoutes: 15 API endpoints under /v1/canonical/

New DB schema:
- Migration 060: obligation_extractions, control_patterns, crosswalk_matrix
- Migration 061: obligation_candidates, parent_control_uuid tracking

Pattern Library: 50 YAML patterns (30 core + 20 IT-security)
Go SDK: Pattern loader with YAML validation and indexing
Documentation: MkDocs updated with full architecture overview

500 Python tests passing across all components.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
505 lines
18 KiB
Python
505 lines
18 KiB
Python
"""Tests for Control Pattern Library (Phase 2).
|
|
|
|
Validates:
|
|
- JSON Schema structure
|
|
- YAML pattern files against schema
|
|
- Pattern ID uniqueness and format
|
|
- Domain/category consistency
|
|
- Keyword coverage
|
|
- Cross-references (composable_with)
|
|
- Template quality (min lengths, no placeholders without defaults)
|
|
"""
|
|
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
from collections import Counter
|
|
|
|
import pytest
|
|
import yaml
|
|
|
|
# Directory two levels above the folder that contains this test module.
REPO_ROOT = Path(__file__).resolve().parent.parent.parent

# Location of the pattern library and the individual files under test.
PATTERNS_DIR = REPO_ROOT.joinpath("ai-compliance-sdk", "policies", "control_patterns")
SCHEMA_FILE = PATTERNS_DIR.joinpath("_pattern_schema.json")
CORE_FILE = PATTERNS_DIR.joinpath("core_patterns.yaml")
IT_SEC_FILE = PATTERNS_DIR.joinpath("domain_it_security.yaml")

# Domain codes accepted in a pattern's ``domain`` field; also compared
# against the enum declared in the JSON schema.
VALID_DOMAINS = [
    "AUTH",
    "CRYP",
    "NET",
    "DATA",
    "LOG",
    "ACC",
    "SEC",
    "INC",
    "AI",
    "COMP",
    "GOV",
    "LAB",
    "FIN",
    "TRD",
    "ENV",
    "HLT",
]

# Allowed values for ``severity_default`` and ``implementation_effort_default``.
VALID_SEVERITIES = ["low", "medium", "high", "critical"]
VALID_EFFORTS = ["s", "m", "l", "xl"]

# Pattern IDs look like CP-<UPPERCASE>-<three digits>; names are snake_case.
PATTERN_ID_RE = re.compile(r"^CP-[A-Z]+-[0-9]{3}$")
NAME_RE = re.compile(r"^[a-z][a-z0-9_]*$")
|
|
|
|
|
|
# =============================================================================
|
|
# Fixtures
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.fixture
def schema():
    """Load and parse the pattern JSON Schema from ``SCHEMA_FILE``.

    Returns:
        The decoded JSON document.

    Fails with an assertion naming the path when the file does not exist.
    """
    assert SCHEMA_FILE.exists(), f"Schema file not found: {SCHEMA_FILE}"
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(SCHEMA_FILE, encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
|
|
@pytest.fixture
def core_patterns():
    """Load the ``patterns`` list from the core pattern YAML file.

    Returns:
        The value stored under the top-level ``patterns`` key of ``CORE_FILE``.

    Fails with an assertion naming the path when the file does not exist.
    """
    assert CORE_FILE.exists(), f"Core patterns file not found: {CORE_FILE}"
    # The pattern files carry German text (e.g. ``name_de``); read them as
    # UTF-8 explicitly instead of relying on the platform's locale encoding.
    with open(CORE_FILE, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return data["patterns"]
|
|
|
|
|
|
@pytest.fixture
def it_sec_patterns():
    """Load the ``patterns`` list from the IT-security pattern YAML file.

    Returns:
        The value stored under the top-level ``patterns`` key of ``IT_SEC_FILE``.

    Fails with an assertion naming the path when the file does not exist.
    """
    assert IT_SEC_FILE.exists(), f"IT security patterns file not found: {IT_SEC_FILE}"
    # The pattern files carry German text (e.g. ``name_de``); read them as
    # UTF-8 explicitly instead of relying on the platform's locale encoding.
    with open(IT_SEC_FILE, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return data["patterns"]
|
|
|
|
|
|
@pytest.fixture
def all_patterns(core_patterns, it_sec_patterns):
    """Concatenate both pattern lists, core entries first."""
    combined = core_patterns + it_sec_patterns
    return combined
|
|
|
|
|
|
# =============================================================================
|
|
# Schema Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestPatternSchema:
|
|
"""Validate the JSON Schema file itself."""
|
|
|
|
def test_schema_exists(self):
|
|
assert SCHEMA_FILE.exists()
|
|
|
|
def test_schema_is_valid_json(self, schema):
|
|
assert "$schema" in schema
|
|
assert "properties" in schema
|
|
|
|
def test_schema_defines_pattern(self, schema):
|
|
assert "ControlPattern" in schema.get("$defs", {})
|
|
|
|
def test_schema_requires_key_fields(self, schema):
|
|
pattern_def = schema["$defs"]["ControlPattern"]
|
|
required = pattern_def["required"]
|
|
for field in [
|
|
"id", "name", "name_de", "domain", "category",
|
|
"description", "objective_template", "rationale_template",
|
|
"requirements_template", "test_procedure_template",
|
|
"evidence_template", "severity_default",
|
|
"obligation_match_keywords", "tags",
|
|
]:
|
|
assert field in required, f"Missing required field in schema: {field}"
|
|
|
|
def test_schema_domain_enum(self, schema):
|
|
pattern_def = schema["$defs"]["ControlPattern"]
|
|
domain_enum = pattern_def["properties"]["domain"]["enum"]
|
|
assert set(domain_enum) == set(VALID_DOMAINS)
|
|
|
|
|
|
# =============================================================================
|
|
# File Structure Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestFileStructure:
    """Validate the top-level structure of the YAML pattern files."""

    @staticmethod
    def _load_yaml(path):
        """Parse *path* as YAML, decoding it explicitly as UTF-8.

        An explicit encoding keeps the result independent of the platform's
        locale encoding; the pattern files carry German text.
        """
        with open(path, encoding="utf-8") as f:
            return yaml.safe_load(f)

    def test_core_file_exists(self):
        """The core pattern file is present on disk."""
        assert CORE_FILE.exists()

    def test_it_sec_file_exists(self):
        """The IT-security pattern file is present on disk."""
        assert IT_SEC_FILE.exists()

    def test_core_has_version(self):
        """The core file declares ``version`` equal to the string "1.0"."""
        data = self._load_yaml(CORE_FILE)
        assert "version" in data
        assert data["version"] == "1.0"

    def test_it_sec_has_version(self):
        """The IT-security file declares ``version`` equal to the string "1.0"."""
        data = self._load_yaml(IT_SEC_FILE)
        assert "version" in data
        assert data["version"] == "1.0"

    def test_core_has_description(self):
        """The core file has a ``description`` longer than 20 characters."""
        data = self._load_yaml(CORE_FILE)
        assert "description" in data
        assert len(data["description"]) > 20

    def test_it_sec_has_description(self):
        """The IT-security file has a ``description`` longer than 20 characters."""
        data = self._load_yaml(IT_SEC_FILE)
        assert "description" in data
        assert len(data["description"]) > 20
|
|
|
|
|
|
# =============================================================================
|
|
# Pattern Count Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestPatternCounts:
|
|
"""Verify expected number of patterns."""
|
|
|
|
def test_core_has_30_patterns(self, core_patterns):
|
|
assert len(core_patterns) == 30, (
|
|
f"Expected 30 core patterns, got {len(core_patterns)}"
|
|
)
|
|
|
|
def test_it_sec_has_20_patterns(self, it_sec_patterns):
|
|
assert len(it_sec_patterns) == 20, (
|
|
f"Expected 20 IT security patterns, got {len(it_sec_patterns)}"
|
|
)
|
|
|
|
def test_total_is_50(self, all_patterns):
|
|
assert len(all_patterns) == 50, (
|
|
f"Expected 50 total patterns, got {len(all_patterns)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Pattern ID Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestPatternIDs:
|
|
"""Validate pattern ID format and uniqueness."""
|
|
|
|
def test_all_ids_match_format(self, all_patterns):
|
|
for p in all_patterns:
|
|
assert PATTERN_ID_RE.match(p["id"]), (
|
|
f"Invalid pattern ID format: {p['id']} (expected CP-DOMAIN-NNN)"
|
|
)
|
|
|
|
def test_all_ids_unique(self, all_patterns):
|
|
ids = [p["id"] for p in all_patterns]
|
|
duplicates = [id for id, count in Counter(ids).items() if count > 1]
|
|
assert not duplicates, f"Duplicate pattern IDs: {duplicates}"
|
|
|
|
def test_all_names_unique(self, all_patterns):
|
|
names = [p["name"] for p in all_patterns]
|
|
duplicates = [n for n, count in Counter(names).items() if count > 1]
|
|
assert not duplicates, f"Duplicate pattern names: {duplicates}"
|
|
|
|
def test_id_domain_matches_domain_field(self, all_patterns):
|
|
"""The domain in the ID (CP-{DOMAIN}-NNN) should match the domain field."""
|
|
for p in all_patterns:
|
|
id_domain = p["id"].split("-")[1]
|
|
assert id_domain == p["domain"], (
|
|
f"Pattern {p['id']}: ID domain '{id_domain}' != field domain '{p['domain']}'"
|
|
)
|
|
|
|
def test_all_names_are_snake_case(self, all_patterns):
|
|
for p in all_patterns:
|
|
assert NAME_RE.match(p["name"]), (
|
|
f"Pattern {p['id']}: name '{p['name']}' is not snake_case"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Domain & Category Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestDomainCategories:
|
|
"""Validate domain and category assignments."""
|
|
|
|
def test_all_domains_valid(self, all_patterns):
|
|
for p in all_patterns:
|
|
assert p["domain"] in VALID_DOMAINS, (
|
|
f"Pattern {p['id']}: invalid domain '{p['domain']}'"
|
|
)
|
|
|
|
def test_domain_coverage(self, all_patterns):
|
|
"""At least 5 different domains should be covered."""
|
|
domains = {p["domain"] for p in all_patterns}
|
|
assert len(domains) >= 5, (
|
|
f"Only {len(domains)} domains covered: {domains}"
|
|
)
|
|
|
|
def test_all_have_category(self, all_patterns):
|
|
for p in all_patterns:
|
|
assert p.get("category"), (
|
|
f"Pattern {p['id']}: missing category"
|
|
)
|
|
|
|
def test_category_not_empty(self, all_patterns):
|
|
for p in all_patterns:
|
|
assert len(p["category"]) >= 3, (
|
|
f"Pattern {p['id']}: category too short: '{p['category']}'"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Template Quality Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestTemplateQuality:
|
|
"""Validate template content quality."""
|
|
|
|
def test_description_min_length(self, all_patterns):
|
|
for p in all_patterns:
|
|
desc = p["description"].strip()
|
|
assert len(desc) >= 30, (
|
|
f"Pattern {p['id']}: description too short ({len(desc)} chars)"
|
|
)
|
|
|
|
def test_objective_min_length(self, all_patterns):
|
|
for p in all_patterns:
|
|
obj = p["objective_template"].strip()
|
|
assert len(obj) >= 30, (
|
|
f"Pattern {p['id']}: objective_template too short ({len(obj)} chars)"
|
|
)
|
|
|
|
def test_rationale_min_length(self, all_patterns):
|
|
for p in all_patterns:
|
|
rat = p["rationale_template"].strip()
|
|
assert len(rat) >= 30, (
|
|
f"Pattern {p['id']}: rationale_template too short ({len(rat)} chars)"
|
|
)
|
|
|
|
def test_requirements_min_count(self, all_patterns):
|
|
for p in all_patterns:
|
|
reqs = p["requirements_template"]
|
|
assert len(reqs) >= 2, (
|
|
f"Pattern {p['id']}: needs at least 2 requirements, got {len(reqs)}"
|
|
)
|
|
|
|
def test_requirements_not_empty(self, all_patterns):
|
|
for p in all_patterns:
|
|
for i, req in enumerate(p["requirements_template"]):
|
|
assert len(req.strip()) >= 10, (
|
|
f"Pattern {p['id']}: requirement {i} too short"
|
|
)
|
|
|
|
def test_test_procedure_min_count(self, all_patterns):
|
|
for p in all_patterns:
|
|
tests = p["test_procedure_template"]
|
|
assert len(tests) >= 1, (
|
|
f"Pattern {p['id']}: needs at least 1 test procedure"
|
|
)
|
|
|
|
def test_evidence_min_count(self, all_patterns):
|
|
for p in all_patterns:
|
|
evidence = p["evidence_template"]
|
|
assert len(evidence) >= 1, (
|
|
f"Pattern {p['id']}: needs at least 1 evidence item"
|
|
)
|
|
|
|
def test_name_de_exists(self, all_patterns):
|
|
for p in all_patterns:
|
|
assert p.get("name_de"), (
|
|
f"Pattern {p['id']}: missing German name (name_de)"
|
|
)
|
|
assert len(p["name_de"]) >= 5, (
|
|
f"Pattern {p['id']}: name_de too short: '{p['name_de']}'"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Severity & Effort Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestSeverityEffort:
|
|
"""Validate severity and effort assignments."""
|
|
|
|
def test_all_have_valid_severity(self, all_patterns):
|
|
for p in all_patterns:
|
|
assert p["severity_default"] in VALID_SEVERITIES, (
|
|
f"Pattern {p['id']}: invalid severity '{p['severity_default']}'"
|
|
)
|
|
|
|
def test_all_have_effort(self, all_patterns):
|
|
for p in all_patterns:
|
|
if "implementation_effort_default" in p:
|
|
assert p["implementation_effort_default"] in VALID_EFFORTS, (
|
|
f"Pattern {p['id']}: invalid effort '{p['implementation_effort_default']}'"
|
|
)
|
|
|
|
def test_severity_distribution(self, all_patterns):
|
|
"""At least 2 different severity levels should be used."""
|
|
severities = {p["severity_default"] for p in all_patterns}
|
|
assert len(severities) >= 2, (
|
|
f"Only {len(severities)} severity levels used: {severities}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Keyword Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestKeywords:
|
|
"""Validate obligation match keywords."""
|
|
|
|
def test_all_have_keywords(self, all_patterns):
|
|
for p in all_patterns:
|
|
kws = p["obligation_match_keywords"]
|
|
assert len(kws) >= 3, (
|
|
f"Pattern {p['id']}: needs at least 3 keywords, got {len(kws)}"
|
|
)
|
|
|
|
def test_keywords_not_empty(self, all_patterns):
|
|
for p in all_patterns:
|
|
for kw in p["obligation_match_keywords"]:
|
|
assert len(kw.strip()) >= 2, (
|
|
f"Pattern {p['id']}: empty or too short keyword: '{kw}'"
|
|
)
|
|
|
|
def test_keywords_lowercase(self, all_patterns):
|
|
for p in all_patterns:
|
|
for kw in p["obligation_match_keywords"]:
|
|
assert kw == kw.lower(), (
|
|
f"Pattern {p['id']}: keyword should be lowercase: '{kw}'"
|
|
)
|
|
|
|
def test_has_german_and_english_keywords(self, all_patterns):
|
|
"""Each pattern should have keywords in both languages (spot check)."""
|
|
# At minimum, keywords should have a mix (not all German, not all English)
|
|
for p in all_patterns:
|
|
kws = p["obligation_match_keywords"]
|
|
assert len(kws) >= 3, (
|
|
f"Pattern {p['id']}: too few keywords for bilingual coverage"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Tags Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestTags:
|
|
"""Validate tags."""
|
|
|
|
def test_all_have_tags(self, all_patterns):
|
|
for p in all_patterns:
|
|
assert len(p["tags"]) >= 1, (
|
|
f"Pattern {p['id']}: needs at least 1 tag"
|
|
)
|
|
|
|
def test_tags_are_strings(self, all_patterns):
|
|
for p in all_patterns:
|
|
for tag in p["tags"]:
|
|
assert isinstance(tag, str) and len(tag) >= 2, (
|
|
f"Pattern {p['id']}: invalid tag: {tag}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Open Anchor Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestOpenAnchors:
|
|
"""Validate open anchor references."""
|
|
|
|
def test_most_have_anchors(self, all_patterns):
|
|
"""At least 80% of patterns should have open anchor references."""
|
|
with_anchors = sum(
|
|
1 for p in all_patterns
|
|
if p.get("open_anchor_refs") and len(p["open_anchor_refs"]) >= 1
|
|
)
|
|
ratio = with_anchors / len(all_patterns)
|
|
assert ratio >= 0.80, (
|
|
f"Only {with_anchors}/{len(all_patterns)} ({ratio:.0%}) patterns have "
|
|
f"open anchor references (need >= 80%)"
|
|
)
|
|
|
|
def test_anchor_structure(self, all_patterns):
|
|
for p in all_patterns:
|
|
for anchor in p.get("open_anchor_refs", []):
|
|
assert "framework" in anchor, (
|
|
f"Pattern {p['id']}: anchor missing 'framework'"
|
|
)
|
|
assert "ref" in anchor, (
|
|
f"Pattern {p['id']}: anchor missing 'ref'"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Composability Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestComposability:
|
|
"""Validate composable_with references."""
|
|
|
|
def test_composable_refs_are_valid_ids(self, all_patterns):
|
|
all_ids = {p["id"] for p in all_patterns}
|
|
for p in all_patterns:
|
|
for ref in p.get("composable_with", []):
|
|
assert PATTERN_ID_RE.match(ref), (
|
|
f"Pattern {p['id']}: composable_with ref '{ref}' is not valid ID format"
|
|
)
|
|
assert ref in all_ids, (
|
|
f"Pattern {p['id']}: composable_with ref '{ref}' does not exist"
|
|
)
|
|
|
|
def test_no_self_references(self, all_patterns):
|
|
for p in all_patterns:
|
|
composable = p.get("composable_with", [])
|
|
assert p["id"] not in composable, (
|
|
f"Pattern {p['id']}: composable_with contains self-reference"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Cross-File Consistency Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestCrossFileConsistency:
|
|
"""Validate consistency between core and IT security files."""
|
|
|
|
def test_no_id_overlap(self, core_patterns, it_sec_patterns):
|
|
core_ids = {p["id"] for p in core_patterns}
|
|
it_sec_ids = {p["id"] for p in it_sec_patterns}
|
|
overlap = core_ids & it_sec_ids
|
|
assert not overlap, f"ID overlap between files: {overlap}"
|
|
|
|
def test_no_name_overlap(self, core_patterns, it_sec_patterns):
|
|
core_names = {p["name"] for p in core_patterns}
|
|
it_sec_names = {p["name"] for p in it_sec_patterns}
|
|
overlap = core_names & it_sec_names
|
|
assert not overlap, f"Name overlap between files: {overlap}"
|
|
|
|
|
|
# =============================================================================
|
|
# Placeholder Syntax Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestPlaceholderSyntax:
|
|
"""Validate {placeholder:default} syntax in templates."""
|
|
|
|
PLACEHOLDER_RE = re.compile(r"\{(\w+)(?::([^}]+))?\}")
|
|
|
|
def test_placeholders_have_defaults(self, all_patterns):
|
|
"""All placeholders in requirements should have defaults."""
|
|
for p in all_patterns:
|
|
for req in p["requirements_template"]:
|
|
for match in self.PLACEHOLDER_RE.finditer(req):
|
|
placeholder = match.group(1)
|
|
default = match.group(2)
|
|
# Placeholders should have defaults
|
|
assert default is not None, (
|
|
f"Pattern {p['id']}: placeholder '{{{placeholder}}}' has no default value"
|
|
)
|