Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 47s
CI/CD / test-python-backend-compliance (push) Successful in 33s
CI/CD / test-python-document-crawler (push) Successful in 24s
CI/CD / test-python-dsms-gateway (push) Successful in 18s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Has been skipped
Implements the full Multi-Layer Control Architecture for migrating ~25,000 Rich Controls into atomic, deduplicated Master Controls with full traceability. Architecture: Legal Source → Obligation → Control Pattern → Master Control → Customer Instance New services: - ObligationExtractor: 3-tier extraction (exact → embedding → LLM) - PatternMatcher: 2-tier matching (keyword + embedding + domain-bonus) - ControlComposer: Pattern + Obligation → Master Control - PipelineAdapter: Pipeline integration + Migration Passes 1-5 - DecompositionPass: Pass 0a/0b — Rich Control → atomic Controls - CrosswalkRoutes: 15 API endpoints under /v1/canonical/ New DB schema: - Migration 060: obligation_extractions, control_patterns, crosswalk_matrix - Migration 061: obligation_candidates, parent_control_uuid tracking Pattern Library: 50 YAML patterns (30 core + 20 IT-security) Go SDK: Pattern loader with YAML validation and indexing Documentation: MkDocs updated with full architecture overview 500 Python tests passing across all components. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
902 lines
33 KiB
Python
902 lines
33 KiB
Python
"""Tests for Pattern Matcher — Phase 5 of Multi-Layer Control Architecture.

Validates:
- Pattern loading from YAML files
- Keyword index construction
- Keyword matching (Tier 1)
- Embedding matching (Tier 2) with domain bonus
- Score combination logic
- Domain affinity mapping
- Top-N matching
- PatternMatchResult serialization
- Edge cases: empty inputs, no matches, missing data
"""
|
|
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
|
|
from compliance.services.pattern_matcher import (
|
|
DOMAIN_BONUS,
|
|
EMBEDDING_PATTERN_THRESHOLD,
|
|
KEYWORD_MATCH_MIN_HITS,
|
|
ControlPattern,
|
|
PatternMatchResult,
|
|
PatternMatcher,
|
|
_REGULATION_DOMAIN_AFFINITY,
|
|
_find_patterns_dir,
|
|
)
|
|
|
|
# Repository root, taken as three directory levels above this test file.
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
# Directory the tests expect to contain the control pattern files.
PATTERNS_DIR = REPO_ROOT.joinpath("ai-compliance-sdk", "policies", "control_patterns")
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: _find_patterns_dir
|
|
# =============================================================================
|
|
|
|
|
|
class TestFindPatternsDir:
    """Tests for locating the control_patterns directory."""

    def test_finds_patterns_dir(self):
        """A path returned by ``_find_patterns_dir`` must be a directory.

        When the helper returns ``None`` the test is reported as skipped
        rather than passing without having verified anything.
        """
        result = _find_patterns_dir()
        if result is None:
            pytest.skip("_find_patterns_dir() returned None; nothing to verify")
        assert result.is_dir()

    def test_patterns_dir_exists_in_repo(self):
        """The path computed in ``PATTERNS_DIR`` must exist on disk."""
        assert PATTERNS_DIR.exists(), f"Patterns dir not found at {PATTERNS_DIR}"
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: ControlPattern
|
|
# =============================================================================
|
|
|
|
|
|
class TestControlPattern:
    """Construction checks for the ControlPattern dataclass."""

    def test_defaults(self):
        """Fields that are not supplied take their default values."""
        required_fields = {
            "id": "CP-TEST-001",
            "name": "test_pattern",
            "name_de": "Test-Muster",
            "domain": "SEC",
            "category": "testing",
            "description": "A test pattern",
            "objective_template": "Test objective",
            "rationale_template": "Test rationale",
        }
        pattern = ControlPattern(**required_fields)

        assert pattern.id == "CP-TEST-001"
        assert pattern.severity_default == "medium"
        assert pattern.implementation_effort_default == "m"
        # The three list-valued fields all start out empty.
        assert pattern.obligation_match_keywords == []
        assert pattern.tags == []
        assert pattern.composable_with == []

    def test_full_pattern(self):
        """Explicitly supplied list fields are stored on the instance."""
        keywords = ["passwort", "password", "credential"]
        pattern = ControlPattern(
            id="CP-AUTH-001",
            name="password_policy",
            name_de="Passwortrichtlinie",
            domain="AUTH",
            category="authentication",
            description="Password requirements",
            objective_template="Ensure strong passwords",
            rationale_template="Weak passwords are risky",
            obligation_match_keywords=keywords,
            tags=["authentication", "password"],
            composable_with=["CP-AUTH-002"],
        )

        assert len(pattern.obligation_match_keywords) == 3
        assert "CP-AUTH-002" in pattern.composable_with
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: PatternMatchResult
|
|
# =============================================================================
|
|
|
|
|
|
class TestPatternMatchResult:
    """Checks defaults and serialization of PatternMatchResult."""

    def test_defaults(self):
        """A result built without arguments describes 'no match'."""
        empty = PatternMatchResult()

        assert empty.pattern is None
        assert empty.pattern_id is None
        assert empty.method == "none"
        # Numeric fields start at zero.
        assert empty.confidence == 0.0
        assert empty.keyword_hits == 0
        assert empty.embedding_score == 0.0
        assert empty.composable_patterns == []

    def test_to_dict(self):
        """``to_dict`` carries over every value given to the constructor."""
        populated = PatternMatchResult(
            pattern_id="CP-AUTH-001",
            method="keyword",
            confidence=0.857,
            keyword_hits=6,
            total_keywords=7,
            embedding_score=0.823,
            domain_bonus_applied=True,
            composable_patterns=["CP-AUTH-002"],
        )
        serialized = populated.to_dict()

        expected_values = {
            "pattern_id": "CP-AUTH-001",
            "method": "keyword",
            "confidence": 0.857,
            "keyword_hits": 6,
            "total_keywords": 7,
            "embedding_score": 0.823,
            "composable_patterns": ["CP-AUTH-002"],
        }
        for key, value in expected_values.items():
            assert serialized[key] == value
        # Identity check: the flag must be the bool True, not merely truthy.
        assert serialized["domain_bonus_applied"] is True

    def test_to_dict_keys(self):
        """``to_dict`` exposes exactly the expected set of keys."""
        serialized = PatternMatchResult().to_dict()
        expected_keys = {
            "pattern_id",
            "method",
            "confidence",
            "keyword_hits",
            "total_keywords",
            "embedding_score",
            "domain_bonus_applied",
            "composable_patterns",
        }
        assert set(serialized.keys()) == expected_keys
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: PatternMatcher — Loading
|
|
# =============================================================================
|
|
|
|
|
|
class TestPatternMatcherLoad:
    """Checks the state of a PatternMatcher after ``_load_patterns``."""

    @staticmethod
    def _loaded_matcher():
        """Return a new PatternMatcher on which ``_load_patterns`` has run."""
        matcher = PatternMatcher()
        matcher._load_patterns()
        return matcher

    def test_load_patterns(self):
        """Loading yields exactly 50 patterns."""
        assert len(self._loaded_matcher()._patterns) == 50

    def test_by_id_populated(self):
        """The ID lookup contains known pattern IDs."""
        by_id = self._loaded_matcher()._by_id
        assert "CP-AUTH-001" in by_id
        assert "CP-CRYP-001" in by_id

    def test_by_domain_populated(self):
        """The domain lookup has AUTH and DATA, with at least 3 AUTH entries."""
        by_domain = self._loaded_matcher()._by_domain
        assert "AUTH" in by_domain
        assert "DATA" in by_domain
        assert len(by_domain["AUTH"]) >= 3

    def test_pattern_fields_valid(self):
        """Every loaded pattern should have all required fields."""
        non_empty_fields = (
            "name",
            "name_de",
            "domain",
            "category",
            "description",
            "objective_template",
        )
        for p in self._loaded_matcher()._patterns:
            assert p.id, "Empty pattern ID"
            for field_name in non_empty_fields:
                assert getattr(p, field_name), f"{p.id}: empty {field_name}"
            assert len(p.obligation_match_keywords) >= 3, (
                f"{p.id}: only {len(p.obligation_match_keywords)} keywords"
            )

    def test_no_duplicate_ids(self):
        """Pattern IDs are unique across the loaded set."""
        all_ids = [p.id for p in self._loaded_matcher()._patterns]
        unique_ids = set(all_ids)
        assert len(all_ids) == len(unique_ids)
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: PatternMatcher — Keyword Index
|
|
# =============================================================================
|
|
|
|
|
|
class TestKeywordIndex:
    """Checks the keyword index built by ``_build_keyword_index``."""

    def setup_method(self):
        """Create a matcher with patterns loaded and the index built."""
        matcher = PatternMatcher()
        matcher._load_patterns()
        matcher._build_keyword_index()
        self.matcher = matcher

    def test_keyword_index_populated(self):
        """The index holds more than 50 distinct keywords."""
        index = self.matcher._keyword_index
        assert len(index) > 50

    def test_keyword_maps_to_patterns(self):
        """'passwort' should map to CP-AUTH-001."""
        index = self.matcher._keyword_index
        assert "passwort" in index
        assert "CP-AUTH-001" in index["passwort"]

    def test_keyword_lowercase(self):
        """All keywords in the index should be lowercase."""
        for keyword in self.matcher._keyword_index:
            assert keyword.lower() == keyword, f"Keyword not lowercase: {keyword}"

    def test_keyword_shared_across_patterns(self):
        """Each index entry refers to at least one pattern.

        This only verifies the structure (keyword -> collection of pattern
        IDs), which is what allows a keyword to be shared by several patterns.
        """
        for pattern_ids in self.matcher._keyword_index.values():
            assert len(pattern_ids) >= 1
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: PatternMatcher — Tier 1 (Keyword Match)
|
|
# =============================================================================
|
|
|
|
|
|
class TestTier1KeywordMatch:
    """Tests for keyword-based pattern matching."""

    def setup_method(self):
        """Create a matcher with patterns loaded and the keyword index built."""
        self.matcher = PatternMatcher()
        self.matcher._load_patterns()
        self.matcher._build_keyword_index()

    def test_password_text_matches_auth(self):
        """Text about passwords should match CP-AUTH-001."""
        result = self.matcher._tier1_keyword(
            "Die Passwortrichtlinie muss sicherstellen dass Anmeldedaten "
            "und Credentials geschuetzt sind und authentifizierung robust ist",
            None,
        )
        assert result is not None
        assert result.pattern_id == "CP-AUTH-001"
        assert result.method == "keyword"
        assert result.keyword_hits >= KEYWORD_MATCH_MIN_HITS

    def test_encryption_text_matches_cryp(self):
        """Text about encryption should match CP-CRYP-001."""
        result = self.matcher._tier1_keyword(
            "Verschluesselung ruhender Daten muss mit AES-256 encryption erfolgen",
            None,
        )
        assert result is not None
        assert result.pattern_id == "CP-CRYP-001"
        assert result.keyword_hits >= KEYWORD_MATCH_MIN_HITS

    def test_incident_text_matches_inc(self):
        """Incident-response text should match a pattern whose ID contains 'INC'."""
        result = self.matcher._tier1_keyword(
            "Ein Vorfall-Reaktionsplan muss fuer Sicherheitsvorfaelle "
            "und incident response bereitstehen",
            None,
        )
        assert result is not None
        assert "INC" in result.pattern_id

    def test_no_match_for_unrelated_text(self):
        """Text without any known keyword yields no result."""
        result = self.matcher._tier1_keyword(
            "xyzzy foobar completely unrelated text with no keywords",
            None,
        )
        assert result is None

    def test_single_keyword_below_threshold(self):
        """A single keyword hit should not be enough."""
        result = self.matcher._tier1_keyword("passwort", None)
        # One hit is expected to be below KEYWORD_MATCH_MIN_HITS.
        assert result is None

    def test_domain_bonus_applied(self):
        """Domain bonus should be added when regulation matches.

        The comparison is only meaningful when both calls produce a match and
        the bonus was actually applied; otherwise the test is skipped
        explicitly instead of passing without asserting anything.
        """
        text = (
            "Personenbezogene Daten muessen durch Datenschutz Massnahmen "
            "und datensicherheit geschuetzt werden mit datenminimierung"
        )
        result_without = self.matcher._tier1_keyword(text, None)
        result_with = self.matcher._tier1_keyword(text, "dsgvo")

        if not (result_without and result_with):
            pytest.skip("no keyword match for the sample text; bonus not comparable")
        if not result_with.domain_bonus_applied:
            pytest.skip("matched pattern received no domain bonus for 'dsgvo'")

        # With the bonus applied, confidence must not be lower than without it.
        assert result_with.confidence >= result_without.confidence

    def test_keyword_scores_returns_dict(self):
        """``_keyword_scores`` maps pattern IDs to (hits, total, confidence)."""
        scores = self.matcher._keyword_scores(
            "Passwort authentifizierung credential zugang",
            None,
        )
        assert isinstance(scores, dict)
        assert "CP-AUTH-001" in scores
        hits, total, confidence = scores["CP-AUTH-001"]
        assert hits >= 3
        assert total > 0
        assert 0 < confidence <= 1.0
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: PatternMatcher — Tier 2 (Embedding Match)
|
|
# =============================================================================
|
|
|
|
|
|
class TestTier2EmbeddingMatch:
    """Tests for embedding-based pattern matching."""

    def setup_method(self):
        """Create a matcher with loaded patterns and deterministic fake embeddings."""
        self.matcher = PatternMatcher()
        self.matcher._load_patterns()
        self.matcher._build_keyword_index()
        # Set up fake embeddings: one 3-dimensional vector per loaded pattern,
        # derived from the pattern's position so no embedding service is needed.
        self.matcher._pattern_ids = [p.id for p in self.matcher._patterns]
        self.matcher._pattern_embeddings = [
            [float(i % 10 + 1), float((i * 3) % 10 + 1), float((i * 7) % 10 + 1)]
            for i in range(len(self.matcher._patterns))
        ]

    @pytest.mark.asyncio
    async def test_embedding_match_identical_vector(self):
        """Identical vector should produce cosine = 1.0 > threshold."""
        target = self.matcher._pattern_embeddings[0]
        with patch(
            "compliance.services.pattern_matcher._get_embedding",
            new_callable=AsyncMock,
            return_value=target,
        ):
            result = await self.matcher._tier2_embedding("test text", None)

        assert result is not None
        assert result.method == "embedding"
        assert result.confidence >= EMBEDDING_PATTERN_THRESHOLD

    @pytest.mark.asyncio
    async def test_embedding_match_empty(self):
        """Empty embeddings should return None."""
        self.matcher._pattern_embeddings = []
        result = await self.matcher._tier2_embedding("test text", None)
        assert result is None

    @pytest.mark.asyncio
    async def test_embedding_match_failed_service(self):
        """Failed embedding service should return None."""
        # An empty vector from the patched helper stands in for a failure.
        with patch(
            "compliance.services.pattern_matcher._get_embedding",
            new_callable=AsyncMock,
            return_value=[],
        ):
            result = await self.matcher._tier2_embedding("test", None)
        assert result is None

    @pytest.mark.asyncio
    async def test_embedding_domain_bonus(self):
        """Domain bonus should increase score for affine regulation.

        Skipped explicitly when no DATA-domain pattern is loaded, instead of
        passing without asserting anything.
        """
        # Set all patterns to the same embedding (in place, keeping the list object).
        unit_vector = [1.0, 0.0, 0.0]
        self.matcher._pattern_embeddings[:] = [
            list(unit_vector) for _ in self.matcher._pattern_embeddings
        ]

        with patch(
            "compliance.services.pattern_matcher._get_embedding",
            new_callable=AsyncMock,
            return_value=[1.0, 0.0, 0.0],
        ):
            scores = await self.matcher._embedding_scores("test", "dsgvo")

        # DATA domain patterns should have bonus applied
        data_patterns = [p.id for p in self.matcher._patterns if p.domain == "DATA"]
        if not data_patterns:
            pytest.skip("no DATA-domain pattern loaded; domain bonus not verifiable")

        score, bonus = scores.get(data_patterns[0], (0, False))
        assert bonus is True
        assert score > 1.0  # 1.0 cosine + 0.10 bonus
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: PatternMatcher — Score Combination
|
|
# =============================================================================
|
|
|
|
|
|
class TestScoreCombination:
    """Checks how ``_combine_results`` merges keyword and embedding results."""

    @staticmethod
    def _make_pattern(pattern_id, name, name_de):
        """Build a minimal SEC-domain ControlPattern with placeholder texts."""
        return ControlPattern(
            id=pattern_id,
            name=name,
            name_de=name_de,
            domain="SEC",
            category="test",
            description="d",
            objective_template="o",
            rationale_template="r",
        )

    def setup_method(self):
        """Provide a fresh matcher and one reusable pattern per test."""
        self.matcher = PatternMatcher()
        self.pattern = self._make_pattern("CP-TEST-001", "test", "Test")

    def test_both_none(self):
        """Without any tier result the combined result is 'none' with 0.0."""
        combined = self.matcher._combine_results(None, None)
        assert combined.method == "none"
        assert combined.confidence == 0.0

    def test_only_keyword(self):
        """A lone keyword result keeps its method and confidence."""
        keyword_result = PatternMatchResult(
            pattern=self.pattern,
            pattern_id="CP-TEST-001",
            method="keyword",
            confidence=0.7,
            keyword_hits=5,
        )
        combined = self.matcher._combine_results(keyword_result, None)
        assert combined.method == "keyword"
        assert combined.confidence == 0.7

    def test_only_embedding(self):
        """A lone embedding result keeps its method and confidence."""
        embedding_result = PatternMatchResult(
            pattern=self.pattern,
            pattern_id="CP-TEST-001",
            method="embedding",
            confidence=0.85,
            embedding_score=0.85,
        )
        combined = self.matcher._combine_results(None, embedding_result)
        assert combined.method == "embedding"
        assert combined.confidence == 0.85

    def test_same_pattern_combined(self):
        """When both tiers agree, confidence gets +0.05 boost."""
        keyword_result = PatternMatchResult(
            pattern=self.pattern,
            pattern_id="CP-TEST-001",
            method="keyword",
            confidence=0.7,
            keyword_hits=5,
            total_keywords=7,
        )
        embedding_result = PatternMatchResult(
            pattern=self.pattern,
            pattern_id="CP-TEST-001",
            method="embedding",
            confidence=0.8,
            embedding_score=0.8,
        )
        combined = self.matcher._combine_results(keyword_result, embedding_result)

        assert combined.method == "combined"
        # Expected value: max(0.7, 0.8) + 0.05, compared with a float tolerance.
        deviation = abs(combined.confidence - 0.85)
        assert deviation < 1e-9
        assert combined.keyword_hits == 5
        assert combined.embedding_score == 0.8

    def test_same_pattern_combined_capped(self):
        """Combined confidence should not exceed 1.0."""
        keyword_result = PatternMatchResult(
            pattern=self.pattern,
            pattern_id="CP-TEST-001",
            method="keyword",
            confidence=0.95,
        )
        embedding_result = PatternMatchResult(
            pattern=self.pattern,
            pattern_id="CP-TEST-001",
            method="embedding",
            confidence=0.98,
            embedding_score=0.98,
        )
        combined = self.matcher._combine_results(keyword_result, embedding_result)
        assert combined.confidence <= 1.0

    def test_different_patterns_picks_higher(self):
        """When tiers disagree, pick the higher confidence."""
        other_pattern = self._make_pattern("CP-TEST-002", "test2", "Test2")
        keyword_result = PatternMatchResult(
            pattern=self.pattern,
            pattern_id="CP-TEST-001",
            method="keyword",
            confidence=0.6,
        )
        embedding_result = PatternMatchResult(
            pattern=other_pattern,
            pattern_id="CP-TEST-002",
            method="embedding",
            confidence=0.9,
            embedding_score=0.9,
        )
        combined = self.matcher._combine_results(keyword_result, embedding_result)
        assert combined.pattern_id == "CP-TEST-002"
        assert combined.confidence == 0.9

    def test_different_patterns_keyword_wins(self):
        """When tiers disagree and keyword scores higher, its pattern is chosen."""
        other_pattern = self._make_pattern("CP-TEST-002", "test2", "Test2")
        keyword_result = PatternMatchResult(
            pattern=self.pattern,
            pattern_id="CP-TEST-001",
            method="keyword",
            confidence=0.9,
        )
        embedding_result = PatternMatchResult(
            pattern=other_pattern,
            pattern_id="CP-TEST-002",
            method="embedding",
            confidence=0.6,
            embedding_score=0.6,
        )
        combined = self.matcher._combine_results(keyword_result, embedding_result)
        assert combined.pattern_id == "CP-TEST-001"
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: PatternMatcher — Domain Affinity
|
|
# =============================================================================
|
|
|
|
|
|
class TestDomainAffinity:
    """Checks the regulation-to-domain affinity lookup."""

    @staticmethod
    def _affine(domain, regulation):
        """Shorthand for ``PatternMatcher._domain_matches(domain, regulation)``."""
        return PatternMatcher._domain_matches(domain, regulation)

    def test_dsgvo_affine_with_data(self):
        """DATA is affine with dsgvo."""
        assert self._affine("DATA", "dsgvo")

    def test_dsgvo_affine_with_comp(self):
        """COMP is affine with dsgvo."""
        assert self._affine("COMP", "dsgvo")

    def test_ai_act_affine_with_ai(self):
        """AI is affine with ai_act."""
        assert self._affine("AI", "ai_act")

    def test_nis2_affine_with_sec(self):
        """SEC is affine with nis2."""
        assert self._affine("SEC", "nis2")

    def test_nis2_affine_with_inc(self):
        """INC is affine with nis2."""
        assert self._affine("INC", "nis2")

    def test_dora_affine_with_fin(self):
        """FIN is affine with dora."""
        assert self._affine("FIN", "dora")

    def test_no_affinity_auth_dsgvo(self):
        """AUTH is not in DSGVO's affinity list."""
        assert not self._affine("AUTH", "dsgvo")

    def test_unknown_regulation(self):
        """A regulation that is not in the map matches no domain."""
        assert not self._affine("DATA", "unknown_reg")

    def test_all_regulations_have_affinity(self):
        """All 9 regulations should have at least one affine domain."""
        expected_regs = (
            "dsgvo",
            "bdsg",
            "ttdsg",
            "ai_act",
            "nis2",
            "dsa",
            "data_act",
            "eu_machinery",
            "dora",
        )
        for reg in expected_regs:
            assert reg in _REGULATION_DOMAIN_AFFINITY, f"{reg} missing from affinity map"
            affine_domains = _REGULATION_DOMAIN_AFFINITY[reg]
            assert len(affine_domains) >= 1
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: PatternMatcher — Full match()
|
|
# =============================================================================
|
|
|
|
|
|
class TestMatchFull:
    """Tests for the full match() method."""

    # Obligation text shared by the password-related tests.
    _PASSWORD_TEXT = (
        "Passwortrichtlinie muss sicherstellen dass Anmeldedaten "
        "und credential geschuetzt sind und authentifizierung robust ist"
    )

    @staticmethod
    def _no_embedding():
        """Patch ``_get_embedding`` so that it returns an empty vector."""
        return patch(
            "compliance.services.pattern_matcher._get_embedding",
            new_callable=AsyncMock,
            return_value=[],
        )

    def setup_method(self):
        """Create an initialized matcher that has keywords but no embeddings."""
        self.matcher = PatternMatcher()
        self.matcher._load_patterns()
        self.matcher._build_keyword_index()
        self.matcher._initialized = True
        # Empty embeddings — Tier 2 returns None
        self.matcher._pattern_embeddings = []
        self.matcher._pattern_ids = []

    @pytest.mark.asyncio
    async def test_match_password_text(self):
        """Password text should match CP-AUTH-001 via keywords."""
        with self._no_embedding():
            result = await self.matcher.match(
                obligation_text=self._PASSWORD_TEXT,
                regulation_id="nis2",
            )
        assert result.pattern_id == "CP-AUTH-001"
        assert result.confidence > 0

    @pytest.mark.asyncio
    async def test_match_encryption_text(self):
        """Encryption text should match CP-CRYP-001."""
        with self._no_embedding():
            result = await self.matcher.match(
                obligation_text=(
                    "Verschluesselung ruhender Daten muss mit AES-256 encryption "
                    "und schluesselmanagement kryptographie erfolgen"
                ),
            )
        assert result.pattern_id == "CP-CRYP-001"

    @pytest.mark.asyncio
    async def test_match_empty_text(self):
        """Empty obligation text yields a 'none' result with zero confidence."""
        result = await self.matcher.match(obligation_text="")
        assert result.method == "none"
        assert result.confidence == 0.0

    @pytest.mark.asyncio
    async def test_match_no_patterns(self):
        """When no patterns loaded, should return empty result."""
        matcher = PatternMatcher()
        # Mark as initialized without loading any patterns.
        matcher._initialized = True
        result = await matcher.match(obligation_text="test")
        assert result.method == "none"

    @pytest.mark.asyncio
    async def test_match_composable_patterns(self):
        """Result should include composable_with references.

        Skipped explicitly when the matched pattern declares no
        ``composable_with`` entries, instead of passing without a check.
        """
        with self._no_embedding():
            result = await self.matcher.match(obligation_text=self._PASSWORD_TEXT)

        if not (result.pattern and result.pattern.composable_with):
            pytest.skip("matched pattern has no composable_with entries to verify")
        assert len(result.composable_patterns) >= 1

    @pytest.mark.asyncio
    async def test_match_with_domain_bonus(self):
        """DSGVO obligation with DATA keywords should get domain bonus.

        Skipped explicitly when the best match is not a DATA-domain pattern,
        instead of passing without a check.
        """
        with self._no_embedding():
            result = await self.matcher.match(
                obligation_text=(
                    "Personenbezogene Daten muessen durch Datenschutz und "
                    "datensicherheit geschuetzt werden mit datenminimierung "
                    "und speicherbegrenzung und loeschung"
                ),
                regulation_id="dsgvo",
            )

        # Should match a DATA-domain pattern
        if not (result.pattern and result.pattern.domain == "DATA"):
            pytest.skip("best match is not a DATA-domain pattern; bonus not verifiable")
        assert result.domain_bonus_applied is True
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: PatternMatcher — match_top_n()
|
|
# =============================================================================
|
|
|
|
|
|
class TestMatchTopN:
    """Checks ``match_top_n`` with keyword matching only."""

    @staticmethod
    def _stub_embedding():
        """Return a patcher that makes ``_get_embedding`` yield an empty list."""
        return patch(
            "compliance.services.pattern_matcher._get_embedding",
            new_callable=AsyncMock,
            return_value=[],
        )

    def setup_method(self):
        """Prepare an initialized matcher whose embedding data is empty."""
        matcher = PatternMatcher()
        matcher._load_patterns()
        matcher._build_keyword_index()
        matcher._initialized = True
        matcher._pattern_embeddings = []
        matcher._pattern_ids = []
        self.matcher = matcher

    @pytest.mark.asyncio
    async def test_top_n_returns_list(self):
        """A matching text yields a non-empty list."""
        obligation = (
            "Passwortrichtlinie muss sicherstellen dass Anmeldedaten "
            "und credential geschuetzt sind und authentifizierung robust ist"
        )
        with self._stub_embedding():
            results = await self.matcher.match_top_n(obligation_text=obligation, n=3)
        assert isinstance(results, list)
        assert len(results) >= 1

    @pytest.mark.asyncio
    async def test_top_n_sorted_by_confidence(self):
        """Results are ordered by non-increasing confidence."""
        obligation = (
            "Verschluesselung und kryptographie und schluesselmanagement "
            "und authentifizierung und password und zugriffskontrolle"
        )
        with self._stub_embedding():
            results = await self.matcher.match_top_n(obligation_text=obligation, n=5)
        # Pairs each result with its successor; no pairs exist for < 2 results.
        for current, following in zip(results, results[1:]):
            assert current.confidence >= following.confidence

    @pytest.mark.asyncio
    async def test_top_n_empty_text(self):
        """Empty text yields an empty list."""
        with self._stub_embedding():
            results = await self.matcher.match_top_n(obligation_text="", n=3)
        assert results == []

    @pytest.mark.asyncio
    async def test_top_n_respects_limit(self):
        """No more than ``n`` results are returned."""
        obligation = (
            "Verschluesselung und kryptographie und schluesselmanagement "
            "und authentifizierung und password und zugriffskontrolle"
        )
        with self._stub_embedding():
            results = await self.matcher.match_top_n(obligation_text=obligation, n=2)
        assert len(results) <= 2
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: PatternMatcher — Public Helpers
|
|
# =============================================================================
|
|
|
|
|
|
class TestPublicHelpers:
    """Checks ``get_pattern``, ``get_patterns_by_domain`` and ``stats``."""

    def setup_method(self):
        """Prepare a matcher with patterns and keyword index, not initialized."""
        matcher = PatternMatcher()
        matcher._load_patterns()
        matcher._build_keyword_index()
        self.matcher = matcher

    def test_get_pattern_existing(self):
        """A known ID returns the pattern carrying that ID."""
        found = self.matcher.get_pattern("CP-AUTH-001")
        assert found is not None
        assert found.id == "CP-AUTH-001"

    def test_get_pattern_case_insensitive(self):
        """A lowercase ID still finds the pattern."""
        found = self.matcher.get_pattern("cp-auth-001")
        assert found is not None

    def test_get_pattern_nonexistent(self):
        """An unknown ID returns None."""
        missing = self.matcher.get_pattern("CP-FAKE-999")
        assert missing is None

    def test_get_patterns_by_domain(self):
        """The AUTH domain has at least three patterns."""
        auth_patterns = self.matcher.get_patterns_by_domain("AUTH")
        assert len(auth_patterns) >= 3

    def test_get_patterns_by_domain_case_insensitive(self):
        """A lowercase domain name gives at least three patterns as well."""
        auth_patterns = self.matcher.get_patterns_by_domain("auth")
        assert len(auth_patterns) >= 3

    def test_get_patterns_by_domain_unknown(self):
        """An unknown domain returns an empty list."""
        nothing = self.matcher.get_patterns_by_domain("NOPE")
        assert nothing == []

    def test_stats(self):
        """``stats`` reports counts and the (unset) initialized flag."""
        summary = self.matcher.stats()
        assert summary["total_patterns"] == 50
        assert len(summary["domains"]) >= 5
        assert summary["keywords"] > 50
        # setup_method never sets _initialized, so the flag must be False.
        assert summary["initialized"] is False
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: PatternMatcher — auto initialize
|
|
# =============================================================================
|
|
|
|
|
|
class TestAutoInitialize:
    """Checks when ``match`` triggers ``initialize``."""

    @pytest.mark.asyncio
    async def test_auto_init_on_match(self):
        """An uninitialized matcher calls ``initialize`` exactly once."""
        matcher = PatternMatcher()
        assert not matcher._initialized

        async def fake_initialize():
            # Stand-in for initialize(): prepares keyword data, no embeddings.
            matcher._initialized = True
            matcher._load_patterns()
            matcher._build_keyword_index()
            matcher._pattern_embeddings = []
            matcher._pattern_ids = []

        with patch.object(
            matcher,
            "initialize",
            new_callable=AsyncMock,
            side_effect=fake_initialize,
        ) as mock_init:
            with patch(
                "compliance.services.pattern_matcher._get_embedding",
                new_callable=AsyncMock,
                return_value=[],
            ):
                await matcher.match(obligation_text="test text")

            mock_init.assert_called_once()

    @pytest.mark.asyncio
    async def test_no_double_init(self):
        """A matcher already flagged as initialized does not call ``initialize``."""
        matcher = PatternMatcher()
        matcher._initialized = True
        matcher._patterns = []

        init_patcher = patch.object(matcher, "initialize", new_callable=AsyncMock)
        with init_patcher as mock_init:
            await matcher.match(obligation_text="test text")
            mock_init.assert_not_called()
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: Constants
|
|
# =============================================================================
|
|
|
|
|
|
class TestConstants:
    """Sanity checks on the module-level tuning constants."""

    def test_keyword_min_hits(self):
        """The minimum keyword hit count is at least one."""
        assert 1 <= KEYWORD_MATCH_MIN_HITS

    def test_embedding_threshold_range(self):
        """The embedding threshold lies in (0, 1]."""
        threshold = EMBEDDING_PATTERN_THRESHOLD
        assert threshold > 0 and threshold <= 1.0

    def test_domain_bonus_range(self):
        """The domain bonus lies in (0, 0.20]."""
        bonus = DOMAIN_BONUS
        assert bonus > 0 and bonus <= 0.20

    def test_domain_bonus_is_010(self):
        """The domain bonus is pinned to 0.10."""
        assert 0.10 == DOMAIN_BONUS

    def test_embedding_threshold_is_075(self):
        """The embedding threshold is pinned to 0.75."""
        assert 0.75 == EMBEDDING_PATTERN_THRESHOLD
