Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 47s
CI/CD / test-python-backend-compliance (push) Successful in 33s
CI/CD / test-python-document-crawler (push) Successful in 24s
CI/CD / test-python-dsms-gateway (push) Successful in 18s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Has been skipped
Implements the full Multi-Layer Control Architecture for migrating ~25,000 Rich Controls into atomic, deduplicated Master Controls with full traceability.

Architecture: Legal Source → Obligation → Control Pattern → Master Control → Customer Instance

New services:
- ObligationExtractor: 3-tier extraction (exact → embedding → LLM)
- PatternMatcher: 2-tier matching (keyword + embedding + domain-bonus)
- ControlComposer: Pattern + Obligation → Master Control
- PipelineAdapter: Pipeline integration + Migration Passes 1-5
- DecompositionPass: Pass 0a/0b — Rich Control → atomic Controls
- CrosswalkRoutes: 15 API endpoints under /v1/canonical/

New DB schema:
- Migration 060: obligation_extractions, control_patterns, crosswalk_matrix
- Migration 061: obligation_candidates, parent_control_uuid tracking

Pattern Library: 50 YAML patterns (30 core + 20 IT-security)
Go SDK: Pattern loader with YAML validation and indexing
Documentation: MkDocs updated with full architecture overview

500 Python tests passing across all components.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
891 lines
30 KiB
Python
891 lines
30 KiB
Python
"""Tests for Control Composer — Phase 6 of Multi-Layer Control Architecture.

Validates:
- ComposedControl dataclass and serialization
- Pattern-guided composition (Tier 1)
- Template-only fallback (when LLM fails)
- Fallback composition (no pattern)
- License rule handling (Rules 1, 2, 3)
- Prompt building
- Field validation and fixing
- Batch composition
- Edge cases: empty inputs, missing data, malformed LLM responses
"""

import json
from unittest.mock import AsyncMock, patch

import pytest

from compliance.services.control_composer import (
    ComposedControl,
    ControlComposer,
    _anchors_from_pattern,
    _build_compose_prompt,
    _build_fallback_prompt,
    _compose_system_prompt,
    _ensure_list,
    _obligation_section,
    _pattern_section,
    _severity_to_risk,
    _validate_control,
)
from compliance.services.obligation_extractor import ObligationMatch
from compliance.services.pattern_matcher import ControlPattern, PatternMatchResult


# =============================================================================
# Fixtures
# =============================================================================


def _make_obligation(
    obligation_id="DSGVO-OBL-001",
    title="Verarbeitungsverzeichnis fuehren",
    text="Fuehrung eines Verzeichnisses aller Verarbeitungstaetigkeiten.",
    method="exact_match",
    confidence=1.0,
    regulation_id="dsgvo",
) -> ObligationMatch:
    """Build an ``ObligationMatch`` test fixture.

    Every argument has a default, so callers override only the fields a
    given test cares about. ``title`` and ``text`` are passed on as
    ``obligation_title`` and ``obligation_text``.
    """
    return ObligationMatch(
        obligation_id=obligation_id,
        obligation_title=title,
        obligation_text=text,
        method=method,
        confidence=confidence,
        regulation_id=regulation_id,
    )
def _make_pattern(
    pattern_id="CP-COMP-001",
    name="compliance_governance",
    name_de="Compliance-Governance",
    domain="COMP",
    category="compliance",
) -> ControlPattern:
    """Build a ``ControlPattern`` test fixture.

    Only the identifying fields are parameterised. The templates, default
    severity/effort, keywords, tags and the two open anchor references are
    fixed values that several tests assert against.
    """
    return ControlPattern(
        id=pattern_id,
        name=name,
        name_de=name_de,
        domain=domain,
        category=category,
        description="Compliance management and governance framework",
        objective_template="Sicherstellen, dass ein wirksames Compliance-Management existiert.",
        rationale_template="Ohne Governance fehlt die Grundlage fuer Compliance.",
        requirements_template=[
            "Compliance-Verantwortlichkeiten definieren",
            "Regelmaessige Compliance-Bewertungen durchfuehren",
            "Dokumentationspflichten einhalten",
        ],
        test_procedure_template=[
            "Pruefung der Compliance-Organisation",
            "Stichproben der Dokumentation",
        ],
        evidence_template=[
            "Compliance-Handbuch",
            "Pruefberichte",
        ],
        severity_default="high",
        implementation_effort_default="l",
        obligation_match_keywords=["compliance", "governance", "konformitaet"],
        tags=["compliance", "governance"],
        composable_with=["CP-COMP-002"],
        open_anchor_refs=[
            {"framework": "ISO 27001", "ref": "A.18"},
            {"framework": "NIST CSF", "ref": "GV.OC"},
        ],
    )
