feat(pipeline): v3 — scoped control applicability + source_type classification
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 36s
CI/CD / test-python-backend-compliance (push) Successful in 36s
CI/CD / test-python-document-crawler (push) Successful in 27s
CI/CD / test-python-dsms-gateway (push) Successful in 18s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Has been skipped

Phase 4: source_type (law/guideline/standard/restricted) on source_citation
- NIST/OWASP/ENISA correctly shown as "Standard" instead of "Gesetzliche Grundlage"
- Dynamic frontend labels based on source_type
- Backfill endpoint POST /v1/canonical/generate/backfill-source-type

Phase v3: Scoped Control Applicability
- 3 new fields: applicable_industries, applicable_company_size, scope_conditions
- LLM prompt extended with 39 industries, 5 company sizes, 10 scope signals
- All 5 generation paths (Rule 1/2/3, batch structure, batch reform) updated
- _build_control_from_json: parsing + validation (string→list, size validation)
- _store_control: writes 3 new JSONB columns
- API: response models, create/update requests, SELECT queries extended
- Migration 063: 3 new JSONB columns with GIN indexes
- 110 generator tests + 28 route tests = 138 total, all passing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-18 16:28:05 +01:00
parent 3bb9fffab6
commit f2819b99af
9 changed files with 685 additions and 139 deletions

View File

@@ -31,53 +31,69 @@ class TestLicenseMapping:
info = _classify_regulation("eu_2016_679")
assert info["rule"] == 1
assert info["name"] == "DSGVO"
assert info["source_type"] == "law"
def test_rule1_nist(self):
    """'nist_sp_800_53' must map to rule 1, a NIST name and source_type 'standard'."""
    classification = _classify_regulation("nist_sp_800_53")
    assert classification["rule"] == 1
    assert "NIST" in classification["name"]
    assert classification["source_type"] == "standard"
def test_rule1_german_law(self):
    """'bdsg' must map to rule 1, the name 'BDSG' and source_type 'law'."""
    classification = _classify_regulation("bdsg")
    assert classification["rule"] == 1
    assert classification["name"] == "BDSG"
    assert classification["source_type"] == "law"
def test_rule2_owasp(self):
    """'owasp_asvs' must map to rule 2 with an attribution entry and source_type 'standard'."""
    classification = _classify_regulation("owasp_asvs")
    assert classification["rule"] == 2
    assert "OWASP" in classification["name"]
    # Rule 2 results are expected to carry an "attribution" key.
    assert "attribution" in classification
    assert classification["source_type"] == "standard"
def test_rule2_enisa_prefix(self):
    """'enisa_iot_security' must map to rule 2, an ENISA name and source_type 'standard'."""
    classification = _classify_regulation("enisa_iot_security")
    assert classification["rule"] == 2
    assert "ENISA" in classification["name"]
    assert classification["source_type"] == "standard"
def test_rule3_bsi_prefix(self):
    """'bsi_tr03161' must map to rule 3, the name 'INTERNAL_ONLY' and source_type 'restricted'."""
    classification = _classify_regulation("bsi_tr03161")
    assert classification["rule"] == 3
    assert classification["name"] == "INTERNAL_ONLY"
    assert classification["source_type"] == "restricted"
def test_rule3_iso_prefix(self):
    """'iso_27001' must map to rule 3 and source_type 'restricted'."""
    classification = _classify_regulation("iso_27001")
    assert classification["rule"] == 3
    assert classification["source_type"] == "restricted"
def test_rule3_etsi_prefix(self):
    """'etsi_en_303_645' must map to rule 3 and source_type 'restricted'."""
    classification = _classify_regulation("etsi_en_303_645")
    assert classification["rule"] == 3
    assert classification["source_type"] == "restricted"
def test_unknown_defaults_to_rule3(self):
    """A code that matches nothing known must map to rule 3 and source_type 'restricted'."""
    classification = _classify_regulation("some_unknown_source")
    assert classification["rule"] == 3
    assert classification["source_type"] == "restricted"
def test_case_insensitive(self):
    """An upper-case code ('EU_2016_679') must still map to rule 1 and source_type 'law'."""
    classification = _classify_regulation("EU_2016_679")
    assert classification["rule"] == 1
    assert classification["source_type"] == "law"
