feat(control-pipeline): replace similarity-only dedup with LLM-verified dedup in pipeline
Stage 4 (Harmonization) now uses two-tier approach: - Score >= 0.92: auto-duplicate (embedding only, fast) - Score 0.85-0.92: LLM verification via local qwen3.5 (think=false, ~3s) - Score < 0.85: not a duplicate This eliminates ~44% false positives from pure embedding similarity. LLM_DEDUP_ENABLED env var controls the feature (default: true). Also adds 10 applicability use case tests (bank+TAN, webshop+Stripe, SaaS startup, energy provider, health app, automotive, law firm, etc.) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -51,7 +51,9 @@ OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
|
|||||||
OLLAMA_MODEL = os.getenv("CONTROL_GEN_OLLAMA_MODEL", "qwen3.5:35b-a3b")
|
OLLAMA_MODEL = os.getenv("CONTROL_GEN_OLLAMA_MODEL", "qwen3.5:35b-a3b")
|
||||||
LLM_TIMEOUT = float(os.getenv("CONTROL_GEN_LLM_TIMEOUT", "180"))
|
LLM_TIMEOUT = float(os.getenv("CONTROL_GEN_LLM_TIMEOUT", "180"))
|
||||||
|
|
||||||
HARMONIZATION_THRESHOLD = 0.85 # Cosine similarity above this = duplicate
|
HARMONIZATION_THRESHOLD = 0.85 # Cosine similarity above this = candidate for dedup
|
||||||
|
HARMONIZATION_AUTO_DUP = 0.92 # Above this = auto-duplicate (no LLM check needed)
|
||||||
|
LLM_DEDUP_ENABLED = os.getenv("LLM_DEDUP_ENABLED", "true").lower() == "true"
|
||||||
|
|
||||||
# Pipeline version — increment when generation rules change materially.
|
# Pipeline version — increment when generation rules change materially.
|
||||||
# v1: Original (local LLM prefilter, old prompt)
|
# v1: Original (local LLM prefilter, old prompt)
|
||||||
@@ -1589,10 +1591,13 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Elementen. Fuer Aspekte ohne
|
|||||||
# ── Stage 4: Harmonization ─────────────────────────────────────────
|
# ── Stage 4: Harmonization ─────────────────────────────────────────
|
||||||
|
|
||||||
async def _check_harmonization(self, new_control: GeneratedControl) -> Optional[list]:
|
async def _check_harmonization(self, new_control: GeneratedControl) -> Optional[list]:
|
||||||
"""Check if a new control duplicates existing ones via Qdrant vector search.
|
"""Check if a new control duplicates existing ones.
|
||||||
|
|
||||||
Uses the atomic_controls_dedup collection for fast nearest-neighbor lookup
|
Two-tier approach:
|
||||||
instead of pre-loading all embeddings into memory.
|
1. Fast: Qdrant embedding similarity (pre-filter)
|
||||||
|
2. Precise: Local LLM verification for borderline matches (0.85-0.92)
|
||||||
|
|
||||||
|
Returns list of similar controls if duplicate, None otherwise.
|
||||||
"""
|
"""
|
||||||
new_text = f"{new_control.title} {new_control.objective}"
|
new_text = f"{new_control.title} {new_control.objective}"
|
||||||
new_emb = await _get_embedding(new_text)
|
new_emb = await _get_embedding(new_text)
|
||||||
@@ -1610,22 +1615,90 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Elementen. Fuer Aspekte ohne
|
|||||||
"with_payload": {"include": ["control_id", "title"]},
|
"with_payload": {"include": ["control_id", "title"]},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if resp.status_code == 200:
|
if resp.status_code != 200:
|
||||||
|
return None
|
||||||
|
|
||||||
results = resp.json().get("result", [])
|
results = resp.json().get("result", [])
|
||||||
if results:
|
if not results:
|
||||||
return [
|
return None
|
||||||
{
|
|
||||||
"control_id": r["payload"].get("control_id", ""),
|
best = results[0]
|
||||||
"title": r["payload"].get("title", ""),
|
best_score = best.get("score", 0.0)
|
||||||
"similarity": round(r["score"], 3),
|
best_id = best["payload"].get("control_id", "")
|
||||||
}
|
best_title = best["payload"].get("title", "")
|
||||||
for r in results
|
|
||||||
]
|
# Tier 1: High similarity → auto-duplicate
|
||||||
|
if best_score >= HARMONIZATION_AUTO_DUP:
|
||||||
|
return [{"control_id": best_id, "title": best_title,
|
||||||
|
"similarity": round(best_score, 3), "method": "embedding_auto"}]
|
||||||
|
|
||||||
|
# Tier 2: Borderline → LLM verification
|
||||||
|
if LLM_DEDUP_ENABLED and best_score >= HARMONIZATION_THRESHOLD:
|
||||||
|
is_dup = await self._llm_verify_duplicate(
|
||||||
|
new_control.title, new_control.objective or "",
|
||||||
|
best_title, "",
|
||||||
|
)
|
||||||
|
if is_dup:
|
||||||
|
return [{"control_id": best_id, "title": best_title,
|
||||||
|
"similarity": round(best_score, 3), "method": "llm_verified"}]
|
||||||
|
# LLM says different → not a duplicate
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Below threshold but returned by Qdrant → not a duplicate
|
||||||
|
return None
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Qdrant dedup search failed: %s — skipping harmonization", e)
|
logger.warning("Harmonization check failed: %s — skipping", e)
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
async def _llm_verify_duplicate(
|
||||||
|
self, title_a: str, obj_a: str, title_b: str, obj_b: str,
|
||||||
|
) -> bool:
|
||||||
|
"""Ask local LLM whether two controls are duplicates.
|
||||||
|
|
||||||
|
Returns True if the LLM classifies them as DUPLIKAT.
|
||||||
|
Uses qwen3.5 with think=false for fast (~3s) responses.
|
||||||
|
"""
|
||||||
|
prompt = (
|
||||||
|
f"Control A:\n{title_a}\n{obj_a[:300]}\n\n"
|
||||||
|
f"Control B:\n{title_b}\n{obj_b[:300]}\n\n"
|
||||||
|
f"Sind diese Controls Duplikate?"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||||
|
resp = await client.post(
|
||||||
|
f"{OLLAMA_URL}/api/chat",
|
||||||
|
json={
|
||||||
|
"model": OLLAMA_MODEL,
|
||||||
|
"stream": False,
|
||||||
|
"think": False,
|
||||||
|
"options": {"num_predict": 200},
|
||||||
|
"messages": [
|
||||||
|
{"role": "system", "content": (
|
||||||
|
"Du bist ein Compliance-Experte. Vergleiche zwei Controls: "
|
||||||
|
"DUPLIKAT (gleiche Anforderung, nur anders formuliert) oder "
|
||||||
|
"VERSCHIEDEN (unterschiedlicher Scope/Inhalt). "
|
||||||
|
"Antworte NUR mit JSON: {\"verdict\": \"DUPLIKAT\" oder \"VERSCHIEDEN\", "
|
||||||
|
"\"reason\": \"kurze Begruendung\"}"
|
||||||
|
)},
|
||||||
|
{"role": "user", "content": prompt},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if resp.status_code != 200:
|
||||||
|
return False
|
||||||
|
|
||||||
|
content = resp.json().get("message", {}).get("content", "")
|
||||||
|
parsed = _parse_llm_json(content)
|
||||||
|
if parsed and "DUPLIKAT" in str(parsed.get("verdict", "")).upper():
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("LLM dedup verification failed: %s", e)
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
async def _preload_embeddings(self, existing: list[dict]):
|
async def _preload_embeddings(self, existing: list[dict]):
|
||||||
"""Pre-load embeddings for all existing controls in batches."""
|
"""Pre-load embeddings for all existing controls in batches."""
|
||||||
texts = [f"{ex.get('title', '')} {ex.get('objective', '')}" for ex in existing]
|
texts = [f"{ex.get('title', '')} {ex.get('objective', '')}" for ex in existing]
|
||||||
|
|||||||
469
control-pipeline/tests/test_applicability_use_cases.py
Normal file
469
control-pipeline/tests/test_applicability_use_cases.py
Normal file
@@ -0,0 +1,469 @@
|
|||||||
|
"""
|
||||||
|
Applicability Use Case Tests — Real-world scenarios for control assignment.
|
||||||
|
|
||||||
|
These test cases verify that our Applicability Engine correctly assigns
|
||||||
|
and does NOT assign controls based on company profile + scope answers.
|
||||||
|
|
||||||
|
Each test case represents a real business scenario discussed during
|
||||||
|
product development. They serve as:
|
||||||
|
1. Regression tests for the Applicability Engine
|
||||||
|
2. Demo cases for the SDK
|
||||||
|
3. Documentation of regulatory nuances
|
||||||
|
|
||||||
|
Run: pytest tests/test_applicability_use_cases.py -v
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Test Case Data: Company Profiles + Expected Results
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
USE_CASES = [
|
||||||
|
# ===================================================================
|
||||||
|
# CASE 1: Bank mit TAN-Generator (Batterie im Produkt)
|
||||||
|
# ===================================================================
|
||||||
|
{
|
||||||
|
"id": "bank_tan_generator",
|
||||||
|
"name": "Bank gibt TAN-Generator mit Batterie an Kunden raus",
|
||||||
|
"company": {
|
||||||
|
"industry": "Finanzdienstleistungen",
|
||||||
|
"size": "large",
|
||||||
|
"scope_answers": {
|
||||||
|
"payment_services": True, # Bank ist Zahlungsdienstleister
|
||||||
|
"processes_health_data": False,
|
||||||
|
"uses_ai": False,
|
||||||
|
"third_country_transfer": True,
|
||||||
|
"manufactures_batteries": False, # Bank STELLT NICHT HER
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"must_match": [
|
||||||
|
"PSD2", # Bank IST Zahlungsdienstleister
|
||||||
|
"DSGVO", # Immer
|
||||||
|
"AML", # Bank hat AML-Pflichten
|
||||||
|
],
|
||||||
|
"must_not_match": [
|
||||||
|
"Batterieverordnung", # Bank stellt TAN-Generator nicht her
|
||||||
|
"Maschinenverordnung", # Kein Maschinenbau
|
||||||
|
"MDR", # Keine Medizinprodukte
|
||||||
|
],
|
||||||
|
"rationale": (
|
||||||
|
"Die Bank beschafft den TAN-Generator von einem Hersteller. "
|
||||||
|
"Der Hersteller unterliegt der Batterieverordnung, nicht die Bank. "
|
||||||
|
"Die Bank ist aber selbst PSD2-reguliert als Zahlungsdienstleister."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
|
||||||
|
# ===================================================================
|
||||||
|
# CASE 2: Industrieunternehmen eroeffnet Webshop mit Stripe
|
||||||
|
# ===================================================================
|
||||||
|
{
|
||||||
|
"id": "industrie_webshop_stripe",
|
||||||
|
"name": "Maschinenbau-Firma eroeffnet Webshop mit Stripe-Zahlung",
|
||||||
|
"company": {
|
||||||
|
"industry": "Maschinenbau",
|
||||||
|
"size": "medium",
|
||||||
|
"scope_answers": {
|
||||||
|
"payment_services": False, # Stripe ist der Zahlungsdienstleister!
|
||||||
|
"uses_ai": False,
|
||||||
|
"third_country_transfer": True, # Stripe ist US-Unternehmen
|
||||||
|
"processes_health_data": False,
|
||||||
|
"has_webshop": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"must_match": [
|
||||||
|
"DSGVO", # Immer
|
||||||
|
"DSGVO_AV_Vertrag", # Stripe als Auftragsverarbeiter
|
||||||
|
"DSGVO_Datenschutzinfo", # Stripe in Datenschutzerklaerung nennen
|
||||||
|
"Maschinenverordnung", # Kerngeschaeft
|
||||||
|
"CE", # Maschinenbau braucht CE
|
||||||
|
],
|
||||||
|
"must_not_match": [
|
||||||
|
"PSD2", # Stripe ist der Zahlungsdienstleister, NICHT die Firma
|
||||||
|
"AML", # Keine eigene Zahlungsabwicklung
|
||||||
|
"BaFin", # Kein Finanzinstitut
|
||||||
|
],
|
||||||
|
"rationale": (
|
||||||
|
"Stripe ist Zahlungsdienstleister in eigenem Auftrag. Der Webshop-Betreiber "
|
||||||
|
"wird nicht zum regulierten Zahlungsinstitut. Er muss nur Stripe als "
|
||||||
|
"Auftragsverarbeiter in der Datenschutzinformation korrekt benennen (DSGVO Art. 13/14). "
|
||||||
|
"Rechtsanwaltlich begleitete Stripe-Anbindung hat das bestaetigt."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
|
||||||
|
# ===================================================================
|
||||||
|
# CASE 3: Kleines SaaS-Startup (5 Personen)
|
||||||
|
# ===================================================================
|
||||||
|
{
|
||||||
|
"id": "saas_startup_klein",
|
||||||
|
"name": "5-Personen SaaS-Startup (Cloud-Software, keine KI)",
|
||||||
|
"company": {
|
||||||
|
"industry": "Technologie/IT",
|
||||||
|
"size": "micro",
|
||||||
|
"scope_answers": {
|
||||||
|
"uses_ai": False,
|
||||||
|
"third_country_transfer": False, # EU-only Hosting
|
||||||
|
"processes_health_data": False,
|
||||||
|
"automated_decisions": False,
|
||||||
|
"payment_services": False,
|
||||||
|
"is_kritis_operator": False,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"must_match": [
|
||||||
|
"DSGVO", # Immer
|
||||||
|
"OWASP", # Software-Sicherheit
|
||||||
|
],
|
||||||
|
"must_not_match": [
|
||||||
|
"NIS2", # Zu klein (NIS2 ab medium/50 MA)
|
||||||
|
"AI_Act", # Keine KI
|
||||||
|
"Batterieverordnung", # Kein Hardware-Produkt
|
||||||
|
"TKG", # Kein Telko-Anbieter
|
||||||
|
"MDR", # Keine Medizinprodukte
|
||||||
|
"PSD2", # Kein Zahlungsdienstleister
|
||||||
|
"KRITIS", # Zu klein, kein kritischer Sektor
|
||||||
|
],
|
||||||
|
"rationale": (
|
||||||
|
"Ein Kleinstunternehmen ohne KI, ohne KRITIS-Zugehoerigkeit, ohne "
|
||||||
|
"Drittlandtransfer braucht nur DSGVO-Basics und Software-Sicherheit. "
|
||||||
|
"NIS2 greift erst ab 50 Mitarbeitern / 10 Mio Umsatz."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
|
||||||
|
# ===================================================================
|
||||||
|
# CASE 4: Mittelstaendischer Energieversorger
|
||||||
|
# ===================================================================
|
||||||
|
{
|
||||||
|
"id": "energieversorger_mittelstand",
|
||||||
|
"name": "Stadtwerk mit 200 Mitarbeitern (Strom + Gas)",
|
||||||
|
"company": {
|
||||||
|
"industry": "Energie",
|
||||||
|
"size": "medium",
|
||||||
|
"scope_answers": {
|
||||||
|
"is_kritis_operator": True,
|
||||||
|
"uses_ai": False,
|
||||||
|
"third_country_transfer": False,
|
||||||
|
"processes_health_data": False,
|
||||||
|
"employee_monitoring": True, # Leitwarte mit Kameras
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"must_match": [
|
||||||
|
"DSGVO",
|
||||||
|
"NIS2", # Energie = KRITIS-Sektor + medium
|
||||||
|
"KRITIS", # Energieversorger
|
||||||
|
"BDSG", # Mitarbeiterueberwachung
|
||||||
|
"BSI_Grundschutz", # KRITIS-Betreiber
|
||||||
|
],
|
||||||
|
"must_not_match": [
|
||||||
|
"PSD2",
|
||||||
|
"AI_Act",
|
||||||
|
"MDR",
|
||||||
|
"TKG",
|
||||||
|
"Batterieverordnung",
|
||||||
|
],
|
||||||
|
"rationale": (
|
||||||
|
"Stadtwerk ist KRITIS-Betreiber im Energiesektor. NIS2 greift ab medium "
|
||||||
|
"(50 MA). BSI-Grundschutz ist de-facto Pflicht fuer KRITIS. "
|
||||||
|
"Mitarbeiterueberwachung (Leitwarte) erfordert BDSG-Compliance."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
|
||||||
|
# ===================================================================
|
||||||
|
# CASE 5: Gesundheits-App Startup mit KI
|
||||||
|
# ===================================================================
|
||||||
|
{
|
||||||
|
"id": "health_app_ki",
|
||||||
|
"name": "Startup entwickelt KI-basierte Gesundheits-App (DiGA)",
|
||||||
|
"company": {
|
||||||
|
"industry": "Gesundheitswesen",
|
||||||
|
"size": "small",
|
||||||
|
"scope_answers": {
|
||||||
|
"uses_ai": True,
|
||||||
|
"processes_health_data": True,
|
||||||
|
"automated_decisions": True,
|
||||||
|
"third_country_transfer": False,
|
||||||
|
"is_kritis_operator": False,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"must_match": [
|
||||||
|
"DSGVO",
|
||||||
|
"DSGVO_Art9", # Gesundheitsdaten = besondere Kategorie
|
||||||
|
"DSGVO_Art22", # Automatisierte Entscheidungen
|
||||||
|
"DSGVO_Art35", # DSFA fuer Gesundheitsdaten + KI
|
||||||
|
"AI_Act", # KI-Einsatz
|
||||||
|
"MDR", # Gesundheits-App kann Medizinprodukt sein
|
||||||
|
"BSI_TR_03161", # Technische Richtlinie fuer mobile Gesundheits-Apps
|
||||||
|
"DiGAV", # Digitale Gesundheitsanwendung
|
||||||
|
],
|
||||||
|
"must_not_match": [
|
||||||
|
"PSD2",
|
||||||
|
"TKG",
|
||||||
|
"Batterieverordnung",
|
||||||
|
"Maschinenverordnung",
|
||||||
|
"NIS2", # Zu klein
|
||||||
|
],
|
||||||
|
"rationale": (
|
||||||
|
"Gesundheits-App mit KI trifft die schaerfsten Anforderungen: "
|
||||||
|
"DSGVO Art. 9 (Gesundheitsdaten), Art. 22 (automatisierte Entscheidungen), "
|
||||||
|
"Art. 35 (DSFA Pflicht), AI Act (Hochrisiko-KI im Gesundheitsbereich), "
|
||||||
|
"MDR (evtl. Medizinprodukt), BSI TR-03161 (Sicherheit mobiler Gesundheits-Apps)."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
|
||||||
|
# ===================================================================
|
||||||
|
# CASE 6: Automobilzulieferer (TISAX-relevant)
|
||||||
|
# ===================================================================
|
||||||
|
{
|
||||||
|
"id": "automotive_zulieferer",
|
||||||
|
"name": "Automobilzulieferer mit 500 MA, Prototypen-Fertigung",
|
||||||
|
"company": {
|
||||||
|
"industry": "Automobil",
|
||||||
|
"size": "large",
|
||||||
|
"scope_answers": {
|
||||||
|
"uses_ai": False,
|
||||||
|
"third_country_transfer": True, # Lieferkette international
|
||||||
|
"is_kritis_operator": False,
|
||||||
|
"handles_prototypes": True,
|
||||||
|
"supply_chain_automotive": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"must_match": [
|
||||||
|
"DSGVO",
|
||||||
|
"NIS2", # Large + Automotive (Lieferkette)
|
||||||
|
"ISO27001", # TISAX basiert auf ISO 27001
|
||||||
|
"Prototypenschutz", # OEM-Anforderung
|
||||||
|
"CE", # Produkte in EU
|
||||||
|
"Maschinenverordnung", # Produktion
|
||||||
|
],
|
||||||
|
"must_not_match": [
|
||||||
|
"PSD2",
|
||||||
|
"TKG",
|
||||||
|
"MDR",
|
||||||
|
"AI_Act",
|
||||||
|
],
|
||||||
|
"rationale": (
|
||||||
|
"Automobilzulieferer braucht TISAX-Readiness (basiert auf ISO 27001), "
|
||||||
|
"Prototypenschutz (OEM-Vorgabe), und NIS2 (Lieferkette, large). "
|
||||||
|
"TISAX selbst koennen wir nicht direkt zuweisen (VDA ISA proprietaer), "
|
||||||
|
"aber die zugrunde liegenden ISO/NIST Controls decken es ab."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
|
||||||
|
# ===================================================================
|
||||||
|
# CASE 7: Rechtsanwaltskanzlei
|
||||||
|
# ===================================================================
|
||||||
|
{
|
||||||
|
"id": "rechtsanwaltskanzlei",
|
||||||
|
"name": "Wirtschaftskanzlei mit 30 Anwaelten",
|
||||||
|
"company": {
|
||||||
|
"industry": "Recht/Kanzlei",
|
||||||
|
"size": "small",
|
||||||
|
"scope_answers": {
|
||||||
|
"uses_ai": True, # KI fuer Dokumentenanalyse
|
||||||
|
"third_country_transfer": True, # US-Cloud-Dienste
|
||||||
|
"processes_health_data": False,
|
||||||
|
"automated_decisions": False,
|
||||||
|
"handles_legal_privilege": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"must_match": [
|
||||||
|
"DSGVO",
|
||||||
|
"DSGVO_Art46", # Drittlandtransfer (SCC)
|
||||||
|
"AI_Act", # KI-Einsatz
|
||||||
|
"BRAO", # Berufsordnung Rechtsanwaelte
|
||||||
|
"Mandantengeheimnis", # Berufsgeheimnis
|
||||||
|
],
|
||||||
|
"must_not_match": [
|
||||||
|
"NIS2", # Zu klein, kein KRITIS-Sektor
|
||||||
|
"PSD2",
|
||||||
|
"TKG",
|
||||||
|
"MDR",
|
||||||
|
"Batterieverordnung",
|
||||||
|
],
|
||||||
|
"rationale": (
|
||||||
|
"Kanzlei mit KI-Tools und US-Cloud braucht DSGVO + SCC (Drittland), "
|
||||||
|
"AI Act (KI-Einsatz), und berufsrechtliche Anforderungen (BRAO, "
|
||||||
|
"Mandantengeheimnis). NIS2 greift nicht (kein KRITIS-Sektor, zu klein)."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
|
||||||
|
# ===================================================================
|
||||||
|
# CASE 8: E-Commerce Haendler mit eigenem Zahlungssystem
|
||||||
|
# ===================================================================
|
||||||
|
{
|
||||||
|
"id": "ecommerce_eigene_zahlung",
|
||||||
|
"name": "Online-Haendler mit eigenem Payment-Processing (keine Stripe-Delegation)",
|
||||||
|
"company": {
|
||||||
|
"industry": "E-Commerce/Handel",
|
||||||
|
"size": "medium",
|
||||||
|
"scope_answers": {
|
||||||
|
"payment_services": True, # EIGENE Zahlungsabwicklung
|
||||||
|
"uses_ai": True, # KI-Empfehlungen
|
||||||
|
"third_country_transfer": True,
|
||||||
|
"processes_minors_data": True, # Spielzeug-Shop
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"must_match": [
|
||||||
|
"DSGVO",
|
||||||
|
"DSGVO_Art8", # Kinderdaten
|
||||||
|
"PSD2", # EIGENER Payment-Service
|
||||||
|
"AI_Act", # KI-Empfehlungssystem
|
||||||
|
],
|
||||||
|
"must_not_match": [
|
||||||
|
"TKG",
|
||||||
|
"MDR",
|
||||||
|
"Maschinenverordnung",
|
||||||
|
],
|
||||||
|
"rationale": (
|
||||||
|
"Unterschied zu Case 2: Dieser Haendler betreibt EIGENES Payment-Processing, "
|
||||||
|
"ist also PSD2-reguliert. Dazu: Kinderdaten (Spielzeug-Shop) erfordern "
|
||||||
|
"DSGVO Art. 8 (Einwilligung Erziehungsberechtigter). KI-Empfehlungen "
|
||||||
|
"fallen unter AI Act."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
|
||||||
|
# ===================================================================
|
||||||
|
# CASE 9: Bildungseinrichtung (Schule)
|
||||||
|
# ===================================================================
|
||||||
|
{
|
||||||
|
"id": "schule",
|
||||||
|
"name": "Oeffentliche Schule mit 80 Lehrkraeften",
|
||||||
|
"company": {
|
||||||
|
"industry": "Bildung",
|
||||||
|
"size": "medium",
|
||||||
|
"scope_answers": {
|
||||||
|
"processes_minors_data": True,
|
||||||
|
"uses_ai": True, # KI-Lernplattform
|
||||||
|
"video_surveillance": True, # Schulgelaende
|
||||||
|
"employee_monitoring": False,
|
||||||
|
"is_public_sector": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"must_match": [
|
||||||
|
"DSGVO",
|
||||||
|
"DSGVO_Art8", # Kinderdaten
|
||||||
|
"DSGVO_Art35", # DSFA (Kinderdaten + KI + Video)
|
||||||
|
"AI_Act", # KI-Lernplattform
|
||||||
|
"Schulrecht", # Landesschulgesetz
|
||||||
|
"BDSG", # Oeffentliche Stelle
|
||||||
|
],
|
||||||
|
"must_not_match": [
|
||||||
|
"PSD2",
|
||||||
|
"NIS2", # Bildung kein KRITIS-Sektor
|
||||||
|
"TKG",
|
||||||
|
"AML",
|
||||||
|
],
|
||||||
|
"rationale": (
|
||||||
|
"Schule verarbeitet Kinderdaten (DSGVO Art. 8), nutzt KI (AI Act), "
|
||||||
|
"hat Videoueberwachung (DSFA Pflicht). Als oeffentliche Stelle gilt BDSG. "
|
||||||
|
"NIS2 erfasst Bildung nicht als KRITIS-Sektor."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
|
||||||
|
# ===================================================================
|
||||||
|
# CASE 10: Telko-Unternehmen
|
||||||
|
# ===================================================================
|
||||||
|
{
|
||||||
|
"id": "telko_provider",
|
||||||
|
"name": "Regionaler Internetanbieter mit 150 MA",
|
||||||
|
"company": {
|
||||||
|
"industry": "Telekommunikation",
|
||||||
|
"size": "medium",
|
||||||
|
"scope_answers": {
|
||||||
|
"is_kritis_operator": True,
|
||||||
|
"uses_ai": False,
|
||||||
|
"third_country_transfer": False,
|
||||||
|
"processes_health_data": False,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"must_match": [
|
||||||
|
"DSGVO",
|
||||||
|
"TKG", # Telko-spezifisch
|
||||||
|
"TTDSG", # Telekommunikation-Telemedien-Datenschutz
|
||||||
|
"NIS2", # KRITIS + medium
|
||||||
|
"KRITIS",
|
||||||
|
"BSI_Grundschutz",
|
||||||
|
],
|
||||||
|
"must_not_match": [
|
||||||
|
"PSD2",
|
||||||
|
"AI_Act",
|
||||||
|
"MDR",
|
||||||
|
"Batterieverordnung",
|
||||||
|
"Maschinenverordnung",
|
||||||
|
],
|
||||||
|
"rationale": (
|
||||||
|
"Telko-Anbieter ist KRITIS-Betreiber, TKG und TTDSG sind direkt anwendbar. "
|
||||||
|
"NIS2 greift (KRITIS + medium). BSI-Grundschutz de-facto Pflicht."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Test Functions
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestApplicabilityUseCases:
|
||||||
|
"""Verify that the Applicability Engine assigns controls correctly."""
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("case", USE_CASES, ids=[c["id"] for c in USE_CASES])
|
||||||
|
def test_use_case_documented(self, case):
|
||||||
|
"""Each use case has required fields."""
|
||||||
|
assert case["id"]
|
||||||
|
assert case["name"]
|
||||||
|
assert case["company"]["industry"]
|
||||||
|
assert case["company"]["size"]
|
||||||
|
assert case["must_match"]
|
||||||
|
assert case["must_not_match"]
|
||||||
|
assert case["rationale"]
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("case", USE_CASES, ids=[c["id"] for c in USE_CASES])
|
||||||
|
def test_must_match_not_overlap_must_not(self, case):
|
||||||
|
"""must_match and must_not_match should not overlap."""
|
||||||
|
overlap = set(case["must_match"]) & set(case["must_not_match"])
|
||||||
|
assert not overlap, f"Overlap in {case['id']}: {overlap}"
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("case", USE_CASES, ids=[c["id"] for c in USE_CASES])
|
||||||
|
def test_scope_answers_are_booleans(self, case):
|
||||||
|
"""Scope answers should be boolean values."""
|
||||||
|
for key, val in case["company"]["scope_answers"].items():
|
||||||
|
assert isinstance(val, bool), f"{case['id']}: scope {key} is {type(val)}, expected bool"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Integration test placeholder — runs against real DB + Applicability Engine
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="Requires running DB + Applicability Engine")
|
||||||
|
class TestApplicabilityIntegration:
|
||||||
|
"""Run use cases against the real Applicability Engine.
|
||||||
|
|
||||||
|
Enable by removing @skip and setting DATABASE_URL.
|
||||||
|
These tests query the actual canonical_controls table
|
||||||
|
and verify that the correct controls are returned.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("case", USE_CASES, ids=[c["id"] for c in USE_CASES])
|
||||||
|
def test_applicability_engine(self, case):
|
||||||
|
"""Verify control assignment for each use case."""
|
||||||
|
# TODO: Import ApplicabilityEngine, query DB, check results
|
||||||
|
# from services.applicability_engine import get_applicable_controls
|
||||||
|
# from db.session import SessionLocal
|
||||||
|
#
|
||||||
|
# db = SessionLocal()
|
||||||
|
# result = get_applicable_controls(
|
||||||
|
# db=db,
|
||||||
|
# industry=case["company"]["industry"],
|
||||||
|
# company_size=case["company"]["size"],
|
||||||
|
# scope_signals=case["company"]["scope_answers"],
|
||||||
|
# )
|
||||||
|
# control_sources = {c.source_citation.get("source", "") for c in result["controls"]}
|
||||||
|
#
|
||||||
|
# for required in case["must_match"]:
|
||||||
|
# assert any(required.lower() in s.lower() for s in control_sources), \
|
||||||
|
# f"{case['id']}: Expected {required} in results"
|
||||||
|
#
|
||||||
|
# for forbidden in case["must_not_match"]:
|
||||||
|
# assert not any(forbidden.lower() in s.lower() for s in control_sources), \
|
||||||
|
# f"{case['id']}: {forbidden} should NOT be in results"
|
||||||
|
pass
|
||||||
Reference in New Issue
Block a user