def _make_pattern_result(pattern=None, confidence=0.85, method="keyword") -> PatternMatchResult:
    """Build a ``PatternMatchResult`` test fixture.

    When ``pattern`` is ``None`` the default pattern from ``_make_pattern()``
    is used. ``pattern_id`` is always taken from the pattern itself, and the
    keyword counters are fixed at 4 hits out of 7.
    """
    if pattern is None:
        pattern = _make_pattern()
    return PatternMatchResult(
        pattern=pattern,
        pattern_id=pattern.id,
        method=method,
        confidence=confidence,
        keyword_hits=4,
        total_keywords=7,
    )
def _llm_success_response() -> str:
    """Return a canned, well-formed LLM reply as a JSON string.

    The payload carries every field the composition tests read back
    (title, objective, rationale, the three list fields, severity, effort,
    category, tags, target audience and verification method).
    """
    return json.dumps({
        "title": "Compliance-Governance fuer Verarbeitungstaetigkeiten",
        "objective": "Sicherstellen, dass alle Verarbeitungstaetigkeiten dokumentiert und ueberwacht werden.",
        "rationale": "Die DSGVO verlangt ein Verarbeitungsverzeichnis als Grundlage der Rechenschaftspflicht.",
        "requirements": [
            "Verarbeitungsverzeichnis gemaess Art. 30 DSGVO fuehren",
            "Regelmaessige Aktualisierung bei Aenderungen",
            "Verantwortlichkeiten fuer die Pflege zuweisen",
        ],
        "test_procedure": [
            "Vollstaendigkeit des Verzeichnisses pruefen",
            "Aktualitaet der Eintraege verifizieren",
        ],
        "evidence": [
            "Verarbeitungsverzeichnis",
            "Aenderungsprotokoll",
        ],
        "severity": "high",
        "implementation_effort": "m",
        "category": "compliance",
        "tags": ["dsgvo", "verarbeitungsverzeichnis", "governance"],
        "target_audience": ["unternehmen", "behoerden"],
        "verification_method": "document",
    })


# =============================================================================
# Tests: ComposedControl
# =============================================================================


class TestComposedControl:
    """Tests for the ComposedControl dataclass."""

    def test_defaults(self):
        """A control built without arguments carries the expected defaults."""
        c = ComposedControl()
        assert c.control_id == ""
        assert c.title == ""
        assert c.severity == "medium"
        assert c.risk_score == 5.0
        assert c.implementation_effort == "m"
        assert c.release_state == "draft"
        assert c.license_rule is None
        assert c.customer_visible is True
        assert c.pattern_id is None
        assert c.obligation_ids == []
        assert c.composition_method == "pattern_guided"

    def test_to_dict_keys(self):
        """to_dict() exposes exactly the expected set of keys."""
        c = ComposedControl()
        d = c.to_dict()
        expected_keys = {
            "control_id", "title", "objective", "rationale", "scope",
            "requirements", "test_procedure", "evidence", "severity",
            "risk_score", "implementation_effort", "open_anchors",
            "release_state", "tags", "license_rule", "source_original_text",
            "source_citation", "customer_visible", "verification_method",
            "category", "target_audience", "pattern_id", "obligation_ids",
            "generation_metadata", "composition_method",
        }
        assert set(d.keys()) == expected_keys

    def test_to_dict_values(self):
        """Values passed to the constructor come back unchanged from to_dict()."""
        c = ComposedControl(
            title="Test Control",
            pattern_id="CP-AUTH-001",
            obligation_ids=["DSGVO-OBL-001"],
            severity="high",
            license_rule=1,
        )
        d = c.to_dict()
        assert d["title"] == "Test Control"
        assert d["pattern_id"] == "CP-AUTH-001"
        assert d["obligation_ids"] == ["DSGVO-OBL-001"]
        assert d["severity"] == "high"
        assert d["license_rule"] == 1