def test_all_mapped_regulations_have_valid_rules(self):
    """Every entry of REGULATION_LICENSE_MAP must carry a rule of 1, 2 or 3."""
    allowed_rules = (1, 2, 3)
    for code, entry in REGULATION_LICENSE_MAP.items():
        rule = entry["rule"]
        assert rule in allowed_rules, f"{code} has invalid rule {rule}"
def test_all_mapped_regulations_have_source_type(self):
    """Every entry of REGULATION_LICENSE_MAP must define one of the four known source types."""
    known_types = {"law", "guideline", "standard", "restricted"}
    for code, entry in REGULATION_LICENSE_MAP.items():
        # Check presence first so a missing key yields a readable message.
        assert "source_type" in entry, f"{code} missing source_type"
        source_type = entry["source_type"]
        assert source_type in known_types, f"{code} has invalid source_type {source_type}"
def test_rule3_never_exposes_names(self):
for prefix in ["bsi_test", "iso_test", "etsi_test"]:
info = _classify_regulation(prefix)
@@ -1125,8 +1141,8 @@ class TestRegulationFilter:
class TestPipelineVersion:
"""Tests for pipeline_version propagation in DB writes and null handling."""
def test_pipeline_version_constant_is_2(self):
    """Assert that PIPELINE_VERSION equals 2."""
    # NOTE(review): the test that follows in this view asserts PIPELINE_VERSION == 3,
    # so this looks like the removed side of the diff hunk — confirm it no longer
    # exists in the actual file.
    expected_version = 2
    assert PIPELINE_VERSION == expected_version
def test_pipeline_version_constant_is_3(self):
    """Assert that PIPELINE_VERSION equals 3."""
    expected_version = 3
    assert PIPELINE_VERSION == expected_version
def test_store_control_includes_pipeline_version(self):
"""_store_control must pass pipeline_version=PIPELINE_VERSION to the INSERT."""
@@ -1396,3 +1412,259 @@ class TestRecitalDetection:
assert result is not None
assert "126" in result["recital_numbers"]
assert "127" in result["recital_numbers"]
# =============================================================================
# Source Type Classification Tests
# =============================================================================
class TestSourceTypeClassification:
    """Checks the source_type reported by _classify_regulation for each kind of source.

    The four expected values are 'law', 'guideline', 'standard' and 'restricted'.
    """

    def test_eu_regulations_are_law(self):
        """Every listed EU regulation code (Verordnungen/Richtlinien) must be 'law'."""
        for code in ("eu_2016_679", "eu_2024_1689", "eu_2022_2555", "eu_2024_2847",
                     "eucsa", "dataact", "dora", "eaa"):
            source_type = _classify_regulation(code)["source_type"]
            assert source_type == "law", f"{code} should be law, got {source_type}"

    def test_german_laws_are_law(self):
        """Every listed German national law code must be 'law'."""
        for code in ("bdsg", "ttdsg", "tkg", "bgb_komplett", "hgb", "gewo"):
            source_type = _classify_regulation(code)["source_type"]
            assert source_type == "law", f"{code} should be law, got {source_type}"

    def test_austrian_laws_are_law(self):
        """Every listed Austrian law code must be 'law'."""
        for code in ("at_dsg", "at_abgb", "at_ecg", "at_tkg"):
            source_type = _classify_regulation(code)["source_type"]
            assert source_type == "law", f"{code} should be law, got {source_type}"

    def test_nist_is_standard_not_law(self):
        """NIST frameworks are US standards rather than EU law, so they must be 'standard'."""
        for code in ("nist_sp_800_53", "nist_csf_2_0", "nist_ai_rmf", "nistir_8259a"):
            source_type = _classify_regulation(code)["source_type"]
            assert source_type == "standard", f"{code} should be standard, got {source_type}"

    def test_cisa_is_standard(self):
        """'cisa_secure_by_design' must be 'standard'."""
        assert _classify_regulation("cisa_secure_by_design")["source_type"] == "standard"

    def test_owasp_is_standard(self):
        """OWASP frameworks are voluntary standards, so they must be 'standard'."""
        for code in ("owasp_asvs", "owasp_top10", "owasp_samm"):
            source_type = _classify_regulation(code)["source_type"]
            assert source_type == "standard", f"{code} should be standard, got {source_type}"

    def test_enisa_prefix_is_standard(self):
        """'enisa_threat_landscape' must be 'standard'."""
        assert _classify_regulation("enisa_threat_landscape")["source_type"] == "standard"

    def test_oecd_is_standard(self):
        """'oecd_ai_principles' must be 'standard'."""
        assert _classify_regulation("oecd_ai_principles")["source_type"] == "standard"

    def test_edpb_is_guideline(self):
        """EDPB guidelines are authoritative but non-binding, so they must be 'guideline'."""
        for code in ("edpb_01_2020", "edpb_dpbd_04_2019", "edpb_legitimate_interest"):
            source_type = _classify_regulation(code)["source_type"]
            assert source_type == "guideline", f"{code} should be guideline, got {source_type}"

    def test_wp29_is_guideline(self):
        """WP29 (pre-EDPB) guidelines are soft law, so they must be 'guideline'."""
        for code in ("wp244_profiling", "wp260_transparency"):
            source_type = _classify_regulation(code)["source_type"]
            assert source_type == "guideline", f"{code} should be guideline, got {source_type}"

    def test_blue_guide_is_guideline(self):
        """'eu_blue_guide_2022' must be 'guideline'."""
        assert _classify_regulation("eu_blue_guide_2022")["source_type"] == "guideline"

    def test_bsi_is_restricted(self):
        """'bsi_grundschutz' must be 'restricted'."""
        assert _classify_regulation("bsi_grundschutz")["source_type"] == "restricted"

    def test_iso_is_restricted(self):
        """'iso_27001' must be 'restricted'."""
        assert _classify_regulation("iso_27001")["source_type"] == "restricted"

    def test_etsi_is_restricted(self):
        """'etsi_en_303_645' must be 'restricted'."""
        assert _classify_regulation("etsi_en_303_645")["source_type"] == "restricted"

    def test_unknown_is_restricted(self):
        """A code that matches nothing known must be 'restricted'."""
        assert _classify_regulation("totally_unknown")["source_type"] == "restricted"

    def test_source_type_and_license_rule_are_independent(self):
        """The license rule and the source_type are separate axes.

        The rule describes copyright/usage, the source_type describes legal
        authority: NIST and EDPB are both rule 1 without being 'law'.
        """
        nist_info = _classify_regulation("nist_sp_800_53")
        assert nist_info["rule"] == 1  # free use (copyright)
        assert nist_info["source_type"] == "standard"  # not a law (legal authority)

        edpb_info = _classify_regulation("edpb_01_2020")
        assert edpb_info["rule"] == 1  # free use (public authority)
        assert edpb_info["source_type"] == "guideline"  # not a law (soft law)