|
|
|
|
|
|
# =============================================================================
|
|
# Tests: Integration — Real keyword matching scenarios
|
|
# =============================================================================
|
|
|
|
|
|
class TestRealKeywordScenarios:
    """Keyword-scoring checks using realistic obligation texts."""

    @staticmethod
    def _count_with_prefix(scores, prefix):
        """Count the pattern IDs in ``scores`` that start with ``prefix``."""
        return sum(1 for pattern_id in scores if pattern_id.startswith(prefix))

    def setup_method(self):
        """Prepare a matcher with patterns loaded and the keyword index built."""
        matcher = PatternMatcher()
        matcher._load_patterns()
        matcher._build_keyword_index()
        self.matcher = matcher

    def test_dsgvo_consent_obligation(self):
        """DSGVO consent obligation should match data protection patterns."""
        obligation = (
            "Die Einwilligung der betroffenen Person muss freiwillig und "
            "informiert erfolgen. Eine Verarbeitung personenbezogener Daten "
            "ist nur mit gültiger Einwilligung zulaessig. Datenschutz."
        )
        scores = self.matcher._keyword_scores(obligation, "dsgvo")
        # At least one scored pattern must have a CP-DATA ID.
        assert self._count_with_prefix(scores, "CP-DATA") >= 1

    def test_ai_act_risk_assessment(self):
        """AI Act risk assessment should match AI patterns."""
        obligation = (
            "KI-Systeme mit hohem Risiko muessen einer Konformitaetsbewertung "
            "unterzogen werden. Transparenz und Erklaerbarkeit sind Pflicht."
        )
        scores = self.matcher._keyword_scores(obligation, "ai_act")
        assert self._count_with_prefix(scores, "CP-AI") >= 1

    def test_nis2_incident_response(self):
        """NIS2 incident text should match INC patterns."""
        obligation = (
            "Sicherheitsvorfaelle muessen innerhalb von 24 Stunden gemeldet "
            "werden. Ein incident response plan und Eskalationsverfahren "
            "sind zu etablieren fuer Vorfall und Wiederherstellung."
        )
        scores = self.matcher._keyword_scores(obligation, "nis2")
        assert self._count_with_prefix(scores, "CP-INC") >= 1

    def test_audit_logging_obligation(self):
        """Audit logging obligation should match LOG patterns."""
        obligation = (
            "Alle sicherheitsrelevanten Ereignisse muessen protokolliert werden. "
            "Audit-Trail und Monitoring der Zugriffe sind Pflicht. "
            "Protokollierung muss manipulationssicher sein."
        )
        scores = self.matcher._keyword_scores(obligation, None)
        assert self._count_with_prefix(scores, "CP-LOG") >= 1

    def test_access_control_obligation(self):
        """Access control text should match ACC patterns."""
        obligation = (
            "Zugriffskontrolle nach dem Least-Privilege-Prinzip. "
            "Rollenbasierte Autorisierung und Berechtigung fuer alle Systeme."
        )
        scores = self.matcher._keyword_scores(obligation, None)
        assert self._count_with_prefix(scores, "CP-ACC") >= 1
|