# =============================================================================
# Tests: _ensure_list
# =============================================================================


class TestEnsureList:
    """Tests for the _ensure_list helper."""

    def test_list_passthrough(self):
        """A list of strings is returned with the same items."""
        assert _ensure_list(["a", "b"]) == ["a", "b"]

    def test_string_to_list(self):
        """A bare string is wrapped in a one-element list."""
        assert _ensure_list("hello") == ["hello"]

    def test_none_to_empty(self):
        """None becomes an empty list."""
        assert _ensure_list(None) == []

    def test_empty_list(self):
        """An empty list stays empty."""
        assert _ensure_list([]) == []

    def test_filters_empty_values(self):
        """Empty strings are dropped from the result."""
        assert _ensure_list(["a", "", "b"]) == ["a", "b"]

    def test_converts_to_strings(self):
        """Non-string items are converted to strings."""
        assert _ensure_list([1, 2, 3]) == ["1", "2", "3"]


# =============================================================================
# Tests: _anchors_from_pattern
# =============================================================================


class TestAnchorsFromPattern:
    """Tests for the _anchors_from_pattern helper."""

    def test_converts_anchors(self):
        """Each anchor reference of the pattern yields one anchor entry.

        The pattern's ``ref`` value is expected under ``control_id``, with an
        ``alignment_score`` of 0.8.
        """
        pattern = _make_pattern()
        anchors = _anchors_from_pattern(pattern)
        assert len(anchors) == 2
        assert anchors[0]["framework"] == "ISO 27001"
        assert anchors[0]["control_id"] == "A.18"
        assert anchors[0]["alignment_score"] == 0.8

    def test_empty_anchors(self):
        """A pattern without anchor references yields an empty list."""
        pattern = _make_pattern()
        pattern.open_anchor_refs = []
        anchors = _anchors_from_pattern(pattern)
        assert anchors == []


# =============================================================================
# Tests: _severity_to_risk
# =============================================================================


class TestSeverityToRisk:
    """Tests for the _severity_to_risk mapping."""

    def test_critical(self):
        """'critical' maps to 9.0."""
        assert _severity_to_risk("critical") == 9.0

    def test_high(self):
        """'high' maps to 7.0."""
        assert _severity_to_risk("high") == 7.0

    def test_medium(self):
        """'medium' maps to 5.0."""
        assert _severity_to_risk("medium") == 5.0

    def test_low(self):
        """'low' maps to 3.0."""
        assert _severity_to_risk("low") == 3.0

    def test_unknown(self):
        """An unrecognised severity maps to 5.0."""
        assert _severity_to_risk("xyz") == 5.0


# =============================================================================
# Tests: _validate_control
# =============================================================================


class TestValidateControl:
    """Tests for _validate_control, which fixes fields in place."""

    def test_fixes_invalid_severity(self):
        """An unknown severity is replaced by 'medium'."""
        c = ComposedControl(severity="extreme")
        _validate_control(c)
        assert c.severity == "medium"

    def test_keeps_valid_severity(self):
        """A valid severity is left untouched."""
        c = ComposedControl(severity="critical")
        _validate_control(c)
        assert c.severity == "critical"

    def test_fixes_invalid_effort(self):
        """An unknown implementation effort is replaced by 'm'."""
        c = ComposedControl(implementation_effort="xxl")
        _validate_control(c)
        assert c.implementation_effort == "m"

    def test_fixes_invalid_verification(self):
        """An unknown verification method is reset to None."""
        c = ComposedControl(verification_method="magic")
        _validate_control(c)
        assert c.verification_method is None

    def test_keeps_valid_verification(self):
        """A valid verification method is left untouched."""
        c = ComposedControl(verification_method="code_review")
        _validate_control(c)
        assert c.verification_method == "code_review"

    def test_fixes_risk_score_out_of_range(self):
        """An out-of-range risk score is re-derived from the severity."""
        c = ComposedControl(risk_score=15.0, severity="high")
        _validate_control(c)
        assert c.risk_score == 7.0  # from severity

    def test_truncates_long_title(self):
        """Titles longer than 255 characters are shortened."""
        c = ComposedControl(title="A" * 300)
        _validate_control(c)
        assert len(c.title) <= 255

    def test_ensures_minimum_content(self):
        """Empty content fields are filled so the control is never blank."""
        c = ComposedControl(
            title="Test",
            objective="",
            rationale="",
            requirements=[],
            test_procedure=[],
            evidence=[],
        )
        _validate_control(c)
        assert c.objective == "Test"  # falls back to title
        assert c.rationale != ""
        assert len(c.requirements) >= 1
        assert len(c.test_procedure) >= 1
        assert len(c.evidence) >= 1