# =============================================================================
# Scoped Control Applicability Tests (v3 Pipeline)
# =============================================================================
class TestApplicabilityFields:
    """Tests parsing of applicable_industries, applicable_company_size and scope_conditions.

    Every test feeds a dict to ControlGeneratorPipeline._build_control_from_json
    and inspects the three applicability attributes of the returned control.
    """

    def _make_pipeline(self):
        """Return a ControlGeneratorPipeline whose db and rag_client are MagicMock objects."""
        return ControlGeneratorPipeline(db=MagicMock(), rag_client=MagicMock())

    def test_all_industries_parsed(self):
        """["all"] values pass through unchanged and a None scope_conditions stays None."""
        pipeline = self._make_pipeline()
        payload = {
            "title": "Test",
            "objective": "Test objective",
            "applicable_industries": ["all"],
            "applicable_company_size": ["all"],
            "scope_conditions": None,
        }
        control = pipeline._build_control_from_json(payload, "SEC")
        assert control.applicable_industries == ["all"]
        assert control.applicable_company_size == ["all"]
        assert control.scope_conditions is None

    def test_specific_industries_parsed(self):
        """Explicit industry and size lists are kept as given."""
        pipeline = self._make_pipeline()
        payload = {
            "title": "TKG Control",
            "objective": "Telekommunikation",
            "applicable_industries": ["Telekommunikation", "Energie"],
            "applicable_company_size": ["medium", "large", "enterprise"],
            "scope_conditions": None,
        }
        control = pipeline._build_control_from_json(payload, "INC")
        assert control.applicable_industries == ["Telekommunikation", "Energie"]
        assert control.applicable_company_size == ["medium", "large", "enterprise"]

    def test_scope_conditions_parsed(self):
        """A dict-valued scope_conditions is exposed on the control with its keys intact."""
        pipeline = self._make_pipeline()
        payload = {
            "title": "AI Act Control",
            "objective": "KI-Risikomanagement",
            "applicable_industries": ["all"],
            "applicable_company_size": ["all"],
            "scope_conditions": {
                "requires_any": ["uses_ai"],
                "description": "Nur bei KI-Einsatz relevant",
            },
        }
        control = pipeline._build_control_from_json(payload, "AI")
        assert control.scope_conditions is not None
        assert control.scope_conditions["requires_any"] == ["uses_ai"]
        assert "KI" in control.scope_conditions["description"]

    def test_missing_applicability_fields_are_none(self):
        """An old-style LLM response without applicability fields yields None for all three."""
        pipeline = self._make_pipeline()
        payload = {
            "title": "Legacy Control",
            "objective": "Test",
        }
        control = pipeline._build_control_from_json(payload, "SEC")
        assert control.applicable_industries is None
        assert control.applicable_company_size is None
        assert control.scope_conditions is None

    def test_string_industry_converted_to_list(self):
        """A bare string (as the LLM sometimes returns) is wrapped into a one-element list."""
        pipeline = self._make_pipeline()
        payload = {
            "title": "Test",
            "objective": "Test",
            "applicable_industries": "Telekommunikation",
            "applicable_company_size": "all",
        }
        control = pipeline._build_control_from_json(payload, "SEC")
        assert control.applicable_industries == ["Telekommunikation"]
        assert control.applicable_company_size == ["all"]

    def test_invalid_company_size_filtered(self):
        """Unrecognised size values are dropped while the valid ones keep their order."""
        pipeline = self._make_pipeline()
        payload = {
            "title": "Test",
            "objective": "Test",
            "applicable_company_size": ["medium", "huge", "large"],
        }
        control = pipeline._build_control_from_json(payload, "SEC")
        assert control.applicable_company_size == ["medium", "large"]

    def test_all_invalid_sizes_results_in_none(self):
        """When no size value is valid, the field becomes None."""
        pipeline = self._make_pipeline()
        payload = {
            "title": "Test",
            "objective": "Test",
            "applicable_company_size": ["huge", "tiny"],
        }
        control = pipeline._build_control_from_json(payload, "SEC")
        assert control.applicable_company_size is None

    def test_scope_conditions_non_dict_ignored(self):
        """A string returned for scope_conditions is ignored and the field is None."""
        pipeline = self._make_pipeline()
        payload = {
            "title": "Test",
            "objective": "Test",
            "scope_conditions": "uses_ai",
        }
        control = pipeline._build_control_from_json(payload, "SEC")
        assert control.scope_conditions is None

    def test_multiple_scope_signals(self):
        """Several entries in requires_any are all preserved."""
        pipeline = self._make_pipeline()
        payload = {
            "title": "EHDS Control",
            "objective": "Gesundheitsdaten",
            "applicable_industries": ["Gesundheitswesen", "Pharma"],
            "applicable_company_size": ["all"],
            "scope_conditions": {
                "requires_any": ["processes_health_data", "uses_ai"],
                "description": "Gesundheitsdaten mit KI-Verarbeitung",
            },
        }
        control = pipeline._build_control_from_json(payload, "HLT")
        signals = control.scope_conditions["requires_any"]
        assert len(signals) == 2
        assert "processes_health_data" in signals

    def test_pipeline_version_is_3(self):
        """The v3 pipeline, which adds the applicability fields, reports PIPELINE_VERSION == 3."""
        assert PIPELINE_VERSION == 3

    def test_generated_control_dataclass_has_fields(self):
        """A default-constructed GeneratedControl has all three new fields set to None."""
        ctrl = GeneratedControl()
        assert ctrl.applicable_industries is None
        assert ctrl.applicable_company_size is None
        assert ctrl.scope_conditions is None

    def test_applicability_in_generation_metadata_not_leaked(self):
        """Applicability fields must be top-level, never keys of generation_metadata."""
        pipeline = self._make_pipeline()
        payload = {
            "title": "Test",
            "objective": "Test",
            "applicable_industries": ["all"],
            "applicable_company_size": ["all"],
            "scope_conditions": None,
        }
        control = pipeline._build_control_from_json(payload, "SEC")
        # Check every applicability field; scope_conditions was previously not covered.
        for field_name in ("applicable_industries", "applicable_company_size", "scope_conditions"):
            assert field_name not in control.generation_metadata