feat: Dreistufenmodell normative Verbindlichkeit + Duplikat-Filter + Auto-Deploy

- Source-Type-Klassifikation (58 Regulierungen: law/guideline/framework)
- Backfill-Endpoint POST /controls/backfill-normative-strength
- exclude_duplicates Filter fuer Control-Library (Backend + Proxy + UI-Toggle)
- MkDocs-Kapitel: Normative Verbindlichkeit mit Mermaid-Diagrammen
- scripts/deploy.sh: Auto-Push + Mac Mini rebuild + Coolify health monitoring
- 26 Unit Tests fuer Klassifikations-Logik

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-25 08:18:00 +01:00
parent 6d3bdf8e74
commit 230fbeb490
8 changed files with 796 additions and 4 deletions

View File

@@ -316,6 +316,7 @@ async def list_controls(
source: Optional[str] = Query(None, description="Filter by source_citation->source"),
search: Optional[str] = Query(None, description="Full-text search in control_id, title, objective"),
control_type: Optional[str] = Query(None, description="Filter: atomic, rich, or all"),
exclude_duplicates: bool = Query(False, description="Exclude controls with release_state='duplicate'"),
sort: Optional[str] = Query("control_id", description="Sort field: control_id, created_at, severity"),
order: Optional[str] = Query("asc", description="Sort order: asc or desc"),
limit: Optional[int] = Query(None, ge=1, le=5000, description="Max results"),
@@ -329,6 +330,9 @@ async def list_controls(
"""
params: dict[str, Any] = {}
if exclude_duplicates:
query += " AND release_state != 'duplicate'"
if severity:
query += " AND severity = :sev"
params["sev"] = severity
@@ -398,11 +402,15 @@ async def count_controls(
source: Optional[str] = Query(None),
search: Optional[str] = Query(None),
control_type: Optional[str] = Query(None),
exclude_duplicates: bool = Query(False, description="Exclude controls with release_state='duplicate'"),
):
"""Count controls matching filters (for pagination)."""
query = "SELECT count(*) FROM canonical_controls WHERE 1=1"
params: dict[str, Any] = {}
if exclude_duplicates:
query += " AND release_state != 'duplicate'"
if severity:
query += " AND severity = :sev"
params["sev"] = severity
@@ -908,6 +916,107 @@ async def get_control_provenance(control_id: str):
return result
# =============================================================================
# NORMATIVE STRENGTH BACKFILL
# =============================================================================
@router.post("/controls/backfill-normative-strength")
async def backfill_normative_strength(
    dry_run: bool = Query(True, description="Nur zaehlen, nicht aendern"),
):
    """
    Cap ``normative_strength`` on obligation candidates by source type.

    Every obligation candidate is resolved through its parent control. The
    ``source_regulation`` values of that control's rows in
    ``control_parent_links`` are classified, and the highest-ranking source
    type among them decides the cap:

    - law: normative_strength is left unchanged
    - guideline: at most 'should'
    - framework: at most 'can'

    Candidates whose parent control has no link rows are skipped and counted
    under ``no_parent_links``. With ``dry_run`` (the default) nothing is
    written; the response only reports what would change.

    Returns a dict with ``dry_run``, ``stats``, ``total_changes`` and the
    first 20 planned changes as ``sample_changes``.
    """
    from compliance.data.source_type_classification import (
        cap_normative_strength,
        classify_source_regulation,
        get_highest_source_type,
    )

    with SessionLocal() as db:
        # Step 1: every candidate that is not rejected/merged and has a strength.
        candidate_rows = db.execute(text("""
            SELECT oc.id, oc.candidate_id, oc.normative_strength,
                   oc.parent_control_uuid
            FROM obligation_candidates oc
            WHERE oc.release_state NOT IN ('rejected', 'merged')
              AND oc.normative_strength IS NOT NULL
            ORDER BY oc.candidate_id
        """)).fetchall()

        # Step 2: classify the source regulations of each distinct parent
        # control, using a single batched query for all parents.
        distinct_parents = list({
            str(row.parent_control_uuid)
            for row in candidate_rows
            if row.parent_control_uuid
        })
        source_types_by_parent: dict[str, list[str]] = {}
        if distinct_parents:
            link_rows = db.execute(text("""
                SELECT control_uuid::text, source_regulation
                FROM control_parent_links
                WHERE control_uuid::text = ANY(:uuids)
            """), {"uuids": distinct_parents}).fetchall()
            for link_row in link_rows:
                # A NULL source_regulation is classified like an empty name.
                classified = classify_source_regulation(link_row.source_regulation or "")
                bucket = source_types_by_parent.setdefault(link_row.control_uuid, [])
                bucket.append(classified)

        # Step 3: work out which candidates exceed the cap of their source type.
        changes = []
        stats = {
            "total": len(candidate_rows),
            "unchanged": 0,
            "capped_to_should": 0,
            "capped_to_can": 0,
            "no_parent_links": 0,
        }
        for row in candidate_rows:
            linked_types = []
            if row.parent_control_uuid:
                linked_types = source_types_by_parent.get(str(row.parent_control_uuid), [])
            if not linked_types:
                stats["no_parent_links"] += 1
                continue

            top_type = get_highest_source_type(linked_types)
            current = row.normative_strength
            capped = cap_normative_strength(current, top_type)
            if capped == current:
                stats["unchanged"] += 1
                continue

            changes.append({
                "id": str(row.id),
                "candidate_id": row.candidate_id,
                "old_strength": current,
                "new_strength": capped,
                "source_type": top_type,
            })
            if capped == "should":
                stats["capped_to_should"] += 1
            elif capped == "can":
                stats["capped_to_can"] += 1

        # Step 4: persist the planned changes unless this is a dry run.
        if changes and not dry_run:
            update_stmt = text("""
                UPDATE obligation_candidates
                SET normative_strength = :new_strength
                WHERE id = CAST(:oid AS uuid)
            """)
            for change in changes:
                db.execute(
                    update_stmt,
                    {"new_strength": change["new_strength"], "oid": change["id"]},
                )
            db.commit()

        return {
            "dry_run": dry_run,
            "stats": stats,
            "total_changes": len(changes),
            "sample_changes": changes[:20],
        }
# =============================================================================
# CONTROL CRUD (CREATE / UPDATE / DELETE)
# =============================================================================

View File

@@ -0,0 +1,204 @@
"""
Source-Type-Klassifikation fuer Regulierungen und Frameworks.
Dreistufiges Modell der normativen Verbindlichkeit:
Stufe 1 — GESETZ (law):
Rechtlich bindend. Bussgeld bei Verstoss.
Beispiele: DSGVO, NIS2, AI Act, CRA
Stufe 2 — LEITLINIE (guideline):
Offizielle Auslegungshilfe von Aufsichtsbehoerden.
Beweislastumkehr: Wer abweicht, muss begruenden warum.
Beispiele: EDPB-Leitlinien, BSI-Standards, WP29-Dokumente
Stufe 3 — FRAMEWORK (framework):
Freiwillige Best Practices, nicht rechtsverbindlich.
Aber: Koennen als "Stand der Technik" herangezogen werden.
Beispiele: ENISA, NIST, OWASP, OECD, CISA
Mapping: source_regulation (aus control_parent_links) -> source_type
"""
# --- Source-type identifiers ---
# Statute / regulation / directive: normative_strength is kept as is.
SOURCE_TYPE_LAW = "law"
# Guideline / standard: capped at "should".
SOURCE_TYPE_GUIDELINE = "guideline"
# Framework / best practice: capped at "can".
SOURCE_TYPE_FRAMEWORK = "framework"

# Strongest normative_strength permitted for each source type.
NORMATIVE_STRENGTH_CAP: dict[str, str] = dict(
    (
        (SOURCE_TYPE_LAW, "must"),          # effectively no cap
        (SOURCE_TYPE_GUIDELINE, "should"),
        (SOURCE_TYPE_FRAMEWORK, "can"),
    )
)

# Numeric rank of each strength value (larger = more binding).
# "may" shares the rank of "can" and is treated as its alias.
STRENGTH_ORDER: dict[str, int] = {"can": 1, "may": 1, "should": 2, "must": 3}
def cap_normative_strength(original: str, source_type: str) -> str:
    """
    Return ``original`` limited to the strongest level ``source_type`` allows.

    The cap is read from ``NORMATIVE_STRENGTH_CAP``; a source type missing
    from that map is treated as uncapped ("must"). A strength value missing
    from ``STRENGTH_ORDER`` is ranked like "must", so it is replaced by the
    cap whenever the cap is weaker than "must".

    Examples:
        cap_normative_strength("must", "framework") -> "can"
        cap_normative_strength("should", "law") -> "should"
        cap_normative_strength("must", "guideline") -> "should"
    """
    strongest_allowed = NORMATIVE_STRENGTH_CAP.get(source_type, "must")
    exceeds_cap = (
        STRENGTH_ORDER.get(original, 3) > STRENGTH_ORDER.get(strongest_allowed, 3)
    )
    return strongest_allowed if exceeds_cap else original
def get_highest_source_type(source_types: list[str]) -> str:
    """
    Pick the most binding source type from ``source_types``.

    Ranking is framework < guideline < law; values outside these three rank
    below framework. An empty list yields "framework".

    Examples:
        get_highest_source_type(["framework", "law"]) -> "law"
        get_highest_source_type(["framework", "guideline"]) -> "guideline"
    """
    rank = {
        SOURCE_TYPE_FRAMEWORK: 1,
        SOURCE_TYPE_GUIDELINE: 2,
        SOURCE_TYPE_LAW: 3,
    }

    def rank_of(source_type: str) -> int:
        return rank.get(source_type, 0)

    return max(source_types, key=rank_of) if source_types else SOURCE_TYPE_FRAMEWORK
# ============================================================================
# Classification: source_regulation -> source_type
#
# This map is used by the backfill and by future pipeline runs.
# Register new regulations here!
# Keys are matched exactly (case, spacing and diacritics included).
# ============================================================================
SOURCE_REGULATION_CLASSIFICATION: dict[str, str] = {
    # --- EU regulations (directly binding) ---
    # NOTE(review): the NIS2 entry below is named "Richtlinie" (directive) but
    # sits in this group, while the directives heading further down has no
    # entries. The resulting source_type is "law" either way.
    "DSGVO (EU) 2016/679": SOURCE_TYPE_LAW,
    "KI-Verordnung (EU) 2024/1689": SOURCE_TYPE_LAW,
    "Cyber Resilience Act (CRA)": SOURCE_TYPE_LAW,
    "NIS2-Richtlinie (EU) 2022/2555": SOURCE_TYPE_LAW,
    "Data Act": SOURCE_TYPE_LAW,
    "Data Governance Act (DGA)": SOURCE_TYPE_LAW,
    "Markets in Crypto-Assets (MiCA)": SOURCE_TYPE_LAW,
    "Maschinenverordnung (EU) 2023/1230": SOURCE_TYPE_LAW,
    "Batterieverordnung (EU) 2023/1542": SOURCE_TYPE_LAW,
    "AML-Verordnung": SOURCE_TYPE_LAW,
    # --- EU directives (binding after national transposition) ---
    # Treated like laws for compliance purposes
    # --- National laws ---
    "Bundesdatenschutzgesetz (BDSG)": SOURCE_TYPE_LAW,
    "Telekommunikationsgesetz": SOURCE_TYPE_LAW,
    "Telekommunikationsgesetz Oesterreich": SOURCE_TYPE_LAW,
    "Gewerbeordnung (GewO)": SOURCE_TYPE_LAW,
    "Handelsgesetzbuch (HGB)": SOURCE_TYPE_LAW,
    "Abgabenordnung (AO)": SOURCE_TYPE_LAW,
    "IFRS-Übernahmeverordnung": SOURCE_TYPE_LAW,
    "Österreichisches Datenschutzgesetz (DSG)": SOURCE_TYPE_LAW,
    "LOPDGDD - Ley Orgánica de Protección de Datos (Spanien)": SOURCE_TYPE_LAW,
    "Loi Informatique et Libertés (Frankreich)": SOURCE_TYPE_LAW,
    "Információs önrendelkezési jog törvény (Ungarn)": SOURCE_TYPE_LAW,
    # NOTE(review): the Blue Guide is published as a European Commission
    # guidance notice rather than a legal act - confirm that "law" (no cap)
    # is the intended classification here.
    "EU Blue Guide 2022": SOURCE_TYPE_LAW,
    # --- EDPB/WP29 guidelines (official interpretation aids) ---
    "EDPB Leitlinien 01/2019 (Zertifizierung)": SOURCE_TYPE_GUIDELINE,
    "EDPB Leitlinien 01/2020 (Datentransfers)": SOURCE_TYPE_GUIDELINE,
    "EDPB Leitlinien 01/2020 (Vernetzte Fahrzeuge)": SOURCE_TYPE_GUIDELINE,
    "EDPB Leitlinien 01/2022 (BCR)": SOURCE_TYPE_GUIDELINE,
    "EDPB Leitlinien 01/2024 (Berechtigtes Interesse)": SOURCE_TYPE_GUIDELINE,
    "EDPB Leitlinien 04/2019 (Data Protection by Design)": SOURCE_TYPE_GUIDELINE,
    "EDPB Leitlinien 05/2020 - Einwilligung": SOURCE_TYPE_GUIDELINE,
    "EDPB Leitlinien 07/2020 (Datentransfers)": SOURCE_TYPE_GUIDELINE,
    "EDPB Leitlinien 08/2020 (Social Media)": SOURCE_TYPE_GUIDELINE,
    "EDPB Leitlinien 09/2022 (Data Breach)": SOURCE_TYPE_GUIDELINE,
    "EDPB Leitlinien 09/2022 - Meldung von Datenschutzverletzungen": SOURCE_TYPE_GUIDELINE,
    "EDPB Empfehlungen 01/2020 - Ergaenzende Massnahmen fuer Datentransfers": SOURCE_TYPE_GUIDELINE,
    "EDPB Leitlinien - Berechtigtes Interesse (Art. 6(1)(f))": SOURCE_TYPE_GUIDELINE,
    "WP244 Leitlinien (Profiling)": SOURCE_TYPE_GUIDELINE,
    "WP251 Leitlinien (Profiling)": SOURCE_TYPE_GUIDELINE,
    "WP260 Leitlinien (Transparenz)": SOURCE_TYPE_GUIDELINE,
    # --- BSI standards (technical guidelines issued by a public authority) ---
    "BSI-TR-03161-1": SOURCE_TYPE_GUIDELINE,
    "BSI-TR-03161-2": SOURCE_TYPE_GUIDELINE,
    "BSI-TR-03161-3": SOURCE_TYPE_GUIDELINE,
    # --- ENISA (EU agency, but its recommendations are not legally binding) ---
    "ENISA Cybersecurity State 2024": SOURCE_TYPE_FRAMEWORK,
    "ENISA ICS/SCADA Dependencies": SOURCE_TYPE_FRAMEWORK,
    "ENISA Supply Chain Good Practices": SOURCE_TYPE_FRAMEWORK,
    "ENISA Threat Landscape Supply Chain": SOURCE_TYPE_FRAMEWORK,
    # --- NIST (US standards, used internationally as best practice) ---
    "NIST AI Risk Management Framework": SOURCE_TYPE_FRAMEWORK,
    "NIST Cybersecurity Framework 2.0": SOURCE_TYPE_FRAMEWORK,
    "NIST SP 800-207 (Zero Trust)": SOURCE_TYPE_FRAMEWORK,
    "NIST SP 800-218 (SSDF)": SOURCE_TYPE_FRAMEWORK,
    "NIST SP 800-53 Rev. 5": SOURCE_TYPE_FRAMEWORK,
    "NIST SP 800-63-3": SOURCE_TYPE_FRAMEWORK,
    # --- OWASP (community standards) ---
    "OWASP API Security Top 10 (2023)": SOURCE_TYPE_FRAMEWORK,
    "OWASP ASVS 4.0": SOURCE_TYPE_FRAMEWORK,
    "OWASP MASVS 2.0": SOURCE_TYPE_FRAMEWORK,
    "OWASP SAMM 2.0": SOURCE_TYPE_FRAMEWORK,
    "OWASP Top 10 (2021)": SOURCE_TYPE_FRAMEWORK,
    # --- Other frameworks ---
    "OECD KI-Empfehlung": SOURCE_TYPE_FRAMEWORK,
    "CISA Secure by Design": SOURCE_TYPE_FRAMEWORK,
}
def classify_source_regulation(source_regulation: str) -> str:
    """
    Classify a source_regulation name as law, guideline or framework.

    Lookup order:

    1. Exact match against ``SOURCE_REGULATION_CLASSIFICATION`` (tried with
       the name as given and again with surrounding whitespace removed).
    2. Keyword heuristic for names that are not in the map. Law keywords are
       checked before guideline keywords, so a name containing both is
       classified as law.
    3. Fallback: framework, the least binding type.

    Long keywords ("verordnung", "gesetz", ...) are matched as substrings so
    that German compounds such as "Datenschutzgesetz" are recognised. Short
    tokens are matched against whole words ("act", "ley", "loi") or word
    prefixes ("bsi", "wp2") only; as plain substrings they also hit unrelated
    words ("Practices", "Exploit", "Website") and would promote a framework
    to law or guideline.

    Args:
        source_regulation: Regulation name as stored on a parent link. Empty
            or whitespace-only input yields framework.

    Returns:
        One of SOURCE_TYPE_LAW, SOURCE_TYPE_GUIDELINE, SOURCE_TYPE_FRAMEWORK.
    """
    if not source_regulation:
        return SOURCE_TYPE_FRAMEWORK

    # Exact match; the stripped variant tolerates stray padding in stored names.
    name = source_regulation.strip()
    for key in (source_regulation, name):
        if key in SOURCE_REGULATION_CLASSIFICATION:
            return SOURCE_REGULATION_CLASSIFICATION[key]

    # Heuristic for names that are not in the map.
    lower = name.lower()
    # Split on every non-alphanumeric character so that "AI-Act" or "(BSI)"
    # still produce the standalone words "act" / "bsi".
    words = "".join(ch if ch.isalnum() else " " for ch in lower).split()

    law_substrings = (
        "verordnung", "richtlinie", "gesetz", "directive", "regulation",
        "(eu)", "(eg)", "törvény", "código",
    )
    law_words = {"act", "acts", "ley", "loi"}
    if any(token in lower for token in law_substrings) or not law_words.isdisjoint(words):
        return SOURCE_TYPE_LAW

    guideline_substrings = ("edpb", "leitlinie", "guideline", "empfehlung")
    # Prefix match keeps "WP244" and "BSI200" recognisable.
    guideline_word_prefixes = ("wp2", "bsi")
    if any(token in lower for token in guideline_substrings) or any(
        word.startswith(guideline_word_prefixes) for word in words
    ):
        return SOURCE_TYPE_GUIDELINE

    # Known framework publishers (ENISA, NIST, OWASP, OECD, CISA, ISO) and
    # anything unrecognised end up here: framework is the conservative default.
    return SOURCE_TYPE_FRAMEWORK

View File

@@ -0,0 +1,102 @@
"""Tests for source_type_classification module."""
import sys
sys.path.insert(0, ".")
from compliance.data.source_type_classification import (
classify_source_regulation,
cap_normative_strength,
get_highest_source_type,
SOURCE_TYPE_LAW,
SOURCE_TYPE_GUIDELINE,
SOURCE_TYPE_FRAMEWORK,
)
class TestClassifySourceRegulation:
    """Checks for classify_source_regulation(): map entries, heuristics, fallback."""

    def test_eu_regulation(self):
        """An EU regulation listed in the map is a law."""
        result = classify_source_regulation("DSGVO (EU) 2016/679")
        assert result == SOURCE_TYPE_LAW

    def test_eu_directive(self):
        """An EU directive listed in the map is a law."""
        result = classify_source_regulation("NIS2-Richtlinie (EU) 2022/2555")
        assert result == SOURCE_TYPE_LAW

    def test_national_law(self):
        """A national statute listed in the map is a law."""
        result = classify_source_regulation("Bundesdatenschutzgesetz (BDSG)")
        assert result == SOURCE_TYPE_LAW

    def test_edpb_guideline(self):
        """An EDPB guideline listed in the map is a guideline."""
        result = classify_source_regulation("EDPB Leitlinien 01/2020 (Datentransfers)")
        assert result == SOURCE_TYPE_GUIDELINE

    def test_bsi_standard(self):
        """A BSI technical guideline listed in the map is a guideline."""
        result = classify_source_regulation("BSI-TR-03161-1")
        assert result == SOURCE_TYPE_GUIDELINE

    def test_wp29_guideline(self):
        """A WP29 document listed in the map is a guideline."""
        result = classify_source_regulation("WP260 Leitlinien (Transparenz)")
        assert result == SOURCE_TYPE_GUIDELINE

    def test_enisa_framework(self):
        """An ENISA publication listed in the map is a framework."""
        result = classify_source_regulation("ENISA Supply Chain Good Practices")
        assert result == SOURCE_TYPE_FRAMEWORK

    def test_nist_framework(self):
        """A NIST publication listed in the map is a framework."""
        result = classify_source_regulation("NIST Cybersecurity Framework 2.0")
        assert result == SOURCE_TYPE_FRAMEWORK

    def test_owasp_framework(self):
        """An OWASP publication listed in the map is a framework."""
        result = classify_source_regulation("OWASP Top 10 (2021)")
        assert result == SOURCE_TYPE_FRAMEWORK

    def test_unknown_defaults_to_framework(self):
        """A name without any known keyword falls back to framework."""
        result = classify_source_regulation("Some Unknown Source")
        assert result == SOURCE_TYPE_FRAMEWORK

    def test_empty_string(self):
        """An empty name falls back to framework."""
        result = classify_source_regulation("")
        assert result == SOURCE_TYPE_FRAMEWORK

    def test_heuristic_verordnung(self):
        """A name not in the map containing 'Verordnung' is a law."""
        result = classify_source_regulation("Neue Verordnung 2027")
        assert result == SOURCE_TYPE_LAW

    def test_heuristic_nist(self):
        """A NIST name that is not in the map is a framework."""
        result = classify_source_regulation("NIST Future Standard")
        assert result == SOURCE_TYPE_FRAMEWORK
class TestCapNormativeStrength:
    """Checks for cap_normative_strength() across all three source types."""

    def test_must_from_law_stays(self):
        """Laws do not cap 'must'."""
        capped = cap_normative_strength("must", SOURCE_TYPE_LAW)
        assert capped == "must"

    def test_should_from_law_stays(self):
        """Laws leave 'should' untouched."""
        capped = cap_normative_strength("should", SOURCE_TYPE_LAW)
        assert capped == "should"

    def test_must_from_guideline_capped(self):
        """Guidelines lower 'must' to 'should'."""
        capped = cap_normative_strength("must", SOURCE_TYPE_GUIDELINE)
        assert capped == "should"

    def test_should_from_guideline_stays(self):
        """Guidelines leave 'should' untouched."""
        capped = cap_normative_strength("should", SOURCE_TYPE_GUIDELINE)
        assert capped == "should"

    def test_must_from_framework_capped(self):
        """Frameworks lower 'must' to 'can'."""
        capped = cap_normative_strength("must", SOURCE_TYPE_FRAMEWORK)
        assert capped == "can"

    def test_should_from_framework_capped(self):
        """Frameworks lower 'should' to 'can'."""
        capped = cap_normative_strength("should", SOURCE_TYPE_FRAMEWORK)
        assert capped == "can"

    def test_can_from_framework_stays(self):
        """Frameworks leave 'can' untouched."""
        capped = cap_normative_strength("can", SOURCE_TYPE_FRAMEWORK)
        assert capped == "can"

    def test_can_from_law_stays(self):
        """Capping does not raise 'can' for a law."""
        capped = cap_normative_strength("can", SOURCE_TYPE_LAW)
        assert capped == "can"
class TestGetHighestSourceType:
    """Checks for get_highest_source_type() ranking and its empty-list default."""

    def test_law_wins(self):
        """Law outranks framework."""
        winner = get_highest_source_type(["framework", "law"])
        assert winner == "law"

    def test_guideline_over_framework(self):
        """Guideline outranks framework."""
        winner = get_highest_source_type(["framework", "guideline"])
        assert winner == "guideline"

    def test_single_framework(self):
        """A single entry is returned as is."""
        winner = get_highest_source_type(["framework"])
        assert winner == "framework"

    def test_empty_defaults_to_framework(self):
        """An empty list yields framework."""
        winner = get_highest_source_type([])
        assert winner == "framework"

    def test_all_three(self):
        """Law wins when all three types are present."""
        winner = get_highest_source_type(["framework", "guideline", "law"])
        assert winner == "law"