# =============================================================================
# Tests: Prompt builders
# =============================================================================


class TestPromptBuilders:
    """Tests for the prompt-building helpers."""

    def test_compose_system_prompt_rule1(self):
        """The Rule 1 system prompt carries no copy prohibition."""
        prompt = _compose_system_prompt(1)
        assert "praxisorientiertes" in prompt
        assert "KOPIERE KEINE" not in prompt

    def test_compose_system_prompt_rule3(self):
        """The Rule 3 system prompt forbids copying and naming the source."""
        prompt = _compose_system_prompt(3)
        assert "KOPIERE KEINE" in prompt
        assert "NENNE NICHT die Quelle" in prompt

    def test_obligation_section_full(self):
        """A populated obligation shows its title, ID and regulation."""
        obl = _make_obligation()
        section = _obligation_section(obl)
        assert "PFLICHT" in section
        assert "Verarbeitungsverzeichnis" in section
        assert "DSGVO-OBL-001" in section
        assert "dsgvo" in section

    def test_obligation_section_minimal(self):
        """An obligation built without arguments yields the placeholder text."""
        obl = ObligationMatch()
        section = _obligation_section(obl)
        assert "Keine spezifische Pflicht" in section

    def test_pattern_section(self):
        """The pattern section shows the name, ID and template requirements."""
        pattern = _make_pattern()
        section = _pattern_section(pattern)
        assert "MUSTER" in section
        assert "Compliance-Governance" in section
        assert "CP-COMP-001" in section
        assert "Compliance-Verantwortlichkeiten" in section

    def test_build_compose_prompt_rule1(self):
        """Under Rule 1 the original text is included in the prompt."""
        obl = _make_obligation()
        pattern = _make_pattern()
        prompt = _build_compose_prompt(obl, pattern, "Original text here", 1)
        assert "PFLICHT" in prompt
        assert "MUSTER" in prompt
        assert "KONTEXT (Originaltext)" in prompt
        assert "Original text here" in prompt

    def test_build_compose_prompt_rule3(self):
        """Under Rule 3 the original text must not appear in the prompt."""
        obl = _make_obligation()
        pattern = _make_pattern()
        prompt = _build_compose_prompt(obl, pattern, "Secret text", 3)
        assert "Intern analysiert" in prompt
        assert "Secret text" not in prompt

    def test_build_fallback_prompt(self):
        """The fallback prompt has an obligation section and a context section."""
        obl = _make_obligation()
        prompt = _build_fallback_prompt(obl, "Chunk text", 1)
        assert "PFLICHT" in prompt
        assert "KONTEXT (Originaltext)" in prompt

    def test_build_fallback_prompt_no_chunk(self):
        """Without chunk text the fallback prompt says no original text exists."""
        obl = _make_obligation()
        prompt = _build_fallback_prompt(obl, None, 1)
        assert "Kein Originaltext" in prompt


# =============================================================================
# Tests: ControlComposer — Pattern-guided composition
# =============================================================================


