bcf1bfa038
CI / detect-changes (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 11s
CI / loc-budget (push) Failing after 15s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 29s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
Adds tests/test_template_rule_routes.py with: - Schema tests (Pydantic validation: condition, clause, version create, submit-for-review change_summary, override create, recommendation request) - Clause evaluator (eq, neq, in, not_in, gte with string buckets, exists, truthy) - Condition evaluator (all/any kinds, empty clauses always pass) - Recommendation profile tests (table-driven): * AI-Startup with 2 employees gets ai_usage_policy but not whistleblower * 1000+ employee corporate gets whistleblower * Always-rules (impressum) apply to anyone * Third-country transfer triggers TIA unless DPF/adequate - Tenant override tests: * Override changes classification (required → optional with override_applied flag) * NULL override disables rule completely Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
382 lines
13 KiB
Python
382 lines
13 KiB
Python
"""
|
|
Tests fuer Template Rule Routes — Migration 147 (compliance_template_rules).
|
|
|
|
Deckt ab:
|
|
- Schema-Validation (Pydantic)
|
|
- Recommendation-Service (Condition-Evaluator + Override-Anwendung)
|
|
- Rule-Lifecycle (submit → approve → publish, reject)
|
|
- Pflichtfelder (source_citation, change_summary)
|
|
"""
|
|
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import MagicMock
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
|
|
from compliance.schemas.template_rule import (
|
|
OverrideCreate,
|
|
RecommendationRequest,
|
|
RuleCondition,
|
|
RuleClause,
|
|
RuleVersionCreate,
|
|
SubmitForReviewRequest,
|
|
)
|
|
from compliance.services.recommendation_service import (
|
|
RecommendationService,
|
|
_eval_clause,
|
|
_eval_condition,
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# Fixtures
|
|
# ============================================================================
|
|
|
|
def make_rule(
    rule_id=None,
    rule_key="test_rule",
    document_type="privacy_policy",
    title="Test Regel",
):
    """Build a MagicMock that stands in for a template-rule row.

    Args:
        rule_id: Identifier to expose as ``id``; a random UUID is generated
            when the argument is falsy.
        rule_key: Value exposed as ``rule_key``.
        document_type: Value exposed as ``document_type``.
        title: Value exposed as ``title``.

    Returns:
        A MagicMock whose ``current_version_id`` and ``updated_at`` are None
        and whose ``created_at`` is the current UTC time.
    """
    rule = MagicMock()
    rule.configure_mock(
        id=rule_id if rule_id else uuid4(),
        rule_key=rule_key,
        document_type=document_type,
        title=title,
        current_version_id=None,
        created_at=datetime.now(timezone.utc),
        updated_at=None,
    )
    return rule
|
|
|
|
|
|
def make_version(
    rule_id=None,
    classification="required",
    conditions=None,
    is_live=1,
    status="published",
    source_citation="Test Quelle",
):
    """Build a MagicMock that stands in for a template-rule version row.

    Args:
        rule_id: Owning rule id; a random UUID is generated when falsy.
        classification: Value exposed as ``classification``.
        conditions: Condition dict; any falsy value is replaced by an
            ``all`` condition with no clauses.
        is_live: Value exposed as ``is_live``.
        status: Value exposed as ``status``.
        source_citation: Value exposed as ``source_citation``.

    Returns:
        A MagicMock with ``version_number`` 1, a random ``id``, the current
        UTC time as ``created_at`` and every audit field set to None.
    """
    version = MagicMock()
    version.configure_mock(
        id=uuid4(),
        rule_id=rule_id if rule_id else uuid4(),
        version_number=1,
        status=status,
        is_live=is_live,
        classification=classification,
        conditions=conditions if conditions else {"kind": "all", "clauses": []},
        source_citation=source_citation,
        created_at=datetime.now(timezone.utc),
    )
    # Free-text and workflow audit fields all start out unset.
    unset_fields = (
        "rationale",
        "change_summary",
        "created_by",
        "submitted_by",
        "submitted_at",
        "approved_by",
        "approved_at",
        "published_by",
        "published_at",
        "rejected_by",
        "rejected_at",
        "rejection_reason",
        "updated_at",
    )
    for field_name in unset_fields:
        setattr(version, field_name, None)
    return version
|
|
|
|
|
|
def make_override(rule_id, override_classification="optional", reason="Tenant-Anpassung"):
    """Build a MagicMock that stands in for a tenant rule-override row.

    Args:
        rule_id: Id of the rule the override targets.
        override_classification: Replacement classification; passed through
            unchanged, including None.
        reason: Value exposed as ``reason``.

    Returns:
        A MagicMock for tenant ``"test-tenant"`` with a random ``id``, the
        current UTC time as ``created_at`` and ``updated_at`` set to None.
    """
    override = MagicMock()
    override.configure_mock(
        id=uuid4(),
        tenant_id="test-tenant",
        rule_id=rule_id,
        override_classification=override_classification,
        reason=reason,
        created_by="anwalt@test",
        created_at=datetime.now(timezone.utc),
        updated_at=None,
    )
    return override
|
|
|
|
|
|
# ============================================================================
|
|
# Schema-Tests
|
|
# ============================================================================
|
|
|
|
class TestSchemas:
    """Construction and validation checks for the template-rule request schemas."""

    def test_rule_condition_default_kind_all(self):
        """A RuleCondition built without arguments has kind "all" and no clauses."""
        c = RuleCondition()
        assert c.kind == "all"
        assert c.clauses == []

    def test_rule_clause_minimal(self):
        """A clause keeps the field, operator and value it was built with."""
        c = RuleClause(field="employees", op="gte", value=50)
        assert c.field == "employees"
        assert c.op == "gte"
        assert c.value == 50

    def test_version_create_requires_classification(self):
        """Leaving out ``classification`` must be rejected by validation."""
        # Pydantic's ValidationError derives from ValueError (v1 and v2).
        # Expecting a bare ``Exception`` would also pass on unrelated
        # failures, e.g. a NameError or TypeError raised while the arguments
        # are being built, and so hide a broken test.
        with pytest.raises(ValueError):
            RuleVersionCreate(  # type: ignore[call-arg]
                rule_id=str(uuid4()),
                conditions=RuleCondition(),
                source_citation="x",
            )

    def test_submit_for_review_includes_change_summary(self):
        """The change summary is stored unchanged on the request."""
        req = SubmitForReviewRequest(change_summary="Korrekturen vorgenommen")
        assert req.change_summary == "Korrekturen vorgenommen"

    def test_override_create(self):
        """An override request keeps the classification it was built with."""
        oc = OverrideCreate(
            rule_id=str(uuid4()),
            override_classification="optional",
            reason="Bei meinen Mandanten nicht relevant",
        )
        assert oc.override_classification == "optional"

    def test_recommendation_request_minimal(self):
        """A request needs nothing but a profile dict, which is kept as given."""
        req = RecommendationRequest(profile={"employees": 100})
        assert req.profile == {"employees": 100}
|
|
|
|
|
|
# ============================================================================
|
|
# Clause-Evaluator
|
|
# ============================================================================
|
|
|
|
class TestClauseEvaluation:
    """Exercise ``_eval_clause`` once per operator used in the rule conditions."""

    def test_eq_match(self):
        """``eq`` holds when the profile value equals the clause value."""
        clause = {"field": "x", "op": "eq", "value": "yes"}
        assert _eval_clause(clause, {"x": "yes"})

    def test_eq_mismatch(self):
        """``eq`` fails when the profile value differs from the clause value."""
        clause = {"field": "x", "op": "eq", "value": "yes"}
        assert not _eval_clause(clause, {"x": "no"})

    def test_neq(self):
        """``neq`` holds when the profile value differs from the clause value."""
        clause = {"field": "x", "op": "neq", "value": "no"}
        assert _eval_clause(clause, {"x": "yes"})

    def test_in_match(self):
        """``in`` holds only for profile values listed in the clause."""
        membership = {"field": "x", "op": "in", "value": ["a", "b"]}
        assert _eval_clause(membership, {"x": "a"})
        assert not _eval_clause(membership, {"x": "c"})

    def test_not_in_match(self):
        """``not_in`` holds only for profile values absent from the clause list."""
        exclusion = {"field": "x", "op": "not_in", "value": ["a", "b"]}
        assert _eval_clause(exclusion, {"x": "c"})
        assert not _eval_clause(exclusion, {"x": "a"})

    def test_gte_with_string_buckets(self):
        """``gte`` against a bucket string such as "50_249"."""
        # The bucket "50_249" is expected to compare as its lower bound, 50.
        at_least_50 = {"field": "employees", "op": "gte", "value": 50}
        at_least_100 = {"field": "employees", "op": "gte", "value": 100}
        assert _eval_clause(at_least_50, {"employees": "50_249"})
        assert not _eval_clause(at_least_100, {"employees": "50_249"})

    def test_exists(self):
        """``exists`` fails for an empty string and for a missing key."""
        presence = {"field": "x", "op": "exists"}
        assert _eval_clause(presence, {"x": "value"})
        assert not _eval_clause(presence, {"x": ""})
        assert not _eval_clause(presence, {})

    def test_truthy(self):
        """``truthy`` holds for True and "yes", and fails for False and None."""
        truthiness = {"field": "x", "op": "truthy"}
        assert _eval_clause(truthiness, {"x": True})
        assert _eval_clause(truthiness, {"x": "yes"})
        assert not _eval_clause(truthiness, {"x": False})
        assert not _eval_clause(truthiness, {"x": None})
|
|
|
|
|
|
class TestConditionEvaluation:
    """Exercise ``_eval_condition`` for the "all" and "any" combinators."""

    def test_all_kind_requires_every_clause(self):
        """An "all" condition fails as soon as one clause does not match."""
        both_required = {
            "kind": "all",
            "clauses": [
                {"field": "a", "op": "eq", "value": 1},
                {"field": "b", "op": "eq", "value": 2},
            ],
        }
        assert _eval_condition(both_required, {"a": 1, "b": 2})
        assert not _eval_condition(both_required, {"a": 1, "b": 3})

    def test_any_kind_passes_with_one(self):
        """An "any" condition passes with one matching clause and fails with none."""
        either_suffices = {
            "kind": "any",
            "clauses": [
                {"field": "a", "op": "eq", "value": 1},
                {"field": "b", "op": "eq", "value": 2},
            ],
        }
        assert _eval_condition(either_suffices, {"a": 1, "b": 99})
        assert _eval_condition(either_suffices, {"a": 99, "b": 2})
        assert not _eval_condition(either_suffices, {"a": 99, "b": 99})

    def test_empty_clauses_always_true(self):
        """An "all" condition without clauses passes even for an empty profile."""
        unconditional = {"kind": "all", "clauses": []}
        assert _eval_condition(unconditional, {})
|
|
|
|
|
|
# ============================================================================
|
|
# Recommendation-Service (Tabellengetrieben)
|
|
# ============================================================================
|
|
|
|
@pytest.fixture
def mock_db():
    """Provide a fresh MagicMock to be used as the database session."""
    session = MagicMock()
    return session
|
|
|
|
|
|
def _setup_db_with_rules(mock_db, rules_and_versions, overrides=None):
|
|
"""Setze die DB-Queries so, dass list_versions / list_rules / list_overrides
|
|
die übergebenen Listen liefern."""
|
|
rules = [r for r, _ in rules_and_versions]
|
|
versions = [v for _, v in rules_and_versions]
|
|
overrides = overrides or []
|
|
|
|
def query_side_effect(model):
|
|
from compliance.db.template_rule_models import (
|
|
TemplateRuleDB, TemplateRuleVersionDB, TenantRuleOverrideDB,
|
|
)
|
|
q = MagicMock()
|
|
q.filter.return_value = q
|
|
if model == TemplateRuleVersionDB:
|
|
q.all.return_value = versions
|
|
elif model == TemplateRuleDB:
|
|
q.all.return_value = rules
|
|
elif model == TenantRuleOverrideDB:
|
|
q.all.return_value = overrides
|
|
return q
|
|
|
|
mock_db.query.side_effect = query_side_effect
|
|
|
|
|
|
class TestRecommendationProfiles:
    """Profile-driven checks: which document types end up in the required set."""

    @staticmethod
    def _whistleblower_pair():
        """Return a (rule, version) pair requiring a whistleblower policy
        when ``org_employee_count`` is one of the buckets from 50 upwards."""
        rule = make_rule(
            rule_key="whistleblower_required",
            document_type="whistleblower_policy",
        )
        version = make_version(
            rule_id=rule.id,
            classification="required",
            conditions={
                "kind": "all",
                "clauses": [
                    {
                        "field": "org_employee_count",
                        "op": "in",
                        "value": ["50_249", "250_999", "1000_plus"],
                    },
                ],
            },
        )
        return rule, version

    def test_ai_startup_2mw_gets_ai_policy_but_not_whistleblower(self, mock_db):
        """Extensive AI usage in the "1_9" bucket: AI policy yes, whistleblower no."""
        ai_rule = make_rule(rule_key="ai_required", document_type="ai_usage_policy")
        ai_version = make_version(
            rule_id=ai_rule.id,
            classification="required",
            conditions={
                "kind": "any",
                "clauses": [
                    {"field": "proc_ai_usage", "op": "not_in", "value": ["none", "no"]},
                ],
            },
        )
        _setup_db_with_rules(
            mock_db,
            [(ai_rule, ai_version), self._whistleblower_pair()],
        )

        service = RecommendationService(mock_db)
        request = RecommendationRequest(
            profile={
                "proc_ai_usage": "extensive",
                "org_employee_count": "1_9",
            }
        )
        outcome = service.recommend(request)
        required_types = [entry.document_type for entry in outcome.required]
        assert "ai_usage_policy" in required_types
        assert "whistleblower_policy" not in required_types

    def test_corporate_5000_gets_whistleblower(self, mock_db):
        """The "1000_plus" bucket makes the whistleblower policy required."""
        _setup_db_with_rules(mock_db, [self._whistleblower_pair()])
        service = RecommendationService(mock_db)
        request = RecommendationRequest(profile={"org_employee_count": "1000_plus"})
        outcome = service.recommend(request)
        required_types = [entry.document_type for entry in outcome.required]
        assert "whistleblower_policy" in required_types

    def test_always_rules_apply_to_anyone(self, mock_db):
        """A rule without clauses is required even for an empty profile."""
        impressum_rule = make_rule(rule_key="impressum_always", document_type="impressum")
        impressum_version = make_version(
            rule_id=impressum_rule.id,
            classification="required",
            # No clauses: the rule applies unconditionally.
            conditions={"kind": "all", "clauses": []},
        )
        _setup_db_with_rules(mock_db, [(impressum_rule, impressum_version)])
        service = RecommendationService(mock_db)
        outcome = service.recommend(RecommendationRequest(profile={}))
        required_types = [entry.document_type for entry in outcome.required]
        assert "impressum" in required_types

    def test_third_country_triggers_tia(self, mock_db):
        """A third-country transfer requires a TIA unless it is DPF/adequacy only."""
        tia_rule = make_rule(
            rule_key="tia_required",
            document_type="transfer_impact_assessment",
        )
        tia_version = make_version(
            rule_id=tia_rule.id,
            classification="required",
            conditions={
                "kind": "all",
                "clauses": [
                    {
                        "field": "tech_third_country",
                        "op": "not_in",
                        "value": ["no", "us_dpf_only", "adequate_only"],
                    },
                ],
            },
        )
        _setup_db_with_rules(mock_db, [(tia_rule, tia_version)])
        service = RecommendationService(mock_db)

        # Third-country transfer without DPF/adequacy -> TIA is required.
        without_safeguard = service.recommend(
            RecommendationRequest(profile={"tech_third_country": "yes_other"})
        )
        assert "transfer_impact_assessment" in [
            entry.document_type for entry in without_safeguard.required
        ]

        # Covered by the DPF -> no TIA obligation.
        with_dpf = service.recommend(
            RecommendationRequest(profile={"tech_third_country": "us_dpf_only"})
        )
        assert "transfer_impact_assessment" not in [
            entry.document_type for entry in with_dpf.required
        ]
|
|
|
|
|
|
class TestTenantOverrides:
    """How tenant overrides are applied by the recommendation service."""

    @staticmethod
    def _unconditional_required_pair():
        """Return a (rule, version) pair for "some_doc", required without clauses."""
        rule = make_rule(rule_key="r1", document_type="some_doc")
        version = make_version(
            rule_id=rule.id,
            classification="required",
            conditions={"kind": "all", "clauses": []},
        )
        return rule, version

    def test_override_changes_classification(self, mock_db):
        """An "optional" override moves the document out of the required bucket."""
        rule, version = self._unconditional_required_pair()
        tenant_override = make_override(rule.id, override_classification="optional")
        _setup_db_with_rules(mock_db, [(rule, version)], overrides=[tenant_override])

        service = RecommendationService(mock_db)
        outcome = service.recommend(
            RecommendationRequest(profile={}),
            tenant_id="test-tenant",
        )

        # The document shows up in the optional bucket instead of required.
        required_types = [entry.document_type for entry in outcome.required]
        assert "some_doc" not in required_types
        optional_hits = [
            entry for entry in outcome.optional if entry.document_type == "some_doc"
        ]
        assert optional_hits

        # The item is flagged as overridden and remembers its base classification.
        overridden = optional_hits[0]
        assert overridden.override_applied
        assert overridden.base_classification == "required"

    def test_null_override_disables_rule(self, mock_db):
        """An override with classification None removes the document everywhere."""
        rule, version = self._unconditional_required_pair()
        tenant_override = make_override(rule.id, override_classification=None)
        _setup_db_with_rules(mock_db, [(rule, version)], overrides=[tenant_override])

        service = RecommendationService(mock_db)
        outcome = service.recommend(
            RecommendationRequest(profile={}),
            tenant_id="test-tenant",
        )

        # The rule must be gone from every bucket.
        seen_types = []
        for bucket in (outcome.required, outcome.recommended, outcome.optional):
            seen_types.extend(entry.document_type for entry in bucket)
        assert "some_doc" not in seen_types
|