feat: Deterministic Control Composition Engine v2 for Pass 0b
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 38s
CI/CD / test-python-backend-compliance (push) Successful in 39s
CI/CD / test-python-document-crawler (push) Successful in 26s
CI/CD / test-python-dsms-gateway (push) Successful in 21s
CI/CD / validate-canonical-controls (push) Successful in 13s
CI/CD / Deploy (push) Successful in 3s
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 38s
CI/CD / test-python-backend-compliance (push) Successful in 39s
CI/CD / test-python-document-crawler (push) Successful in 26s
CI/CD / test-python-dsms-gateway (push) Successful in 21s
CI/CD / validate-canonical-controls (push) Successful in 13s
CI/CD / Deploy (push) Successful in 3s
Replace LLM-based Pass 0b with rule-based deterministic engine that
composes atomic controls from obligation data without any LLM call.
Engine features:
- 24 action types (implement, configure, define, document, maintain,
review, monitor, assess, audit, test, verify, validate, report,
notify, train, restrict_access, encrypt, delete, retain, ensure,
approve, remediate, perform, obtain)
- 19 object classes (policy, procedure, register, record, report,
technical_control, access_control, cryptographic_control,
configuration, account, system, data, interface, role, training,
incident, risk_artifact, process, consent)
- Compound action splitting with no-split phrases
- Title pattern: "{Object} {state_suffix}" (24 state suffixes)
- Statement field: "{condition} {object} ist {trigger} {action}"
- Pattern candidates for downstream categorization (26 specific
combos + 24 action fallbacks)
- Structured timing: deadline_hours + frequency extraction
- Confidence scoring (0.3 base + action/object/trigger/template)
- Merge group hints for dedup: "{action}:{norm_obj}:{trigger}"
- Synonym-based object normalization (50+ German synonyms)
- 16 specific (action_type, object_class) template overrides
- Output validator with Pflichtfelder + Negativregeln + Warnregeln
- All new fields serialized into gen_meta JSONB (no migration needed)
Tests: 185 passed (33 new tests covering all engine components)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -9,7 +9,7 @@ Covers:
|
||||
- _parse_json_array / _parse_json_object
|
||||
- _format_field / _format_citation
|
||||
- _normalize_severity
|
||||
- _template_fallback
|
||||
- _compose_deterministic / _classify_action
|
||||
- _build_pass0a_prompt / _build_pass0b_prompt
|
||||
- DecompositionPass.run_pass0a (mocked LLM + DB)
|
||||
- DecompositionPass.run_pass0b (mocked LLM + DB)
|
||||
@@ -40,7 +40,11 @@ from compliance.services.decomposition_pass import (
|
||||
_format_citation,
|
||||
_compute_extraction_confidence,
|
||||
_normalize_severity,
|
||||
_template_fallback,
|
||||
_compose_deterministic,
|
||||
_classify_action,
|
||||
_classify_object,
|
||||
_split_compound_action,
|
||||
_extract_trigger_qualifier,
|
||||
_fallback_obligation,
|
||||
_build_pass0a_prompt,
|
||||
_build_pass0b_prompt,
|
||||
@@ -53,6 +57,11 @@ from compliance.services.decomposition_pass import (
|
||||
_is_implementation_specific_text,
|
||||
_text_similar,
|
||||
_is_more_implementation_specific,
|
||||
_extract_structured_timing,
|
||||
_normalize_object,
|
||||
_validate_atomic_control,
|
||||
_PATTERN_CANDIDATES_MAP,
|
||||
_PATTERN_CANDIDATES_BY_ACTION,
|
||||
)
|
||||
|
||||
|
||||
@@ -495,11 +504,98 @@ class TestNormalizeSeverity:
|
||||
assert _normalize_severity(None) == "medium"
|
||||
|
||||
|
||||
class TestTemplateFallback:
|
||||
"""Tests for _template_fallback."""
|
||||
class TestClassifyAction:
|
||||
"""Tests for _classify_action."""
|
||||
|
||||
def test_normal_obligation(self):
|
||||
ac = _template_fallback(
|
||||
def test_simple_document_action(self):
|
||||
assert _classify_action("dokumentieren") == "document"
|
||||
|
||||
def test_simple_implement_action(self):
|
||||
assert _classify_action("implementieren") == "implement"
|
||||
|
||||
def test_compound_action_picks_highest_priority(self):
|
||||
# "erstellen" → document, "implementieren" → implement
|
||||
# implement has higher priority
|
||||
assert _classify_action("erstellen und implementieren") == "implement"
|
||||
|
||||
def test_maintain_action(self):
|
||||
assert _classify_action("aktuell halten") == "maintain"
|
||||
assert _classify_action("pflegen") == "maintain"
|
||||
|
||||
def test_ensure_action(self):
|
||||
assert _classify_action("sicherstellen") == "ensure"
|
||||
assert _classify_action("gewährleisten") == "ensure"
|
||||
|
||||
def test_reporting_action(self):
|
||||
assert _classify_action("melden") == "report"
|
||||
assert _classify_action("informieren") == "notify"
|
||||
|
||||
def test_empty_action(self):
|
||||
assert _classify_action("") == "default"
|
||||
|
||||
def test_unknown_action(self):
|
||||
assert _classify_action("xyzzy") == "default"
|
||||
|
||||
def test_access_action(self):
|
||||
assert _classify_action("beschränken") == "restrict_access"
|
||||
assert _classify_action("autorisieren") == "restrict_access"
|
||||
|
||||
def test_encrypt_action(self):
|
||||
assert _classify_action("verschlüsseln") == "encrypt"
|
||||
|
||||
def test_english_fallback(self):
|
||||
assert _classify_action("implement") == "implement"
|
||||
assert _classify_action("monitor") == "monitor"
|
||||
|
||||
def test_aufbewahren(self):
|
||||
assert _classify_action("aufbewahren") == "retain"
|
||||
|
||||
def test_beifuegen(self):
|
||||
assert _classify_action("beifügen") == "document"
|
||||
|
||||
def test_angeben(self):
|
||||
assert _classify_action("angeben") == "document"
|
||||
|
||||
def test_review_vs_monitor(self):
|
||||
"""review and monitor are now separate types."""
|
||||
assert _classify_action("überprüfen") == "review"
|
||||
assert _classify_action("überwachen") == "monitor"
|
||||
|
||||
def test_verify_vs_validate(self):
|
||||
"""verify and validate are separate types."""
|
||||
assert _classify_action("verifizieren") == "verify"
|
||||
assert _classify_action("validieren") == "validate"
|
||||
|
||||
def test_define_vs_document(self):
|
||||
"""define and document are separate types."""
|
||||
assert _classify_action("definieren") == "define"
|
||||
assert _classify_action("festlegen") == "define"
|
||||
assert _classify_action("dokumentieren") == "document"
|
||||
|
||||
def test_approve_action(self):
|
||||
assert _classify_action("genehmigen") == "approve"
|
||||
assert _classify_action("freigeben") == "approve"
|
||||
assert _classify_action("zulassen") == "approve"
|
||||
|
||||
def test_remediate_action(self):
|
||||
assert _classify_action("beheben") == "remediate"
|
||||
assert _classify_action("korrigieren") == "remediate"
|
||||
assert _classify_action("beseitigen") == "remediate"
|
||||
|
||||
def test_process_object_class(self):
|
||||
assert _classify_object("Geschäftsprozess") == "process"
|
||||
assert _classify_object("Managementprozess") == "process"
|
||||
|
||||
def test_consent_object_class(self):
|
||||
assert _classify_object("Einwilligung") == "consent"
|
||||
assert _classify_object("Consent-Management") == "consent"
|
||||
|
||||
|
||||
class TestComposeDeterministic:
|
||||
"""Tests for _compose_deterministic engine."""
|
||||
|
||||
def test_implement_obligation(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Betreiber müssen MFA implementieren",
|
||||
action="implementieren",
|
||||
object_="MFA",
|
||||
@@ -509,12 +605,49 @@ class TestTemplateFallback:
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert "Implementieren" in ac.title
|
||||
assert ac.title == "MFA umgesetzt"
|
||||
assert ac.severity == "high"
|
||||
assert len(ac.requirements) == 1
|
||||
assert len(ac.test_procedure) == 3
|
||||
assert "technischen Konfiguration" in ac.test_procedure[0]
|
||||
assert "Funktionstest" in ac.test_procedure[1]
|
||||
assert "Konfigurationsnachweis" in ac.evidence[0]
|
||||
|
||||
def test_test_obligation(self):
|
||||
ac = _template_fallback(
|
||||
def test_document_obligation(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Unternehmen müssen Sicherheitsrichtlinie erstellen",
|
||||
action="erstellen",
|
||||
object_="Sicherheitsrichtlinie",
|
||||
parent_title="Security Policy",
|
||||
parent_severity="medium",
|
||||
parent_category="governance",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac.title == "Sicherheitsrichtlinie dokumentiert"
|
||||
assert "dokumentiert und aktuell" in ac.test_procedure[0]
|
||||
assert "Vollständigkeit" in ac.test_procedure[1]
|
||||
|
||||
def test_compound_action_uses_implement_template(self):
|
||||
"""'erstellen und implementieren' should use implement template."""
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Wartungsrichtlinie erstellen und implementieren",
|
||||
action="erstellen und implementieren",
|
||||
object_="Wartungsrichtlinie",
|
||||
parent_title="Maintenance",
|
||||
parent_severity="high",
|
||||
parent_category="operations",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac.title == "Wartungsrichtlinie umgesetzt"
|
||||
assert "umgesetzt" in ac.test_procedure[0]
|
||||
# Must NOT contain "Prüfung der erstellen und implementieren"
|
||||
for tp in ac.test_procedure:
|
||||
assert "erstellen und implementieren" not in tp
|
||||
|
||||
def test_test_obligation_overrides_type(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="MFA muss regelmäßig getestet werden",
|
||||
action="testen",
|
||||
object_="MFA-Wirksamkeit",
|
||||
@@ -524,11 +657,11 @@ class TestTemplateFallback:
|
||||
is_test=True,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert "Test:" in ac.title
|
||||
assert "Testprotokoll" in ac.evidence
|
||||
assert "Testpläne" in ac.test_procedure[0]
|
||||
assert "Testprotokoll" in ac.evidence[0]
|
||||
|
||||
def test_reporting_obligation(self):
|
||||
ac = _template_fallback(
|
||||
def test_reporting_obligation_overrides_type(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Behörden sind über Vorfälle zu informieren",
|
||||
action="informieren",
|
||||
object_="zuständige Behörden",
|
||||
@@ -538,8 +671,383 @@ class TestTemplateFallback:
|
||||
is_test=False,
|
||||
is_reporting=True,
|
||||
)
|
||||
assert "Meldepflicht:" in ac.title
|
||||
assert "Meldeprozess-Dokumentation" in ac.evidence
|
||||
assert "Meldeprozess" in ac.test_procedure[0]
|
||||
assert "Meldeprozess-Dokumentation" in ac.evidence[0]
|
||||
|
||||
def test_no_action_uses_default(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Allgemeine Pflicht",
|
||||
action="",
|
||||
object_="Datenschutzkonzept",
|
||||
parent_title="Privacy",
|
||||
parent_severity="medium",
|
||||
parent_category="privacy",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac.title == "Datenschutzkonzept umgesetzt"
|
||||
assert len(ac.test_procedure) >= 2
|
||||
|
||||
def test_no_object_uses_parent_title(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="System muss gesichert werden",
|
||||
action="absichern",
|
||||
object_="",
|
||||
parent_title="System Security",
|
||||
parent_severity="high",
|
||||
parent_category="security",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac.title == "Absichern umgesetzt"
|
||||
# Object placeholder should use parent_title
|
||||
assert "System Security" in ac.test_procedure[0]
|
||||
|
||||
def test_severity_inherited(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Kritische Pflicht",
|
||||
action="implementieren",
|
||||
object_="Firewall",
|
||||
parent_title="Net",
|
||||
parent_severity="critical",
|
||||
parent_category="security",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac.severity == "critical"
|
||||
|
||||
def test_category_inherited(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Pflicht",
|
||||
action="dokumentieren",
|
||||
object_="X",
|
||||
parent_title="Y",
|
||||
parent_severity="low",
|
||||
parent_category="privacy",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac.category == "privacy"
|
||||
|
||||
def test_empty_category_defaults_to_governance(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Pflicht",
|
||||
action="dokumentieren",
|
||||
object_="X",
|
||||
parent_title="Y",
|
||||
parent_severity="low",
|
||||
parent_category="",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac.category == "governance"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GAP 1: STATEMENT FIELD TESTS
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestStatementField:
|
||||
"""Tests for the statement field in _compose_deterministic."""
|
||||
|
||||
def test_statement_with_condition_and_trigger(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Bei Vorfall müssen Behörden innerhalb von 72 Stunden informiert werden",
|
||||
action="informieren",
|
||||
object_="zuständige Behörden",
|
||||
parent_title="Incident Reporting",
|
||||
parent_severity="high",
|
||||
parent_category="governance",
|
||||
is_test=False,
|
||||
is_reporting=True,
|
||||
trigger_type="event",
|
||||
condition="bei Sicherheitsvorfall",
|
||||
)
|
||||
assert "bei Sicherheitsvorfall," in ac._statement
|
||||
assert "zuständige Behörden" in ac._statement
|
||||
assert "ist" in ac._statement
|
||||
|
||||
def test_statement_without_condition(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Richtlinie muss dokumentiert werden",
|
||||
action="dokumentieren",
|
||||
object_="Sicherheitsrichtlinie",
|
||||
parent_title="Policy",
|
||||
parent_severity="medium",
|
||||
parent_category="governance",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac._statement.startswith("Sicherheitsrichtlinie ist")
|
||||
assert "dokumentiert" in ac._statement
|
||||
|
||||
def test_statement_without_trigger(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="MFA implementieren",
|
||||
action="implementieren",
|
||||
object_="MFA",
|
||||
parent_title="Auth",
|
||||
parent_severity="high",
|
||||
parent_category="security",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
trigger_type="continuous",
|
||||
)
|
||||
assert "MFA ist umgesetzt" == ac._statement
|
||||
|
||||
def test_statement_empty_object_uses_parent(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Absichern",
|
||||
action="absichern",
|
||||
object_="",
|
||||
parent_title="System Security",
|
||||
parent_severity="high",
|
||||
parent_category="security",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert "System Security" in ac._statement
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GAP 2: PATTERN CANDIDATES TESTS
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPatternCandidates:
|
||||
"""Tests for pattern_candidates in _compose_deterministic."""
|
||||
|
||||
def test_specific_combo_returns_candidates(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Verschlüsselung implementieren",
|
||||
action="implementieren",
|
||||
object_="Verschlüsselung",
|
||||
parent_title="Crypto",
|
||||
parent_severity="high",
|
||||
parent_category="security",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
# implement + technical_control → specific combo
|
||||
assert "technical_safeguard_enabled" in ac._pattern_candidates
|
||||
|
||||
def test_fallback_by_action(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="XYZ bewerten",
|
||||
action="bewerten",
|
||||
object_="Spezialthema",
|
||||
parent_title="X",
|
||||
parent_severity="medium",
|
||||
parent_category="governance",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
# assess + general → no specific combo, uses action fallback
|
||||
assert "assessment_completed" in ac._pattern_candidates
|
||||
|
||||
def test_unknown_combo_returns_action_fallback(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Pflicht",
|
||||
action="",
|
||||
object_="",
|
||||
parent_title="Y",
|
||||
parent_severity="low",
|
||||
parent_category="governance",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
# default action → no pattern candidates
|
||||
assert ac._pattern_candidates == []
|
||||
|
||||
def test_encrypt_data_gets_encryption_patterns(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Daten verschlüsseln",
|
||||
action="verschlüsseln",
|
||||
object_="personenbezogene Daten",
|
||||
parent_title="Crypto",
|
||||
parent_severity="high",
|
||||
parent_category="security",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert "encryption_at_rest" in ac._pattern_candidates
|
||||
assert "encryption_in_transit" in ac._pattern_candidates
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GAP 3: STRUCTURED TIMING TESTS
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestStructuredTiming:
|
||||
"""Tests for _extract_structured_timing and fields on atomic controls."""
|
||||
|
||||
def test_72_stunden_deadline(self):
|
||||
hours, freq = _extract_structured_timing("innerhalb von 72 Stunden melden")
|
||||
assert hours == 72
|
||||
assert freq is None
|
||||
|
||||
def test_unverzueglich_deadline(self):
|
||||
hours, freq = _extract_structured_timing("unverzüglich melden")
|
||||
assert hours == 0
|
||||
assert freq is None
|
||||
|
||||
def test_yearly_frequency(self):
|
||||
hours, freq = _extract_structured_timing("jährliche Überprüfung")
|
||||
assert hours is None
|
||||
assert freq == "yearly"
|
||||
|
||||
def test_monthly_frequency(self):
|
||||
hours, freq = _extract_structured_timing("monatliche Kontrolle")
|
||||
assert hours is None
|
||||
assert freq == "monthly"
|
||||
|
||||
def test_quarterly_frequency(self):
|
||||
hours, freq = _extract_structured_timing("quartalsweise Berichterstattung")
|
||||
assert hours is None
|
||||
assert freq == "quarterly"
|
||||
|
||||
def test_before_deployment(self):
|
||||
hours, freq = _extract_structured_timing("vor Inbetriebnahme prüfen")
|
||||
assert hours is None
|
||||
assert freq == "before_deployment"
|
||||
|
||||
def test_no_timing_returns_none(self):
|
||||
hours, freq = _extract_structured_timing("MFA implementieren")
|
||||
assert hours is None
|
||||
assert freq is None
|
||||
|
||||
def test_timing_stored_on_atomic(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Jährliche Überprüfung der Sicherheitsrichtlinie",
|
||||
action="überprüfen",
|
||||
object_="Sicherheitsrichtlinie",
|
||||
parent_title="Review",
|
||||
parent_severity="medium",
|
||||
parent_category="governance",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
trigger_type="periodic",
|
||||
)
|
||||
assert ac._frequency == "yearly"
|
||||
assert ac._deadline_hours is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GAP 4: OBJECT NORMALIZATION (SYNONYMS) TESTS
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestObjectNormalization:
|
||||
"""Tests for synonym-enhanced _normalize_object."""
|
||||
|
||||
def test_richtlinie_to_policy(self):
|
||||
result = _normalize_object("Sicherheitsrichtlinie")
|
||||
assert "policy" in result
|
||||
|
||||
def test_verzeichnis_to_register(self):
|
||||
result = _normalize_object("Verzeichnis der Verarbeitungstätigkeiten")
|
||||
assert "register" in result
|
||||
|
||||
def test_vorfall_to_incident(self):
|
||||
result = _normalize_object("Sicherheitsvorfall")
|
||||
assert "incident" in result
|
||||
|
||||
def test_einwilligung_to_consent(self):
|
||||
result = _normalize_object("Einwilligung der Betroffenen")
|
||||
assert "consent" in result
|
||||
|
||||
def test_no_synonym_preserves_text(self):
|
||||
result = _normalize_object("MFA")
|
||||
assert result == "mfa"
|
||||
|
||||
def test_empty_returns_unknown(self):
|
||||
assert _normalize_object("") == "unknown"
|
||||
|
||||
def test_umlaut_normalization(self):
|
||||
result = _normalize_object("Prüfbericht")
|
||||
assert "ue" in result
|
||||
assert "ä" not in result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GAP 5: OUTPUT VALIDATOR TESTS
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestOutputValidator:
|
||||
"""Tests for _validate_atomic_control."""
|
||||
|
||||
def test_clean_control_passes(self):
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="MFA implementieren",
|
||||
action="implementieren",
|
||||
object_="MFA",
|
||||
parent_title="Auth",
|
||||
parent_severity="high",
|
||||
parent_category="security",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
errors = [i for i in ac._validation_issues if i.startswith("ERROR:")]
|
||||
assert len(errors) == 0
|
||||
|
||||
def test_empty_title_flagged(self):
|
||||
ac = AtomicControlCandidate(title="", objective="x", test_procedure=["tp"], evidence=["ev"])
|
||||
issues = _validate_atomic_control(ac, "implement", "general")
|
||||
assert any("title is empty" in i for i in issues)
|
||||
|
||||
def test_empty_objective_flagged(self):
|
||||
ac = AtomicControlCandidate(title="OK", objective="", test_procedure=["tp"], evidence=["ev"])
|
||||
issues = _validate_atomic_control(ac, "implement", "general")
|
||||
assert any("objective is empty" in i for i in issues)
|
||||
|
||||
def test_empty_test_procedure_flagged(self):
|
||||
ac = AtomicControlCandidate(title="OK", objective="x", test_procedure=[], evidence=["ev"])
|
||||
issues = _validate_atomic_control(ac, "implement", "general")
|
||||
assert any("test_procedure is empty" in i for i in issues)
|
||||
|
||||
def test_empty_evidence_flagged(self):
|
||||
ac = AtomicControlCandidate(title="OK", objective="x", test_procedure=["tp"], evidence=[])
|
||||
issues = _validate_atomic_control(ac, "implement", "general")
|
||||
assert any("evidence is empty" in i for i in issues)
|
||||
|
||||
def test_general_class_warns(self):
|
||||
ac = AtomicControlCandidate(title="OK", objective="x", test_procedure=["tp"], evidence=["ev"])
|
||||
issues = _validate_atomic_control(ac, "implement", "general")
|
||||
assert any("general" in i for i in issues)
|
||||
|
||||
def test_low_confidence_warns(self):
|
||||
ac = AtomicControlCandidate(title="OK", objective="x", test_procedure=["tp"], evidence=["ev"])
|
||||
ac._decomposition_confidence = 0.3
|
||||
issues = _validate_atomic_control(ac, "default", "general")
|
||||
assert any("low confidence" in i for i in issues)
|
||||
|
||||
def test_empty_evidence_item_flagged(self):
|
||||
ac = AtomicControlCandidate(title="OK", objective="x", test_procedure=["tp"], evidence=["", "ok"])
|
||||
issues = _validate_atomic_control(ac, "implement", "policy")
|
||||
assert any("evidence[0] is empty" in i for i in issues)
|
||||
|
||||
def test_garbage_infinitive_detected(self):
|
||||
"""'Prüfung der implementieren' pattern must be flagged."""
|
||||
ac = AtomicControlCandidate(
|
||||
title="OK", objective="x",
|
||||
test_procedure=["Prüfung der implementieren und dokumentieren"],
|
||||
evidence=["ev"],
|
||||
)
|
||||
issues = _validate_atomic_control(ac, "implement", "policy")
|
||||
assert any("raw infinitive" in i for i in issues)
|
||||
|
||||
def test_valid_infinitive_not_flagged(self):
|
||||
"""'Funktionstest: Wirksamkeit verifizieren' is valid German."""
|
||||
ac = AtomicControlCandidate(
|
||||
title="OK", objective="x",
|
||||
test_procedure=["Funktionstest: Wirksamkeit verifizieren"],
|
||||
evidence=["ev"],
|
||||
)
|
||||
issues = _validate_atomic_control(ac, "implement", "policy")
|
||||
assert not any("raw infinitive" in i for i in issues)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -757,6 +1265,7 @@ class TestDecompositionPassRun0b:
|
||||
"oc-uuid-1", "OC-CTRL-001-01", "parent-uuid-1",
|
||||
"Betreiber müssen Kontinuität sicherstellen",
|
||||
"sicherstellen", "Dienstleistungskontinuität",
|
||||
"", # condition
|
||||
False, False, # is_test, is_reporting
|
||||
"Service Continuity", "finance",
|
||||
'{"source": "MiCA", "article": "Art. 8"}',
|
||||
@@ -802,7 +1311,8 @@ class TestDecompositionPassRun0b:
|
||||
assert stats["llm_failures"] == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pass0b_template_fallback(self):
|
||||
async def test_pass0b_deterministic_engine(self):
|
||||
"""Deterministic mode (use_anthropic=False) uses engine, no LLM."""
|
||||
mock_db = MagicMock()
|
||||
|
||||
mock_rows = MagicMock()
|
||||
@@ -811,6 +1321,7 @@ class TestDecompositionPassRun0b:
|
||||
"oc-uuid-1", "OC-CTRL-001-01", "parent-uuid-1",
|
||||
"Betreiber müssen MFA implementieren",
|
||||
"implementieren", "MFA",
|
||||
"", # condition
|
||||
False, False,
|
||||
"Auth Controls", "authentication",
|
||||
"", "high", "AUTH-001",
|
||||
@@ -821,6 +1332,9 @@ class TestDecompositionPassRun0b:
|
||||
mock_seq = MagicMock()
|
||||
mock_seq.fetchone.return_value = (0,)
|
||||
|
||||
mock_insert = MagicMock()
|
||||
mock_insert.fetchone.return_value = ("new-uuid-1",)
|
||||
|
||||
call_count = [0]
|
||||
def side_effect(*args, **kwargs):
|
||||
call_count[0] += 1
|
||||
@@ -828,18 +1342,19 @@ class TestDecompositionPassRun0b:
|
||||
return mock_rows
|
||||
if call_count[0] == 2:
|
||||
return mock_seq
|
||||
if call_count[0] == 3:
|
||||
return mock_insert # INSERT RETURNING
|
||||
return MagicMock()
|
||||
|
||||
mock_db.execute.side_effect = side_effect
|
||||
|
||||
with patch("compliance.services.obligation_extractor._llm_ollama", new_callable=AsyncMock) as mock_llm:
|
||||
mock_llm.return_value = "Sorry, invalid response" # LLM fails
|
||||
# No LLM mock needed — deterministic engine
|
||||
decomp = DecompositionPass(db=mock_db)
|
||||
stats = await decomp.run_pass0b(limit=10)
|
||||
|
||||
decomp = DecompositionPass(db=mock_db)
|
||||
stats = await decomp.run_pass0b(limit=10)
|
||||
|
||||
assert stats["controls_created"] == 1
|
||||
assert stats["llm_failures"] == 1
|
||||
assert stats["controls_created"] == 1
|
||||
assert stats["provider"] == "deterministic"
|
||||
assert stats["llm_calls"] == 0
|
||||
|
||||
|
||||
class TestDecompositionStatus:
|
||||
@@ -1098,12 +1613,14 @@ class TestDecompositionPassAnthropicBatch:
|
||||
mock_rows.fetchall.return_value = [
|
||||
("oc-uuid-1", "OC-CTRL-001-01", "parent-uuid-1",
|
||||
"MFA implementieren", "implementieren", "MFA",
|
||||
"", # condition
|
||||
False, False, "Auth", "security",
|
||||
'{"source": "DSGVO", "article": "Art. 32"}',
|
||||
"high", "CTRL-001",
|
||||
"continuous", False),
|
||||
("oc-uuid-2", "OC-CTRL-001-02", "parent-uuid-1",
|
||||
"MFA testen", "testen", "MFA",
|
||||
"", # condition
|
||||
True, False, "Auth", "security",
|
||||
'{"source": "DSGVO", "article": "Art. 32"}',
|
||||
"high", "CTRL-001",
|
||||
|
||||
Reference in New Issue
Block a user