class TestComposeWithPattern:
    """Tests for pattern-guided control composition."""

    def setup_method(self):
        self.composer = ControlComposer()
        self.obligation = _make_obligation()
        self.pattern_result = _make_pattern_result()

    async def _compose(self, **kwargs):
        """Run ``compose`` with the LLM call patched to the canned success reply.

        The obligation and pattern result created in ``setup_method`` are
        always passed; ``kwargs`` are forwarded to ``compose`` unchanged. The
        patch is released before the control is returned, so assertions run
        outside the patched context, as in the original inline version.
        """
        with patch(
            "compliance.services.control_composer._llm_ollama",
            new_callable=AsyncMock,
            return_value=_llm_success_response(),
        ):
            return await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                **kwargs,
            )

    @pytest.mark.asyncio
    async def test_compose_success_rule1(self):
        """Successful LLM composition with Rule 1."""
        control = await self._compose(
            chunk_text="Der Verantwortliche fuehrt ein Verzeichnis...",
            license_rule=1,
        )

        assert control.composition_method == "pattern_guided"
        assert control.title != ""
        assert "Verarbeitungstaetigkeiten" in control.objective
        assert len(control.requirements) >= 2
        assert len(control.test_procedure) >= 1
        assert len(control.evidence) >= 1
        assert control.severity == "high"
        assert control.category == "compliance"

    @pytest.mark.asyncio
    async def test_compose_sets_linkage(self):
        """Pattern and obligation IDs should be set."""
        control = await self._compose(license_rule=1)

        assert control.pattern_id == "CP-COMP-001"
        assert control.obligation_ids == ["DSGVO-OBL-001"]

    @pytest.mark.asyncio
    async def test_compose_sets_metadata(self):
        """Generation metadata should include composition details."""
        control = await self._compose(
            license_rule=1,
            regulation_code="eu_2016_679",
        )

        meta = control.generation_metadata
        assert meta["composition_method"] == "pattern_guided"
        assert meta["pattern_id"] == "CP-COMP-001"
        assert meta["pattern_confidence"] == 0.85
        assert meta["obligation_id"] == "DSGVO-OBL-001"
        assert meta["license_rule"] == 1
        assert meta["regulation_code"] == "eu_2016_679"

    @pytest.mark.asyncio
    async def test_compose_rule1_stores_original(self):
        """Rule 1: original text should be stored."""
        control = await self._compose(
            chunk_text="Original DSGVO text",
            license_rule=1,
        )

        assert control.license_rule == 1
        assert control.source_original_text == "Original DSGVO text"
        assert control.customer_visible is True

    @pytest.mark.asyncio
    async def test_compose_rule2_stores_citation(self):
        """Rule 2: citation should be stored."""
        citation = {
            "source": "OWASP ASVS",
            "license": "CC-BY-SA-4.0",
            "license_notice": "OWASP Foundation",
        }
        control = await self._compose(
            chunk_text="OWASP text",
            license_rule=2,
            source_citation=citation,
        )

        assert control.license_rule == 2
        assert control.source_original_text == "OWASP text"
        assert control.source_citation == citation
        assert control.customer_visible is True

    @pytest.mark.asyncio
    async def test_compose_rule3_no_original(self):
        """Rule 3: no original text, not customer visible."""
        control = await self._compose(
            chunk_text="BSI restricted text",
            license_rule=3,
        )

        assert control.license_rule == 3
        assert control.source_original_text is None
        assert control.source_citation is None
        assert control.customer_visible is False


# =============================================================================
# Tests: ControlComposer — Template-only fallback (LLM fails)
# =============================================================================


class TestTemplateOnlyFallback:
    """Tests for template-only composition when LLM fails."""

    def setup_method(self):
        self.composer = ControlComposer()
        self.obligation = _make_obligation()
        self.pattern_result = _make_pattern_result()

    async def _compose_with_llm_reply(self, llm_reply):
        """Run ``compose`` under Rule 1 with the LLM call patched to return ``llm_reply``.

        The obligation and pattern result come from ``setup_method``. The
        patch is released before the control is returned.
        """
        with patch(
            "compliance.services.control_composer._llm_ollama",
            new_callable=AsyncMock,
            return_value=llm_reply,
        ):
            return await self.composer.compose(
                obligation=self.obligation,
                pattern_result=self.pattern_result,
                license_rule=1,
            )

    @pytest.mark.asyncio
    async def test_template_fallback_on_empty_llm(self):
        """When LLM returns empty, should use template directly."""
        control = await self._compose_with_llm_reply("")

        assert control.composition_method == "template_only"
        assert "Compliance-Governance" in control.title
        assert control.severity == "high"  # from pattern
        assert len(control.requirements) >= 2  # from pattern template

    @pytest.mark.asyncio
    async def test_template_fallback_on_invalid_json(self):
        """When LLM returns non-JSON, should use template."""
        control = await self._compose_with_llm_reply("This is not JSON at all")

        assert control.composition_method == "template_only"

    @pytest.mark.asyncio
    async def test_template_includes_obligation_title(self):
        """Template fallback should include obligation title in control title."""
        control = await self._compose_with_llm_reply("")

        assert "Verarbeitungsverzeichnis" in control.title

    @pytest.mark.asyncio
    async def test_template_has_open_anchors(self):
        """Template fallback should include pattern anchors."""
        control = await self._compose_with_llm_reply("")

        assert len(control.open_anchors) == 2
        frameworks = [a["framework"] for a in control.open_anchors]
        assert "ISO 27001" in frameworks


