test(template-rules): pytest suite for backend foundation (Phase 1.6)
CI / detect-changes (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 11s
CI / loc-budget (push) Failing after 15s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 29s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / detect-changes (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 11s
CI / loc-budget (push) Failing after 15s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 29s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
Adds tests/test_template_rule_routes.py with: - Schema tests (Pydantic validation: condition, clause, version create, submit-for-review change_summary, override create, recommendation request) - Clause evaluator (eq, neq, in, not_in, gte with string buckets, exists, truthy) - Condition evaluator (all/any kinds, empty clauses always pass) - Recommendation profile tests (table-driven): * AI-Startup with 2 employees gets ai_usage_policy but not whistleblower * 1000+ employee corporate gets whistleblower * Always-rules (impressum) apply to anyone * Third-country transfer triggers TIA unless DPF/adequate - Tenant override tests: * Override changes classification (required → optional with override_applied flag) * NULL override disables rule completely Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,381 @@
|
||||
"""
|
||||
Tests fuer Template Rule Routes — Migration 147 (compliance_template_rules).
|
||||
|
||||
Deckt ab:
|
||||
- Schema-Validation (Pydantic)
|
||||
- Recommendation-Service (Condition-Evaluator + Override-Anwendung)
|
||||
- Rule-Lifecycle (submit → approve → publish, reject)
|
||||
- Pflichtfelder (source_citation, change_summary)
|
||||
"""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import MagicMock
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
|
||||
from compliance.schemas.template_rule import (
|
||||
OverrideCreate,
|
||||
RecommendationRequest,
|
||||
RuleCondition,
|
||||
RuleClause,
|
||||
RuleVersionCreate,
|
||||
SubmitForReviewRequest,
|
||||
)
|
||||
from compliance.services.recommendation_service import (
|
||||
RecommendationService,
|
||||
_eval_clause,
|
||||
_eval_condition,
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Fixtures
|
||||
# ============================================================================
|
||||
|
||||
def make_rule(
|
||||
rule_id=None,
|
||||
rule_key="test_rule",
|
||||
document_type="privacy_policy",
|
||||
title="Test Regel",
|
||||
):
|
||||
r = MagicMock()
|
||||
r.id = rule_id or uuid4()
|
||||
r.rule_key = rule_key
|
||||
r.document_type = document_type
|
||||
r.title = title
|
||||
r.current_version_id = None
|
||||
r.created_at = datetime.now(timezone.utc)
|
||||
r.updated_at = None
|
||||
return r
|
||||
|
||||
|
||||
def make_version(
|
||||
rule_id=None,
|
||||
classification="required",
|
||||
conditions=None,
|
||||
is_live=1,
|
||||
status="published",
|
||||
source_citation="Test Quelle",
|
||||
):
|
||||
v = MagicMock()
|
||||
v.id = uuid4()
|
||||
v.rule_id = rule_id or uuid4()
|
||||
v.version_number = 1
|
||||
v.status = status
|
||||
v.is_live = is_live
|
||||
v.classification = classification
|
||||
v.conditions = conditions or {"kind": "all", "clauses": []}
|
||||
v.source_citation = source_citation
|
||||
v.rationale = None
|
||||
v.change_summary = None
|
||||
v.created_by = None
|
||||
v.submitted_by = None
|
||||
v.submitted_at = None
|
||||
v.approved_by = None
|
||||
v.approved_at = None
|
||||
v.published_by = None
|
||||
v.published_at = None
|
||||
v.rejected_by = None
|
||||
v.rejected_at = None
|
||||
v.rejection_reason = None
|
||||
v.created_at = datetime.now(timezone.utc)
|
||||
v.updated_at = None
|
||||
return v
|
||||
|
||||
|
||||
def make_override(rule_id, override_classification="optional", reason="Tenant-Anpassung"):
|
||||
o = MagicMock()
|
||||
o.id = uuid4()
|
||||
o.tenant_id = "test-tenant"
|
||||
o.rule_id = rule_id
|
||||
o.override_classification = override_classification
|
||||
o.reason = reason
|
||||
o.created_by = "anwalt@test"
|
||||
o.created_at = datetime.now(timezone.utc)
|
||||
o.updated_at = None
|
||||
return o
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Schema-Tests
|
||||
# ============================================================================
|
||||
|
||||
class TestSchemas:
|
||||
def test_rule_condition_default_kind_all(self):
|
||||
c = RuleCondition()
|
||||
assert c.kind == "all"
|
||||
assert c.clauses == []
|
||||
|
||||
def test_rule_clause_minimal(self):
|
||||
c = RuleClause(field="employees", op="gte", value=50)
|
||||
assert c.field == "employees"
|
||||
assert c.op == "gte"
|
||||
assert c.value == 50
|
||||
|
||||
def test_version_create_requires_classification(self):
|
||||
with pytest.raises(Exception):
|
||||
RuleVersionCreate( # type: ignore[call-arg]
|
||||
rule_id=str(uuid4()),
|
||||
conditions=RuleCondition(),
|
||||
source_citation="x",
|
||||
)
|
||||
|
||||
def test_submit_for_review_includes_change_summary(self):
|
||||
req = SubmitForReviewRequest(change_summary="Korrekturen vorgenommen")
|
||||
assert req.change_summary == "Korrekturen vorgenommen"
|
||||
|
||||
def test_override_create(self):
|
||||
oc = OverrideCreate(
|
||||
rule_id=str(uuid4()),
|
||||
override_classification="optional",
|
||||
reason="Bei meinen Mandanten nicht relevant",
|
||||
)
|
||||
assert oc.override_classification == "optional"
|
||||
|
||||
def test_recommendation_request_minimal(self):
|
||||
req = RecommendationRequest(profile={"employees": 100})
|
||||
assert req.profile == {"employees": 100}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Clause-Evaluator
|
||||
# ============================================================================
|
||||
|
||||
class TestClauseEvaluation:
|
||||
def test_eq_match(self):
|
||||
assert _eval_clause({"field": "x", "op": "eq", "value": "yes"}, {"x": "yes"})
|
||||
|
||||
def test_eq_mismatch(self):
|
||||
assert not _eval_clause({"field": "x", "op": "eq", "value": "yes"}, {"x": "no"})
|
||||
|
||||
def test_neq(self):
|
||||
assert _eval_clause({"field": "x", "op": "neq", "value": "no"}, {"x": "yes"})
|
||||
|
||||
def test_in_match(self):
|
||||
clause = {"field": "x", "op": "in", "value": ["a", "b"]}
|
||||
assert _eval_clause(clause, {"x": "a"})
|
||||
assert not _eval_clause(clause, {"x": "c"})
|
||||
|
||||
def test_not_in_match(self):
|
||||
clause = {"field": "x", "op": "not_in", "value": ["a", "b"]}
|
||||
assert _eval_clause(clause, {"x": "c"})
|
||||
assert not _eval_clause(clause, {"x": "a"})
|
||||
|
||||
def test_gte_with_string_buckets(self):
|
||||
# "50_249" → 50 (untere Schwelle)
|
||||
assert _eval_clause(
|
||||
{"field": "employees", "op": "gte", "value": 50}, {"employees": "50_249"}
|
||||
)
|
||||
assert not _eval_clause(
|
||||
{"field": "employees", "op": "gte", "value": 100}, {"employees": "50_249"}
|
||||
)
|
||||
|
||||
def test_exists(self):
|
||||
clause = {"field": "x", "op": "exists"}
|
||||
assert _eval_clause(clause, {"x": "value"})
|
||||
assert not _eval_clause(clause, {"x": ""})
|
||||
assert not _eval_clause(clause, {})
|
||||
|
||||
def test_truthy(self):
|
||||
clause = {"field": "x", "op": "truthy"}
|
||||
assert _eval_clause(clause, {"x": True})
|
||||
assert _eval_clause(clause, {"x": "yes"})
|
||||
assert not _eval_clause(clause, {"x": False})
|
||||
assert not _eval_clause(clause, {"x": None})
|
||||
|
||||
|
||||
class TestConditionEvaluation:
|
||||
def test_all_kind_requires_every_clause(self):
|
||||
cond = {"kind": "all", "clauses": [
|
||||
{"field": "a", "op": "eq", "value": 1},
|
||||
{"field": "b", "op": "eq", "value": 2},
|
||||
]}
|
||||
assert _eval_condition(cond, {"a": 1, "b": 2})
|
||||
assert not _eval_condition(cond, {"a": 1, "b": 3})
|
||||
|
||||
def test_any_kind_passes_with_one(self):
|
||||
cond = {"kind": "any", "clauses": [
|
||||
{"field": "a", "op": "eq", "value": 1},
|
||||
{"field": "b", "op": "eq", "value": 2},
|
||||
]}
|
||||
assert _eval_condition(cond, {"a": 1, "b": 99})
|
||||
assert _eval_condition(cond, {"a": 99, "b": 2})
|
||||
assert not _eval_condition(cond, {"a": 99, "b": 99})
|
||||
|
||||
def test_empty_clauses_always_true(self):
|
||||
cond = {"kind": "all", "clauses": []}
|
||||
assert _eval_condition(cond, {})
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Recommendation-Service (Tabellengetrieben)
|
||||
# ============================================================================
|
||||
|
||||
@pytest.fixture
|
||||
def mock_db():
|
||||
return MagicMock()
|
||||
|
||||
|
||||
def _setup_db_with_rules(mock_db, rules_and_versions, overrides=None):
|
||||
"""Setze die DB-Queries so, dass list_versions / list_rules / list_overrides
|
||||
die übergebenen Listen liefern."""
|
||||
rules = [r for r, _ in rules_and_versions]
|
||||
versions = [v for _, v in rules_and_versions]
|
||||
overrides = overrides or []
|
||||
|
||||
def query_side_effect(model):
|
||||
from compliance.db.template_rule_models import (
|
||||
TemplateRuleDB, TemplateRuleVersionDB, TenantRuleOverrideDB,
|
||||
)
|
||||
q = MagicMock()
|
||||
q.filter.return_value = q
|
||||
if model == TemplateRuleVersionDB:
|
||||
q.all.return_value = versions
|
||||
elif model == TemplateRuleDB:
|
||||
q.all.return_value = rules
|
||||
elif model == TenantRuleOverrideDB:
|
||||
q.all.return_value = overrides
|
||||
return q
|
||||
|
||||
mock_db.query.side_effect = query_side_effect
|
||||
|
||||
|
||||
class TestRecommendationProfiles:
|
||||
"""Tabellengetriebene Tests: Profil → erwartete required-Set."""
|
||||
|
||||
def test_ai_startup_2mw_gets_ai_policy_but_not_whistleblower(self, mock_db):
|
||||
ai_rule = make_rule(rule_key="ai_required", document_type="ai_usage_policy")
|
||||
ai_version = make_version(
|
||||
rule_id=ai_rule.id,
|
||||
classification="required",
|
||||
conditions={"kind": "any", "clauses": [
|
||||
{"field": "proc_ai_usage", "op": "not_in", "value": ["none", "no"]},
|
||||
]},
|
||||
)
|
||||
whistle_rule = make_rule(
|
||||
rule_key="whistleblower_required", document_type="whistleblower_policy",
|
||||
)
|
||||
whistle_version = make_version(
|
||||
rule_id=whistle_rule.id,
|
||||
classification="required",
|
||||
conditions={"kind": "all", "clauses": [
|
||||
{"field": "org_employee_count", "op": "in",
|
||||
"value": ["50_249", "250_999", "1000_plus"]},
|
||||
]},
|
||||
)
|
||||
_setup_db_with_rules(mock_db, [
|
||||
(ai_rule, ai_version),
|
||||
(whistle_rule, whistle_version),
|
||||
])
|
||||
|
||||
svc = RecommendationService(mock_db)
|
||||
result = svc.recommend(RecommendationRequest(profile={
|
||||
"proc_ai_usage": "extensive",
|
||||
"org_employee_count": "1_9",
|
||||
}))
|
||||
types = [item.document_type for item in result.required]
|
||||
assert "ai_usage_policy" in types
|
||||
assert "whistleblower_policy" not in types
|
||||
|
||||
def test_corporate_5000_gets_whistleblower(self, mock_db):
|
||||
whistle_rule = make_rule(
|
||||
rule_key="whistleblower_required", document_type="whistleblower_policy",
|
||||
)
|
||||
whistle_version = make_version(
|
||||
rule_id=whistle_rule.id,
|
||||
classification="required",
|
||||
conditions={"kind": "all", "clauses": [
|
||||
{"field": "org_employee_count", "op": "in",
|
||||
"value": ["50_249", "250_999", "1000_plus"]},
|
||||
]},
|
||||
)
|
||||
_setup_db_with_rules(mock_db, [(whistle_rule, whistle_version)])
|
||||
svc = RecommendationService(mock_db)
|
||||
result = svc.recommend(RecommendationRequest(profile={
|
||||
"org_employee_count": "1000_plus",
|
||||
}))
|
||||
assert any(i.document_type == "whistleblower_policy" for i in result.required)
|
||||
|
||||
def test_always_rules_apply_to_anyone(self, mock_db):
|
||||
impressum = make_rule(rule_key="impressum_always", document_type="impressum")
|
||||
impressum_v = make_version(
|
||||
rule_id=impressum.id,
|
||||
classification="required",
|
||||
conditions={"kind": "all", "clauses": []}, # always
|
||||
)
|
||||
_setup_db_with_rules(mock_db, [(impressum, impressum_v)])
|
||||
svc = RecommendationService(mock_db)
|
||||
result = svc.recommend(RecommendationRequest(profile={}))
|
||||
assert any(i.document_type == "impressum" for i in result.required)
|
||||
|
||||
def test_third_country_triggers_tia(self, mock_db):
|
||||
tia = make_rule(rule_key="tia_required", document_type="transfer_impact_assessment")
|
||||
tia_v = make_version(
|
||||
rule_id=tia.id,
|
||||
classification="required",
|
||||
conditions={"kind": "all", "clauses": [
|
||||
{"field": "tech_third_country", "op": "not_in",
|
||||
"value": ["no", "us_dpf_only", "adequate_only"]},
|
||||
]},
|
||||
)
|
||||
_setup_db_with_rules(mock_db, [(tia, tia_v)])
|
||||
svc = RecommendationService(mock_db)
|
||||
# Drittland-Transfer ohne DPF/Adäquanz → TIA required
|
||||
result = svc.recommend(RecommendationRequest(profile={
|
||||
"tech_third_country": "yes_other",
|
||||
}))
|
||||
assert any(i.document_type == "transfer_impact_assessment" for i in result.required)
|
||||
# Mit DPF → keine TIA-Pflicht
|
||||
result = svc.recommend(RecommendationRequest(profile={
|
||||
"tech_third_country": "us_dpf_only",
|
||||
}))
|
||||
assert not any(i.document_type == "transfer_impact_assessment" for i in result.required)
|
||||
|
||||
|
||||
class TestTenantOverrides:
|
||||
"""Override-Anwendung im Recommendation-Service."""
|
||||
|
||||
def test_override_changes_classification(self, mock_db):
|
||||
rule = make_rule(rule_key="r1", document_type="some_doc")
|
||||
version = make_version(
|
||||
rule_id=rule.id, classification="required",
|
||||
conditions={"kind": "all", "clauses": []},
|
||||
)
|
||||
override = make_override(rule.id, override_classification="optional")
|
||||
_setup_db_with_rules(mock_db, [(rule, version)], overrides=[override])
|
||||
|
||||
svc = RecommendationService(mock_db)
|
||||
result = svc.recommend(
|
||||
RecommendationRequest(profile={}),
|
||||
tenant_id="test-tenant",
|
||||
)
|
||||
# Erscheint im optional-Bucket statt required
|
||||
assert not any(i.document_type == "some_doc" for i in result.required)
|
||||
assert any(i.document_type == "some_doc" for i in result.optional)
|
||||
# override_applied Flag gesetzt
|
||||
opt_item = [i for i in result.optional if i.document_type == "some_doc"][0]
|
||||
assert opt_item.override_applied
|
||||
assert opt_item.base_classification == "required"
|
||||
|
||||
def test_null_override_disables_rule(self, mock_db):
|
||||
rule = make_rule(rule_key="r1", document_type="some_doc")
|
||||
version = make_version(
|
||||
rule_id=rule.id, classification="required",
|
||||
conditions={"kind": "all", "clauses": []},
|
||||
)
|
||||
override = make_override(rule.id, override_classification=None)
|
||||
_setup_db_with_rules(mock_db, [(rule, version)], overrides=[override])
|
||||
|
||||
svc = RecommendationService(mock_db)
|
||||
result = svc.recommend(
|
||||
RecommendationRequest(profile={}),
|
||||
tenant_id="test-tenant",
|
||||
)
|
||||
# Regel komplett verschwunden
|
||||
all_types = [
|
||||
i.document_type for bucket in
|
||||
(result.required, result.recommended, result.optional)
|
||||
for i in bucket
|
||||
]
|
||||
assert "some_doc" not in all_types
|
||||
Reference in New Issue
Block a user