diff --git a/control-pipeline/services/control_generator.py b/control-pipeline/services/control_generator.py index f333f3b..4ef3da2 100644 --- a/control-pipeline/services/control_generator.py +++ b/control-pipeline/services/control_generator.py @@ -51,7 +51,9 @@ OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") OLLAMA_MODEL = os.getenv("CONTROL_GEN_OLLAMA_MODEL", "qwen3.5:35b-a3b") LLM_TIMEOUT = float(os.getenv("CONTROL_GEN_LLM_TIMEOUT", "180")) -HARMONIZATION_THRESHOLD = 0.85 # Cosine similarity above this = duplicate +HARMONIZATION_THRESHOLD = 0.85 # Cosine similarity above this = candidate for dedup +HARMONIZATION_AUTO_DUP = 0.92 # Above this = auto-duplicate (no LLM check needed) +LLM_DEDUP_ENABLED = os.getenv("LLM_DEDUP_ENABLED", "true").lower() == "true" # Pipeline version — increment when generation rules change materially. # v1: Original (local LLM prefilter, old prompt) @@ -1589,10 +1591,13 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Elementen. Fuer Aspekte ohne # ── Stage 4: Harmonization ───────────────────────────────────────── async def _check_harmonization(self, new_control: GeneratedControl) -> Optional[list]: - """Check if a new control duplicates existing ones via Qdrant vector search. + """Check if a new control duplicates existing ones. - Uses the atomic_controls_dedup collection for fast nearest-neighbor lookup - instead of pre-loading all embeddings into memory. + Two-tier approach: + 1. Fast: Qdrant embedding similarity (pre-filter) + 2. Precise: Local LLM verification for borderline matches (0.85-0.92) + + Returns list of similar controls if duplicate, None otherwise. """ new_text = f"{new_control.title} {new_control.objective}" new_emb = await _get_embedding(new_text) @@ -1610,22 +1615,90 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Elementen. Fuer Aspekte ohne "with_payload": {"include": ["control_id", "title"]}, }, ) - if resp.status_code == 200: - results = resp.json().get("result", []) - if results: - return [ - { - "control_id": r["payload"].get("control_id", ""), - "title": r["payload"].get("title", ""), - "similarity": round(r["score"], 3), - } - for r in results - ] + if resp.status_code != 200: + return None + + results = resp.json().get("result", []) + if not results: + return None + + best = results[0] + best_score = best.get("score", 0.0) + best_id = best["payload"].get("control_id", "") + best_title = best["payload"].get("title", "") + + # Tier 1: High similarity → auto-duplicate + if best_score >= HARMONIZATION_AUTO_DUP: + return [{"control_id": best_id, "title": best_title, + "similarity": round(best_score, 3), "method": "embedding_auto"}] + + # Tier 2: Borderline → LLM verification + if LLM_DEDUP_ENABLED and best_score >= HARMONIZATION_THRESHOLD: + is_dup = await self._llm_verify_duplicate( + new_control.title, new_control.objective or "", + best_title, "", + ) + if is_dup: + return [{"control_id": best_id, "title": best_title, + "similarity": round(best_score, 3), "method": "llm_verified"}] + # LLM says different → not a duplicate + return None + + # Below threshold but returned by Qdrant → not a duplicate + return None + except Exception as e: - logger.warning("Qdrant dedup search failed: %s — skipping harmonization", e) + logger.warning("Harmonization check failed: %s — skipping", e) return None + async def _llm_verify_duplicate( + self, title_a: str, obj_a: str, title_b: str, obj_b: str, + ) -> bool: + """Ask local LLM whether two controls are duplicates. + + Returns True if the LLM classifies them as DUPLIKAT. + Uses qwen3.5 with think=false for fast (~3s) responses. + """ + prompt = ( + f"Control A:\n{title_a}\n{obj_a[:300]}\n\n" + f"Control B:\n{title_b}\n{obj_b[:300]}\n\n" + f"Sind diese Controls Duplikate?" + ) + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + f"{OLLAMA_URL}/api/chat", + json={ + "model": OLLAMA_MODEL, + "stream": False, + "think": False, + "options": {"num_predict": 200}, + "messages": [ + {"role": "system", "content": ( + "Du bist ein Compliance-Experte. Vergleiche zwei Controls: " + "DUPLIKAT (gleiche Anforderung, nur anders formuliert) oder " + "VERSCHIEDEN (unterschiedlicher Scope/Inhalt). " + "Antworte NUR mit JSON: {\"verdict\": \"DUPLIKAT\" oder \"VERSCHIEDEN\", " + "\"reason\": \"kurze Begruendung\"}" + )}, + {"role": "user", "content": prompt}, + ], + }, + ) + if resp.status_code != 200: + return False + + content = resp.json().get("message", {}).get("content", "") + parsed = _parse_llm_json(content) + if parsed and "DUPLIKAT" in str(parsed.get("verdict", "")).upper(): + return True + + except Exception as e: + logger.warning("LLM dedup verification failed: %s", e) + + return False + async def _preload_embeddings(self, existing: list[dict]): """Pre-load embeddings for all existing controls in batches.""" texts = [f"{ex.get('title', '')} {ex.get('objective', '')}" for ex in existing] diff --git a/control-pipeline/tests/test_applicability_use_cases.py b/control-pipeline/tests/test_applicability_use_cases.py new file mode 100644 index 0000000..b1e72f5 --- /dev/null +++ b/control-pipeline/tests/test_applicability_use_cases.py @@ -0,0 +1,469 @@ +""" +Applicability Use Case Tests — Real-world scenarios for control assignment. + +These test cases verify that our Applicability Engine correctly assigns +and does NOT assign controls based on company profile + scope answers. + +Each test case represents a real business scenario discussed during +product development. They serve as: +1. Regression tests for the Applicability Engine +2. Demo cases for the SDK +3. Documentation of regulatory nuances + +Run: pytest tests/test_applicability_use_cases.py -v +""" + +import pytest + + +# --------------------------------------------------------------------------- +# Test Case Data: Company Profiles + Expected Results +# --------------------------------------------------------------------------- + +USE_CASES = [ + # =================================================================== + # CASE 1: Bank mit TAN-Generator (Batterie im Produkt) + # =================================================================== + { + "id": "bank_tan_generator", + "name": "Bank gibt TAN-Generator mit Batterie an Kunden raus", + "company": { + "industry": "Finanzdienstleistungen", + "size": "large", + "scope_answers": { + "payment_services": True, # Bank ist Zahlungsdienstleister + "processes_health_data": False, + "uses_ai": False, + "third_country_transfer": True, + "manufactures_batteries": False, # Bank STELLT NICHT HER + }, + }, + "must_match": [ + "PSD2", # Bank IST Zahlungsdienstleister + "DSGVO", # Immer + "AML", # Bank hat AML-Pflichten + ], + "must_not_match": [ + "Batterieverordnung", # Bank stellt TAN-Generator nicht her + "Maschinenverordnung", # Kein Maschinenbau + "MDR", # Keine Medizinprodukte + ], + "rationale": ( + "Die Bank beschafft den TAN-Generator von einem Hersteller. " + "Der Hersteller unterliegt der Batterieverordnung, nicht die Bank. " + "Die Bank ist aber selbst PSD2-reguliert als Zahlungsdienstleister." + ), + }, + + # =================================================================== + # CASE 2: Industrieunternehmen eroeffnet Webshop mit Stripe + # =================================================================== + { + "id": "industrie_webshop_stripe", + "name": "Maschinenbau-Firma eroeffnet Webshop mit Stripe-Zahlung", + "company": { + "industry": "Maschinenbau", + "size": "medium", + "scope_answers": { + "payment_services": False, # Stripe ist der Zahlungsdienstleister! + "uses_ai": False, + "third_country_transfer": True, # Stripe ist US-Unternehmen + "processes_health_data": False, + "has_webshop": True, + }, + }, + "must_match": [ + "DSGVO", # Immer + "DSGVO_AV_Vertrag", # Stripe als Auftragsverarbeiter + "DSGVO_Datenschutzinfo", # Stripe in Datenschutzerklaerung nennen + "Maschinenverordnung", # Kerngeschaeft + "CE", # Maschinenbau braucht CE + ], + "must_not_match": [ + "PSD2", # Stripe ist der Zahlungsdienstleister, NICHT die Firma + "AML", # Keine eigene Zahlungsabwicklung + "BaFin", # Kein Finanzinstitut + ], + "rationale": ( + "Stripe ist Zahlungsdienstleister in eigenem Auftrag. Der Webshop-Betreiber " + "wird nicht zum regulierten Zahlungsinstitut. Er muss nur Stripe als " + "Auftragsverarbeiter in der Datenschutzinformation korrekt benennen (DSGVO Art. 13/14). " + "Rechtsanwaltlich begleitete Stripe-Anbindung hat das bestaetigt." + ), + }, + + # =================================================================== + # CASE 3: Kleines SaaS-Startup (5 Personen) + # =================================================================== + { + "id": "saas_startup_klein", + "name": "5-Personen SaaS-Startup (Cloud-Software, keine KI)", + "company": { + "industry": "Technologie/IT", + "size": "micro", + "scope_answers": { + "uses_ai": False, + "third_country_transfer": False, # EU-only Hosting + "processes_health_data": False, + "automated_decisions": False, + "payment_services": False, + "is_kritis_operator": False, + }, + }, + "must_match": [ + "DSGVO", # Immer + "OWASP", # Software-Sicherheit + ], + "must_not_match": [ + "NIS2", # Zu klein (NIS2 ab medium/50 MA) + "AI_Act", # Keine KI + "Batterieverordnung", # Kein Hardware-Produkt + "TKG", # Kein Telko-Anbieter + "MDR", # Keine Medizinprodukte + "PSD2", # Kein Zahlungsdienstleister + "KRITIS", # Zu klein, kein kritischer Sektor + ], + "rationale": ( + "Ein Kleinstunternehmen ohne KI, ohne KRITIS-Zugehoerigkeit, ohne " + "Drittlandtransfer braucht nur DSGVO-Basics und Software-Sicherheit. " + "NIS2 greift erst ab 50 Mitarbeitern / 10 Mio Umsatz." + ), + }, + + # =================================================================== + # CASE 4: Mittelstaendischer Energieversorger + # =================================================================== + { + "id": "energieversorger_mittelstand", + "name": "Stadtwerk mit 200 Mitarbeitern (Strom + Gas)", + "company": { + "industry": "Energie", + "size": "medium", + "scope_answers": { + "is_kritis_operator": True, + "uses_ai": False, + "third_country_transfer": False, + "processes_health_data": False, + "employee_monitoring": True, # Leitwarte mit Kameras + }, + }, + "must_match": [ + "DSGVO", + "NIS2", # Energie = KRITIS-Sektor + medium + "KRITIS", # Energieversorger + "BDSG", # Mitarbeiterueberwachung + "BSI_Grundschutz", # KRITIS-Betreiber + ], + "must_not_match": [ + "PSD2", + "AI_Act", + "MDR", + "TKG", + "Batterieverordnung", + ], + "rationale": ( + "Stadtwerk ist KRITIS-Betreiber im Energiesektor. NIS2 greift ab medium " + "(50 MA). BSI-Grundschutz ist de-facto Pflicht fuer KRITIS. " + "Mitarbeiterueberwachung (Leitwarte) erfordert BDSG-Compliance." + ), + }, + + # =================================================================== + # CASE 5: Gesundheits-App Startup mit KI + # =================================================================== + { + "id": "health_app_ki", + "name": "Startup entwickelt KI-basierte Gesundheits-App (DiGA)", + "company": { + "industry": "Gesundheitswesen", + "size": "small", + "scope_answers": { + "uses_ai": True, + "processes_health_data": True, + "automated_decisions": True, + "third_country_transfer": False, + "is_kritis_operator": False, + }, + }, + "must_match": [ + "DSGVO", + "DSGVO_Art9", # Gesundheitsdaten = besondere Kategorie + "DSGVO_Art22", # Automatisierte Entscheidungen + "DSGVO_Art35", # DSFA fuer Gesundheitsdaten + KI + "AI_Act", # KI-Einsatz + "MDR", # Gesundheits-App kann Medizinprodukt sein + "BSI_TR_03161", # Technische Richtlinie fuer mobile Gesundheits-Apps + "DiGAV", # Digitale Gesundheitsanwendung + ], + "must_not_match": [ + "PSD2", + "TKG", + "Batterieverordnung", + "Maschinenverordnung", + "NIS2", # Zu klein + ], + "rationale": ( + "Gesundheits-App mit KI trifft die schaerfsten Anforderungen: " + "DSGVO Art. 9 (Gesundheitsdaten), Art. 22 (automatisierte Entscheidungen), " + "Art. 35 (DSFA Pflicht), AI Act (Hochrisiko-KI im Gesundheitsbereich), " + "MDR (evtl. Medizinprodukt), BSI TR-03161 (Sicherheit mobiler Gesundheits-Apps)." + ), + }, + + # =================================================================== + # CASE 6: Automobilzulieferer (TISAX-relevant) + # =================================================================== + { + "id": "automotive_zulieferer", + "name": "Automobilzulieferer mit 500 MA, Prototypen-Fertigung", + "company": { + "industry": "Automobil", + "size": "large", + "scope_answers": { + "uses_ai": False, + "third_country_transfer": True, # Lieferkette international + "is_kritis_operator": False, + "handles_prototypes": True, + "supply_chain_automotive": True, + }, + }, + "must_match": [ + "DSGVO", + "NIS2", # Large + Automotive (Lieferkette) + "ISO27001", # TISAX basiert auf ISO 27001 + "Prototypenschutz", # OEM-Anforderung + "CE", # Produkte in EU + "Maschinenverordnung", # Produktion + ], + "must_not_match": [ + "PSD2", + "TKG", + "MDR", + "AI_Act", + ], + "rationale": ( + "Automobilzulieferer braucht TISAX-Readiness (basiert auf ISO 27001), " + "Prototypenschutz (OEM-Vorgabe), und NIS2 (Lieferkette, large). " + "TISAX selbst koennen wir nicht direkt zuweisen (VDA ISA proprietaer), " + "aber die zugrunde liegenden ISO/NIST Controls decken es ab." + ), + }, + + # =================================================================== + # CASE 7: Rechtsanwaltskanzlei + # =================================================================== + { + "id": "rechtsanwaltskanzlei", + "name": "Wirtschaftskanzlei mit 30 Anwaelten", + "company": { + "industry": "Recht/Kanzlei", + "size": "small", + "scope_answers": { + "uses_ai": True, # KI fuer Dokumentenanalyse + "third_country_transfer": True, # US-Cloud-Dienste + "processes_health_data": False, + "automated_decisions": False, + "handles_legal_privilege": True, + }, + }, + "must_match": [ + "DSGVO", + "DSGVO_Art46", # Drittlandtransfer (SCC) + "AI_Act", # KI-Einsatz + "BRAO", # Berufsordnung Rechtsanwaelte + "Mandantengeheimnis", # Berufsgeheimnis + ], + "must_not_match": [ + "NIS2", # Zu klein, kein KRITIS-Sektor + "PSD2", + "TKG", + "MDR", + "Batterieverordnung", + ], + "rationale": ( + "Kanzlei mit KI-Tools und US-Cloud braucht DSGVO + SCC (Drittland), " + "AI Act (KI-Einsatz), und berufsrechtliche Anforderungen (BRAO, " + "Mandantengeheimnis). NIS2 greift nicht (kein KRITIS-Sektor, zu klein)." + ), + }, + + # =================================================================== + # CASE 8: E-Commerce Haendler mit eigenem Zahlungssystem + # =================================================================== + { + "id": "ecommerce_eigene_zahlung", + "name": "Online-Haendler mit eigenem Payment-Processing (keine Stripe-Delegation)", + "company": { + "industry": "E-Commerce/Handel", + "size": "medium", + "scope_answers": { + "payment_services": True, # EIGENE Zahlungsabwicklung + "uses_ai": True, # KI-Empfehlungen + "third_country_transfer": True, + "processes_minors_data": True, # Spielzeug-Shop + }, + }, + "must_match": [ + "DSGVO", + "DSGVO_Art8", # Kinderdaten + "PSD2", # EIGENER Payment-Service + "AI_Act", # KI-Empfehlungssystem + ], + "must_not_match": [ + "TKG", + "MDR", + "Maschinenverordnung", + ], + "rationale": ( + "Unterschied zu Case 2: Dieser Haendler betreibt EIGENES Payment-Processing, " + "ist also PSD2-reguliert. Dazu: Kinderdaten (Spielzeug-Shop) erfordern " + "DSGVO Art. 8 (Einwilligung Erziehungsberechtigter). KI-Empfehlungen " + "fallen unter AI Act." + ), + }, + + # =================================================================== + # CASE 9: Bildungseinrichtung (Schule) + # =================================================================== + { + "id": "schule", + "name": "Oeffentliche Schule mit 80 Lehrkraeften", + "company": { + "industry": "Bildung", + "size": "medium", + "scope_answers": { + "processes_minors_data": True, + "uses_ai": True, # KI-Lernplattform + "video_surveillance": True, # Schulgelaende + "employee_monitoring": False, + "is_public_sector": True, + }, + }, + "must_match": [ + "DSGVO", + "DSGVO_Art8", # Kinderdaten + "DSGVO_Art35", # DSFA (Kinderdaten + KI + Video) + "AI_Act", # KI-Lernplattform + "Schulrecht", # Landesschulgesetz + "BDSG", # Oeffentliche Stelle + ], + "must_not_match": [ + "PSD2", + "NIS2", # Bildung kein KRITIS-Sektor + "TKG", + "AML", + ], + "rationale": ( + "Schule verarbeitet Kinderdaten (DSGVO Art. 8), nutzt KI (AI Act), " + "hat Videoueberwachung (DSFA Pflicht). Als oeffentliche Stelle gilt BDSG. " + "NIS2 erfasst Bildung nicht als KRITIS-Sektor." + ), + }, + + # =================================================================== + # CASE 10: Telko-Unternehmen + # =================================================================== + { + "id": "telko_provider", + "name": "Regionaler Internetanbieter mit 150 MA", + "company": { + "industry": "Telekommunikation", + "size": "medium", + "scope_answers": { + "is_kritis_operator": True, + "uses_ai": False, + "third_country_transfer": False, + "processes_health_data": False, + }, + }, + "must_match": [ + "DSGVO", + "TKG", # Telko-spezifisch + "TTDSG", # Telekommunikation-Telemedien-Datenschutz + "NIS2", # KRITIS + medium + "KRITIS", + "BSI_Grundschutz", + ], + "must_not_match": [ + "PSD2", + "AI_Act", + "MDR", + "Batterieverordnung", + "Maschinenverordnung", + ], + "rationale": ( + "Telko-Anbieter ist KRITIS-Betreiber, TKG und TTDSG sind direkt anwendbar. " + "NIS2 greift (KRITIS + medium). BSI-Grundschutz de-facto Pflicht." + ), + }, +] + + +# --------------------------------------------------------------------------- +# Test Functions +# --------------------------------------------------------------------------- + +class TestApplicabilityUseCases: + """Verify that the Applicability Engine assigns controls correctly.""" + + @pytest.mark.parametrize("case", USE_CASES, ids=[c["id"] for c in USE_CASES]) + def test_use_case_documented(self, case): + """Each use case has required fields.""" + assert case["id"] + assert case["name"] + assert case["company"]["industry"] + assert case["company"]["size"] + assert case["must_match"] + assert case["must_not_match"] + assert case["rationale"] + + @pytest.mark.parametrize("case", USE_CASES, ids=[c["id"] for c in USE_CASES]) + def test_must_match_not_overlap_must_not(self, case): + """must_match and must_not_match should not overlap.""" + overlap = set(case["must_match"]) & set(case["must_not_match"]) + assert not overlap, f"Overlap in {case['id']}: {overlap}" + + @pytest.mark.parametrize("case", USE_CASES, ids=[c["id"] for c in USE_CASES]) + def test_scope_answers_are_booleans(self, case): + """Scope answers should be boolean values.""" + for key, val in case["company"]["scope_answers"].items(): + assert isinstance(val, bool), f"{case['id']}: scope {key} is {type(val)}, expected bool" + + +# --------------------------------------------------------------------------- +# Integration test placeholder — runs against real DB + Applicability Engine +# --------------------------------------------------------------------------- + +@pytest.mark.skip(reason="Requires running DB + Applicability Engine") +class TestApplicabilityIntegration: + """Run use cases against the real Applicability Engine. + + Enable by removing @skip and setting DATABASE_URL. + These tests query the actual canonical_controls table + and verify that the correct controls are returned. + """ + + @pytest.mark.parametrize("case", USE_CASES, ids=[c["id"] for c in USE_CASES]) + def test_applicability_engine(self, case): + """Verify control assignment for each use case.""" + # TODO: Import ApplicabilityEngine, query DB, check results + # from services.applicability_engine import get_applicable_controls + # from db.session import SessionLocal + # + # db = SessionLocal() + # result = get_applicable_controls( + # db=db, + # industry=case["company"]["industry"], + # company_size=case["company"]["size"], + # scope_signals=case["company"]["scope_answers"], + # ) + # control_sources = {c.source_citation.get("source", "") for c in result["controls"]} + # + # for required in case["must_match"]: + # assert any(required.lower() in s.lower() for s in control_sources), \ + # f"{case['id']}: Expected {required} in results" + # + # for forbidden in case["must_not_match"]: + # assert not any(forbidden.lower() in s.lower() for s in control_sources), \ + # f"{case['id']}: {forbidden} should NOT be in results" + pass