# =============================================================================
# Tests: ControlComposer — Fallback (no pattern)
# =============================================================================


class TestFallbackNoPattern:
    """Tests for fallback composition without a pattern."""

    def setup_method(self):
        self.composer = ControlComposer()
        self.obligation = _make_obligation()

    async def _compose_without_pattern(self, llm_reply, obligation=None, license_rule=1):
        """Run ``compose`` with a ``PatternMatchResult()`` built without arguments.

        The LLM call is patched to return ``llm_reply``. When ``obligation``
        is ``None`` the one from ``setup_method`` is used. The patch is
        released before the control is returned.
        """
        if obligation is None:
            obligation = self.obligation
        with patch(
            "compliance.services.control_composer._llm_ollama",
            new_callable=AsyncMock,
            return_value=llm_reply,
        ):
            return await self.composer.compose(
                obligation=obligation,
                pattern_result=PatternMatchResult(),  # No pattern
                license_rule=license_rule,
            )

    @pytest.mark.asyncio
    async def test_fallback_with_llm(self):
        """Fallback should work with LLM response."""
        response = json.dumps({
            "title": "Verarbeitungsverzeichnis",
            "objective": "Verzeichnis fuehren",
            "rationale": "DSGVO Art. 30",
            "requirements": ["VVT anlegen"],
            "test_procedure": ["VVT pruefen"],
            "evidence": ["VVT Dokument"],
            "severity": "high",
        })
        control = await self._compose_without_pattern(response)

        assert control.composition_method == "fallback"
        assert control.pattern_id is None
        assert control.release_state == "needs_review"

    @pytest.mark.asyncio
    async def test_fallback_llm_fails(self):
        """Fallback with LLM failure should still produce a control."""
        control = await self._compose_without_pattern("")

        assert control.composition_method == "fallback"
        assert control.title != ""
        # Validation ensures minimum content
        assert len(control.requirements) >= 1
        assert len(control.test_procedure) >= 1

    @pytest.mark.asyncio
    async def test_fallback_no_obligation_text(self):
        """Fallback with empty obligation should still work."""
        empty_obl = ObligationMatch()
        control = await self._compose_without_pattern(
            "",
            obligation=empty_obl,
            license_rule=3,
        )

        assert control.title != ""
        assert control.customer_visible is False


# =============================================================================
# Tests: ControlComposer — Batch composition
# =============================================================================


class TestComposeBatch:
    """Tests for batch composition."""

    @pytest.mark.asyncio
    async def test_batch_returns_list(self):
        """A two-item batch yields two controls in input order.

        The first item carries a pattern; the second uses a
        ``PatternMatchResult()`` built without arguments and is expected to
        come back with no pattern ID.
        """
        composer = ControlComposer()
        items = [
            {
                "obligation": _make_obligation(),
                "pattern_result": _make_pattern_result(),
                "license_rule": 1,
            },
            {
                "obligation": _make_obligation(
                    obligation_id="NIS2-OBL-001",
                    title="Incident Meldepflicht",
                    regulation_id="nis2",
                ),
                "pattern_result": PatternMatchResult(),
                "license_rule": 3,
            },
        ]

        with patch(
            "compliance.services.control_composer._llm_ollama",
            new_callable=AsyncMock,
            return_value=_llm_success_response(),
        ):
            results = await composer.compose_batch(items)

        assert len(results) == 2
        assert results[0].pattern_id == "CP-COMP-001"
        assert results[1].pattern_id is None

    @pytest.mark.asyncio
    async def test_batch_empty(self):
        """An empty batch yields an empty list."""
        composer = ControlComposer()
        results = await composer.compose_batch([])
        assert results == []


# =============================================================================
# Tests: Validation integration
# =============================================================================


class TestValidationIntegration:
    """Tests that validation runs during compose."""

    @staticmethod
    async def _compose_with_llm_reply(llm_reply):
        """Run ``compose`` on a fresh composer with the LLM call patched to return ``llm_reply``.

        Uses the default obligation and pattern-result fixtures under
        license rule 1. The patch is released before the control is returned.
        """
        composer = ControlComposer()
        with patch(
            "compliance.services.control_composer._llm_ollama",
            new_callable=AsyncMock,
            return_value=llm_reply,
        ):
            return await composer.compose(
                obligation=_make_obligation(),
                pattern_result=_make_pattern_result(),
                license_rule=1,
            )

    @pytest.mark.asyncio
    async def test_compose_validates_severity(self):
        """Invalid severity from LLM should be fixed."""
        response = json.dumps({
            "title": "Test",
            "objective": "Test",
            "severity": "EXTREME",
        })
        control = await self._compose_with_llm_reply(response)

        assert control.severity in {"low", "medium", "high", "critical"}

    @pytest.mark.asyncio
    async def test_compose_ensures_minimum_content(self):
        """Empty requirements from LLM should be filled with defaults."""
        response = json.dumps({
            "title": "Test",
            "objective": "Test objective",
            "requirements": [],
        })
        control = await self._compose_with_llm_reply(response)

        assert len(control.requirements) >= 1


# =============================================================================
# Tests: License rule edge cases
# =============================================================================


class TestLicenseRuleEdgeCases:
    """Tests for license rule handling edge cases."""

    @staticmethod
    async def _compose(**kwargs):
        """Run ``compose`` on a fresh composer with the canned success reply.

        The default obligation and pattern-result fixtures are always passed;
        ``kwargs`` (chunk text, license rule, citation) are forwarded to
        ``compose`` unchanged. The patch is released before the control is
        returned.
        """
        composer = ControlComposer()
        with patch(
            "compliance.services.control_composer._llm_ollama",
            new_callable=AsyncMock,
            return_value=_llm_success_response(),
        ):
            return await composer.compose(
                obligation=_make_obligation(),
                pattern_result=_make_pattern_result(),
                **kwargs,
            )

    @pytest.mark.asyncio
    async def test_rule1_no_chunk_text(self):
        """Rule 1 without chunk text: original_text should be None."""
        control = await self._compose(
            chunk_text=None,
            license_rule=1,
        )

        assert control.license_rule == 1
        assert control.source_original_text is None
        assert control.customer_visible is True

    @pytest.mark.asyncio
    async def test_rule2_no_citation(self):
        """Rule 2 without citation: citation should be None."""
        control = await self._compose(
            chunk_text="Some text",
            license_rule=2,
            source_citation=None,
        )

        assert control.license_rule == 2
        assert control.source_citation is None

    @pytest.mark.asyncio
    async def test_rule3_overrides_chunk_and_citation(self):
        """Rule 3 should always clear original text and citation."""
        control = await self._compose(
            chunk_text="This should be cleared",
            license_rule=3,
            source_citation={"source": "BSI"},
        )

        assert control.source_original_text is None
        assert control.source_citation is None
        assert control.customer_visible is False


# =============================================================================
# Tests: Obligation without ID
# =============================================================================


class TestObligationWithoutId:
    """Tests for handling obligations without a known ID."""

    @pytest.mark.asyncio
    async def test_llm_extracted_obligation(self):
        """LLM-extracted obligation (no ID) should still compose."""
        obl = ObligationMatch(
            obligation_id=None,
            obligation_title=None,
            obligation_text="Pflicht zur Meldung von Sicherheitsvorfaellen",
            method="llm_extracted",
            confidence=0.60,
            regulation_id="nis2",
        )
        composer = ControlComposer()
        with patch(
            "compliance.services.control_composer._llm_ollama",
            new_callable=AsyncMock,
            return_value=_llm_success_response(),
        ):
            control = await composer.compose(
                obligation=obl,
                pattern_result=_make_pattern_result(),
                license_rule=1,
            )

        assert control.obligation_ids == []  # No ID to link
        assert control.pattern_id == "CP-COMP-001"
        assert control.generation_metadata["obligation_method"] == "llm_extracted"