diff --git a/control-pipeline/tests/adversarial_cases.yaml b/control-pipeline/tests/adversarial_cases.yaml new file mode 100644 index 0000000..5fc1b6b --- /dev/null +++ b/control-pipeline/tests/adversarial_cases.yaml @@ -0,0 +1,318 @@ +# Adversarial Test Suite — 30 tricky Cases die Controls/Agent herausfordern +version: "1.0" +purpose: "Testen ob Controls und Agent bei grenzwertigen Formulierungen korrekt entscheiden" + +tests: + + # A. Falsche Rechtsgrundlage (plausibel klingend) — 8 Cases + + - id: ADV-LIT-001 + category: wrong_legal_basis + input: "Wir verarbeiten Ihre Daten fuer Webanalyse auf Grundlage unseres berechtigten Interesses (Art. 6 Abs. 1 lit. f DSGVO)." + context: "DSE-Abschnitt ueber Google Analytics" + expected: + finding: true + finding_type: wrong_legal_basis + correct_basis: "Art. 6 Abs. 1 lit. a (Einwilligung)" + reason: "Analytics erfordert Einwilligung, nicht berechtigtes Interesse (EuGH C-673/17 Planet49)" + difficulty: medium + + - id: ADV-LIT-002 + category: wrong_legal_basis + input: "Der Versand unseres Newsletters erfolgt auf Grundlage des Vertrages (Art. 6 Abs. 1 lit. b DSGVO)." + context: "DSE-Abschnitt ueber Marketing" + expected: + finding: true + finding_type: wrong_legal_basis + correct_basis: "Art. 6 Abs. 1 lit. a (Einwilligung)" + reason: "Newsletter ist kein Vertragsbestandteil, erfordert separate Einwilligung" + difficulty: medium + + - id: ADV-LIT-003 + category: wrong_legal_basis + input: "Die Ueberwachung der Arbeitsleistung unserer Mitarbeiter erfolgt auf Grundlage unseres berechtigten Interesses." + context: "Interne Datenschutzrichtlinie" + expected: + finding: true + finding_type: wrong_legal_basis + correct_basis: "Betriebsvereinbarung + Art. 88 DSGVO i.V.m. § 26 BDSG" + reason: "Mitarbeiterueberwachung erfordert Betriebsvereinbarung (BAG Keylogger-Urteil)" + difficulty: hard + + - id: ADV-LIT-004 + category: wrong_legal_basis + input: "Biometrische Zutrittskontrolle auf Basis von Art. 6 Abs. 1 lit. f DSGVO." + context: "Sicherheitskonzept" + expected: + finding: true + finding_type: wrong_legal_basis + correct_basis: "Art. 9 Abs. 2 DSGVO (ausdrueckliche Einwilligung oder Arbeitsrecht)" + reason: "Biometrische Daten = besondere Kategorie nach Art. 9, lit. f reicht nicht" + difficulty: hard + + - id: ADV-LIT-005 + category: wrong_legal_basis + input: "Wir erstellen automatisierte Kreditentscheidungen auf Grundlage berechtigter Interessen." + context: "DSE einer Bank" + expected: + finding: true + finding_type: wrong_legal_basis + correct_basis: "Art. 22 DSGVO (ausdrueckliche Einwilligung oder gesetzliche Erlaubnis)" + reason: "Automatisierte Einzelentscheidungen erfordern Art. 22 Schutz (EuGH SCHUFA C-634/21)" + difficulty: hard + + - id: ADV-LIT-006 + category: wrong_legal_basis + input: "Social Login ueber Google wird als Vertragsdurchfuehrung (lit. b) verarbeitet." + context: "DSE mit Social Login" + expected: + finding: true + finding_type: wrong_legal_basis + correct_basis: "Art. 6 Abs. 1 lit. a (Einwilligung)" + reason: "Social Login ist keine Vertragspflicht, Nutzer kann sich auch ohne Google anmelden" + difficulty: medium + + - id: ADV-LIT-007 + category: wrong_legal_basis + input: "Personalisierte Werbung basiert auf unserem berechtigten Interesse an Direktmarketing." + context: "DSE eines marktbeherrschenden Unternehmens" + expected: + finding: true + finding_type: wrong_legal_basis + correct_basis: "Art. 6 Abs. 1 lit. a (Einwilligung)" + reason: "Marktbeherrschende Unternehmen koennen sich nicht auf lit. f fuer Werbung berufen (EuGH Meta C-252/21)" + difficulty: hard + + - id: ADV-LIT-008 + category: wrong_legal_basis + input: "Die Einbindung von Facebook Pixel erfolgt zur Vertragserfuellung (Art. 6 Abs. 1 lit. b DSGVO)." + context: "DSE eines Online-Shops" + expected: + finding: true + finding_type: wrong_legal_basis + correct_basis: "Art. 6 Abs. 1 lit. a (Einwilligung)" + reason: "Facebook Pixel dient Tracking/Marketing, nicht der Vertragserfuellung" + difficulty: easy + + # B. Dark Patterns (subtil) — 6 Cases + + - id: ADV-DP-001 + category: dark_pattern + input: + accept_button: {text: "Alle akzeptieren", size: "16px", color: "#ffffff", background: "#0066cc", prominent: true} + reject_button: {text: "Ablehnen", size: "10px", color: "#cccccc", background: "transparent", prominent: false} + expected: + finding: true + finding_type: dark_pattern_visual_bias + reason: "Ablehnen-Button ist kleiner, weniger sichtbar (OLG Koeln 6 U 58/21)" + difficulty: easy + + - id: ADV-DP-002 + category: dark_pattern + input: + accept_button: {text: "Alle akzeptieren", clicks_to_complete: 1} + reject_option: {text: "Einstellungen verwalten", clicks_to_complete: 3, label: "Einstellungen"} + expected: + finding: true + finding_type: dark_pattern_friction_asymmetry + reason: "Ablehnen erfordert 3 Klicks, Akzeptieren nur 1 (CNIL Cookie-Banner)" + difficulty: medium + + - id: ADV-DP-003 + category: dark_pattern + input: + type: "cookie_wall" + description: "Inhalt erst nach Cookie-Zustimmung sichtbar" + expected: + finding: true + finding_type: dark_pattern_cookie_wall + reason: "Cookie-Wall = keine freiwillige Einwilligung (EDPB Guidelines 05/2020)" + difficulty: medium + + - id: ADV-DP-004 + category: dark_pattern + input: + type: "prechecked_boxes" + description: "Checkboxen fuer Marketing und Analytics sind vorausgefuellt" + expected: + finding: true + finding_type: dark_pattern_prechecked + reason: "Vorausgefuellte Checkboxen sind keine wirksame Einwilligung (BGH Planet49)" + difficulty: easy + + - id: ADV-DP-005 + category: dark_pattern + input: + type: "confirm_shaming" + accept_text: "Ja, ich moechte sicher surfen" + reject_text: "Nein, ich verzichte auf Sicherheit" + expected: + finding: true + finding_type: dark_pattern_confirm_shaming + reason: "Manipulative Formulierung beeinflusst Entscheidung" + difficulty: medium + + - id: ADV-DP-006 + category: dark_pattern + input: + type: "hidden_reject" + description: "Ablehnen-Link ist 3px gross, Farbe #f0f0f0 auf weissem Hintergrund" + expected: + finding: true + finding_type: dark_pattern_hidden_option + reason: "Ablehnen-Option praktisch unsichtbar (OLG Koeln)" + difficulty: easy + + # C. Fast-vollstaendige Dokumente — 6 Cases + + - id: ADV-DOC-001 + category: incomplete_document + input: "Impressum: Max Mustermann GmbH, Musterstr. 1, 10115 Berlin, info@example.com, HRB 12345" + expected: + finding: true + finding_type: missing_field + missing: "USt-ID" + reason: "§ 5 Abs. 1 Nr. 6 DDG: USt-IdNr. oder Wirtschafts-ID Pflicht" + difficulty: easy + + - id: ADV-DOC-002 + category: incomplete_document + input: "Datenschutzerklaerung mit Zwecken, Rechtsgrundlagen, Empfaengern, Betroffenenrechten — aber ohne Speicherdauer" + expected: + finding: true + finding_type: missing_field + missing: "Speicherdauer" + reason: "Art. 13 Abs. 2 lit. a DSGVO: Dauer der Speicherung oder Kriterien" + difficulty: medium + + - id: ADV-DOC-003 + category: incomplete_document + input: "DSE ohne Kontaktdaten des Datenschutzbeauftragten" + expected: + finding: true + finding_type: missing_field + missing: "DSB-Kontakt" + reason: "Art. 13 Abs. 1 lit. b DSGVO: Kontaktdaten des DSB" + difficulty: easy + + - id: ADV-DOC-004 + category: incomplete_document + input: "Widerrufsbelehrung mit 14-Tage-Frist, Muster-Formular, aber Fristbeginn fehlt" + expected: + finding: true + finding_type: missing_field + missing: "Fristbeginn" + reason: "Anlage 1 zu Art. 246a § 1 EGBGB: Fristbeginn muss angegeben werden" + difficulty: medium + + - id: ADV-DOC-005 + category: incomplete_document + input: "AGB eines Online-Shops ohne Angabe des Gerichtsstands" + expected: + finding: false + reason: "Gerichtsstand in AGB ist bei B2C nicht erforderlich (sogar oft unzulaessig)" + difficulty: hard + + - id: ADV-DOC-006 + category: incomplete_document + input: "Cookie-Policy listet Google Analytics und Facebook Pixel auf, aber nicht das CMP-Cookie selbst" + expected: + finding: true + finding_type: missing_field + missing: "CMP-eigene Cookies" + reason: "Auch technisch notwendige Cookies muessen in der Cookie-Policy stehen" + difficulty: hard + + # D. Semantisch aehnlich aber verschieden — 5 Cases + + - id: ADV-SEM-001 + category: similar_but_different + control_a: "MFA fuer privilegierte Admin-Accounts aktivieren" + control_b: "MFA fuer alle Endnutzer-Accounts aktivieren" + expected: + is_duplicate: false + reason: "Verschiedene Scopes (Admin vs. Endnutzer) = verschiedene Controls" + difficulty: medium + + - id: ADV-SEM-002 + category: similar_but_different + control_a: "Daten nach Vertragsende loeschen" + control_b: "Daten nach Ablauf der gesetzlichen Aufbewahrungsfrist loeschen" + expected: + is_duplicate: false + reason: "Verschiedene Trigger (Vertragsende vs. Aufbewahrungsfrist)" + difficulty: hard + + - id: ADV-SEM-003 + category: similar_but_different + control_a: "Rate Limiting fuer oeffentliche API-Endpunkte" + control_b: "Rate Limiting fuer Login-Endpunkte" + expected: + is_duplicate: false + reason: "Verschiedene Asset-Scopes (API vs. Login)" + difficulty: medium + + - id: ADV-SEM-004 + category: similar_but_different + control_a: "Verschluesselung personenbezogener Daten at rest" + control_b: "Verschluesselung personenbezogener Daten in transit" + expected: + is_duplicate: false + reason: "Verschiedene Phasen (Speicherung vs. Uebertragung)" + difficulty: easy + + - id: ADV-SEM-005 + category: similar_but_different + control_a: "Incident Response Plan erstellen" + control_b: "Business Continuity Plan erstellen" + expected: + is_duplicate: false + reason: "IRP = Sicherheitsvorfaelle, BCP = Geschaeftskontinuitaet (verschiedene Ziele)" + difficulty: medium + + # E. Semantisch verschieden aber gleich klingend — 5 Cases + + - id: ADV-HOM-001 + category: homonym_different + control_a: "Einwilligung des Nutzers fuer Datenverarbeitung einholen (DSGVO)" + control_b: "Einwilligung des Nutzers fuer Werbeanrufe einholen (UWG)" + expected: + is_duplicate: false + reason: "Verschiedene Rechtsgrundlagen (DSGVO vs. UWG) und verschiedene Rechtsfolgen" + difficulty: hard + + - id: ADV-HOM-002 + category: homonym_different + control_a: "Risikobewertung fuer Datenschutz-Folgenabschaetzung (DSFA)" + control_b: "Risikobewertung fuer finanzielle Risiken (MaRisk)" + expected: + is_duplicate: false + reason: "Verschiedene Risikokategorien und verschiedene regulatorische Grundlagen" + difficulty: hard + + - id: ADV-HOM-003 + category: homonym_different + control_a: "Audit der Datenschutz-Compliance (Art. 5 Abs. 2 DSGVO)" + control_b: "Audit der Jahresabschlusspruefung (HGB)" + expected: + is_duplicate: false + reason: "Verschiedene Audit-Typen mit verschiedenen Pruefungsstandards" + difficulty: medium + + - id: ADV-HOM-004 + category: homonym_different + control_a: "Zertifizierung nach ISO 27001 (Informationssicherheit)" + control_b: "Zertifizierung nach CE-Konformitaet (Produktsicherheit)" + expected: + is_duplicate: false + reason: "Verschiedene Zertifizierungsrahmen, verschiedene Pruefer, verschiedene Ziele" + difficulty: easy + + - id: ADV-HOM-005 + category: homonym_different + control_a: "Verarbeitung personenbezogener Daten dokumentieren (DSGVO VVT)" + control_b: "Verarbeitung von Lebensmitteln dokumentieren (HACCP)" + expected: + is_duplicate: false + reason: "Komplett verschiedene Domaenen trotz gleicher Woerter" + difficulty: easy diff --git a/control-pipeline/tests/conftest.py b/control-pipeline/tests/conftest.py new file mode 100644 index 0000000..a9355d0 --- /dev/null +++ b/control-pipeline/tests/conftest.py @@ -0,0 +1,36 @@ +"""Shared test fixtures for the control pipeline test suite.""" + +import os +import sys +import pytest + +# Ensure control-pipeline is in path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + + +@pytest.fixture(scope="session") +def db_session(): + """DB session for integration tests — skip if no DATABASE_URL.""" + url = os.getenv("DATABASE_URL") + if not url: + pytest.skip("DATABASE_URL not set — skipping DB tests") + from db.session import SessionLocal + db = SessionLocal() + yield db + db.close() + + +@pytest.fixture +def sample_controls(db_session): + """Load 100 random draft controls for regression testing.""" + from sqlalchemy import text + rows = db_session.execute(text(""" + SELECT control_id, title, category, severity, + generation_metadata->>'assertion' as assertion, + generation_metadata->>'check_type' as check_type, + generation_metadata->>'merge_group_hint' as merge_key + FROM compliance.canonical_controls + WHERE release_state = 'draft' AND decomposition_method = 'pass0b' + ORDER BY random() LIMIT 100 + """)).fetchall() + return [dict(r._mapping) for r in rows] diff --git a/control-pipeline/tests/test_adversarial.py b/control-pipeline/tests/test_adversarial.py new file mode 100644 index 0000000..e5c5446 --- /dev/null +++ b/control-pipeline/tests/test_adversarial.py @@ -0,0 +1,190 @@ +""" +Adversarial Test Suite — 30 tricky cases that challenge the control ontology +and dedup engine with edge cases. + +Tests categories: + A. Wrong legal basis (plausible but incorrect) — 8 cases + B. Dark patterns (subtle UI manipulation) — 6 cases + C. Almost-complete documents (missing 1 field) — 6 cases + D. Semantically similar but different controls — 5 cases + E. Homonyms (different meaning, same words) — 5 cases +""" + +import os +import sys +import yaml +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from services.control_ontology import classify_obligation, classify_action + +ADVERSARIAL_PATH = os.path.join(os.path.dirname(__file__), "adversarial_cases.yaml") + +with open(ADVERSARIAL_PATH) as f: + _ADV = yaml.safe_load(f) + +TESTS = _ADV["tests"] + + +def _tests_by_category(cat: str) -> list: + return [t for t in TESTS if t["category"] == cat] + + +# ============================================================================ +# D. Semantically similar but different — must NOT be deduped +# ============================================================================ + +class TestSimilarButDifferent: + """Controls that sound alike but are different — dedup must keep both.""" + + @pytest.mark.parametrize("case", _tests_by_category("similar_but_different"), + ids=lambda c: c["id"]) + def test_not_duplicate(self, case): + assert case["expected"]["is_duplicate"] is False, ( + f"{case['id']}: These controls MUST NOT be marked as duplicates" + ) + + def test_admin_vs_user_mfa(self): + """ADV-SEM-001: Admin-MFA and User-MFA are different controls.""" + case = next(t for t in TESTS if t["id"] == "ADV-SEM-001") + a = classify_obligation(case["control_a"], "") + b = classify_obligation(case["control_b"], "") + # Both should be atomic (not filtered out) + assert a["routing"] == "atomic" + assert b["routing"] == "atomic" + + def test_encryption_at_rest_vs_in_transit(self): + """ADV-SEM-004: at rest vs in transit are different controls.""" + a_action = classify_action("Verschluesselung at rest implementieren") + b_action = classify_action("Verschluesselung in transit implementieren") + # Both should classify as "encrypt" or "implement" + assert a_action in ("encrypt", "implement") + assert b_action in ("encrypt", "implement") + + +# ============================================================================ +# E. Homonyms — same words, different domains +# ============================================================================ + +class TestHomonymDifferent: + """Controls using same words but from different domains — must NOT merge.""" + + @pytest.mark.parametrize("case", _tests_by_category("homonym_different"), + ids=lambda c: c["id"]) + def test_not_duplicate(self, case): + assert case["expected"]["is_duplicate"] is False, ( + f"{case['id']}: Homonyms must NOT be treated as duplicates" + ) + + def test_dsgvo_audit_vs_hgb_audit(self): + """ADV-HOM-003: Data protection audit vs financial audit.""" + a = classify_obligation("Audit der Datenschutz-Compliance durchfuehren", "") + b = classify_obligation("Audit der Jahresabschlusspruefung durchfuehren", "") + assert a["routing"] == "atomic" + assert b["routing"] == "atomic" + # "durchfuehren" maps to "implement" — key point is both are atomic, not filtered + + +# ============================================================================ +# A. Wrong legal basis — structural tests +# ============================================================================ + +class TestWrongLegalBasis: + """Verify that wrong legal basis cases have correct expected metadata.""" + + @pytest.mark.parametrize("case", _tests_by_category("wrong_legal_basis"), + ids=lambda c: c["id"]) + def test_finding_expected(self, case): + """All wrong_legal_basis cases must expect a finding.""" + assert case["expected"]["finding"] is True + + @pytest.mark.parametrize("case", _tests_by_category("wrong_legal_basis"), + ids=lambda c: c["id"]) + def test_has_correct_basis(self, case): + """All cases must specify what the correct basis should be.""" + assert "correct_basis" in case["expected"] + assert len(case["expected"]["correct_basis"]) > 0 + + def test_analytics_requires_consent(self): + """ADV-LIT-001: Analytics on lit. f is always wrong.""" + case = next(t for t in TESTS if t["id"] == "ADV-LIT-001") + assert "lit. a" in case["expected"]["correct_basis"] + assert "Planet49" in case["expected"]["reason"] + + +# ============================================================================ +# B. Dark Patterns — structural tests +# ============================================================================ + +class TestDarkPatterns: + """Verify dark pattern test case structure.""" + + @pytest.mark.parametrize("case", _tests_by_category("dark_pattern"), + ids=lambda c: c["id"]) + def test_finding_expected(self, case): + """All dark pattern cases must expect a finding.""" + assert case["expected"]["finding"] is True + + @pytest.mark.parametrize("case", _tests_by_category("dark_pattern"), + ids=lambda c: c["id"]) + def test_has_finding_type(self, case): + """All cases must specify the dark pattern type.""" + assert "finding_type" in case["expected"] + assert case["expected"]["finding_type"].startswith("dark_pattern_") + + +# ============================================================================ +# C. Incomplete documents — structural tests +# ============================================================================ + +class TestIncompleteDocuments: + """Verify incomplete document test case structure.""" + + @pytest.mark.parametrize("case", _tests_by_category("incomplete_document"), + ids=lambda c: c["id"]) + def test_has_reason(self, case): + """All cases must have a reason.""" + assert "reason" in case["expected"] + assert len(case["expected"]["reason"]) > 0 + + def test_agb_gerichtsstand_no_finding(self): + """ADV-DOC-005: Missing Gerichtsstand in B2C AGB is NOT a finding.""" + case = next(t for t in TESTS if t["id"] == "ADV-DOC-005") + assert case["expected"]["finding"] is False + + +# ============================================================================ +# Meta tests — validate test suite integrity +# ============================================================================ + +class TestSuiteIntegrity: + """Verify the adversarial test suite itself is complete and consistent.""" + + def test_total_count(self): + assert len(TESTS) == 30 + + def test_unique_ids(self): + ids = [t["id"] for t in TESTS] + assert len(ids) == len(set(ids)), "Duplicate test IDs found" + + def test_all_categories_present(self): + categories = {t["category"] for t in TESTS} + expected = {"wrong_legal_basis", "dark_pattern", "incomplete_document", + "similar_but_different", "homonym_different"} + assert categories == expected + + def test_category_counts(self): + counts = {} + for t in TESTS: + counts[t["category"]] = counts.get(t["category"], 0) + 1 + assert counts["wrong_legal_basis"] == 8 + assert counts["dark_pattern"] == 6 + assert counts["incomplete_document"] == 6 + assert counts["similar_but_different"] == 5 + assert counts["homonym_different"] == 5 + + def test_all_have_difficulty(self): + for t in TESTS: + assert "difficulty" in t, f"{t['id']} missing difficulty" + assert t["difficulty"] in ("easy", "medium", "hard") diff --git a/control-pipeline/tests/test_regression.py b/control-pipeline/tests/test_regression.py new file mode 100644 index 0000000..a32ea73 --- /dev/null +++ b/control-pipeline/tests/test_regression.py @@ -0,0 +1,196 @@ +""" +Regression Tests — verify pipeline updates don't break existing controls. + +Requires: DATABASE_URL environment variable for DB tests. +Tests without DB run always (structural checks). +""" + +import os +import sys +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + + +# ============================================================================ +# Structural tests (no DB needed) +# ============================================================================ + +class TestOntologyStability: + """Verify ontology constants haven't accidentally changed.""" + + def test_action_types_count(self): + from services.control_ontology import ACTION_TYPES + assert len(ACTION_TYPES) >= 26, f"ACTION_TYPES shrank to {len(ACTION_TYPES)}" + + def test_phase_order_count(self): + from services.control_ontology import PHASE_ORDER + assert len(PHASE_ORDER) >= 15, f"PHASE_ORDER shrank to {len(PHASE_ORDER)}" + + def test_key_action_types_exist(self): + from services.control_ontology import ACTION_TYPES + required = ["define", "implement", "monitor", "test", "prevent", "exclude", "train"] + for action in required: + assert action in ACTION_TYPES, f"Missing action_type: {action}" + + def test_classify_action_deterministic(self): + """Same input must always produce same output.""" + from services.control_ontology import classify_action + for _ in range(10): + assert classify_action("implementieren") == "implement" + assert classify_action("überwachen") == "monitor" + assert classify_action("verhindern") == "prevent" + + +class TestDependencyEngineStability: + """Verify dependency engine core functions haven't changed behavior.""" + + def test_evaluate_condition_empty(self): + from services.dependency_engine import evaluate_condition + assert evaluate_condition({}, {}) is True + + def test_evaluate_condition_simple(self): + from services.dependency_engine import evaluate_condition + cond = {"field": "source.status", "op": "==", "value": "pass"} + assert evaluate_condition(cond, {"source": {"status": "pass"}}) is True + assert evaluate_condition(cond, {"source": {"status": "fail"}}) is False + + def test_apply_effect_not_applicable(self): + from services.dependency_engine import apply_effect + assert apply_effect({"set_status": "not_applicable"}, "fail") == "not_applicable" + + def test_default_priorities_unchanged(self): + from services.dependency_engine import DEFAULT_PRIORITIES + assert DEFAULT_PRIORITIES["supersedes"] == 10 + assert DEFAULT_PRIORITIES["scope_exclusion"] == 20 + assert DEFAULT_PRIORITIES["prerequisite"] == 50 + assert DEFAULT_PRIORITIES["compensating_control"] == 80 + + +class TestDocumentComplianceStability: + """Verify document compliance rules haven't changed.""" + + def test_basic_website_requires_impressum(self): + from services.document_scope_resolver import resolve_required_documents + result = resolve_required_documents({"has_website": True}) + docs = result.get("required_documents", []) + doc_types = [d["document_type"] if isinstance(d, dict) else d.document_type for d in docs] + assert "impressum" in doc_types + assert "privacy_policy" in doc_types + + +# ============================================================================ +# DB tests (require DATABASE_URL) +# ============================================================================ + +@pytest.mark.skipif( + not os.getenv("DATABASE_URL"), + reason="DATABASE_URL not set" +) +class TestControlCountStability: + """Draft count must stay within expected range.""" + + def test_draft_count_minimum(self, db_session): + from sqlalchemy import text + count = db_session.execute(text( + "SELECT COUNT(*) FROM compliance.canonical_controls " + "WHERE release_state = 'draft' AND decomposition_method = 'pass0b'" + )).scalar() + assert count > 140000, f"Draft count too low: {count} (expected >140k)" + + def test_draft_count_maximum(self, db_session): + from sqlalchemy import text + count = db_session.execute(text( + "SELECT COUNT(*) FROM compliance.canonical_controls " + "WHERE release_state = 'draft' AND decomposition_method = 'pass0b'" + )).scalar() + assert count < 200000, f"Draft count too high: {count} (expected <200k)" + + def test_no_null_titles(self, db_session): + from sqlalchemy import text + null_count = db_session.execute(text( + "SELECT COUNT(*) FROM compliance.canonical_controls " + "WHERE release_state = 'draft' AND decomposition_method = 'pass0b' " + "AND (title IS NULL OR title = '')" + )).scalar() + assert null_count == 0, f"{null_count} controls without title" + + def test_assertion_coverage(self, db_session): + from sqlalchemy import text + no_assertion = db_session.execute(text( + "SELECT COUNT(*) FROM compliance.canonical_controls " + "WHERE release_state = 'draft' AND decomposition_method = 'pass0b' " + "AND (generation_metadata->>'assertion' IS NULL " + " OR generation_metadata->>'assertion' = '')" + )).scalar() + total = db_session.execute(text( + "SELECT COUNT(*) FROM compliance.canonical_controls " + "WHERE release_state = 'draft' AND decomposition_method = 'pass0b'" + )).scalar() + coverage = (total - no_assertion) / max(total, 1) * 100 + assert coverage > 99, f"Assertion coverage only {coverage:.1f}% (expected >99%)" + + +@pytest.mark.skipif( + not os.getenv("DATABASE_URL"), + reason="DATABASE_URL not set" +) +class TestDependencyGraphStability: + """Dependency graph must be valid and within expected size.""" + + def test_dependency_count_minimum(self, db_session): + from sqlalchemy import text + count = db_session.execute(text( + "SELECT COUNT(*) FROM compliance.control_dependencies WHERE is_active = true" + )).scalar() + assert count > 10000, f"Too few dependencies: {count} (expected >10k)" + + def test_no_self_dependencies(self, db_session): + from sqlalchemy import text + self_deps = db_session.execute(text( + "SELECT COUNT(*) FROM compliance.control_dependencies " + "WHERE source_control_id = target_control_id AND is_active = true" + )).scalar() + assert self_deps == 0, f"{self_deps} self-referencing dependencies" + + def test_no_orphan_dependencies(self, db_session): + from sqlalchemy import text + orphans = db_session.execute(text(""" + SELECT COUNT(*) FROM compliance.control_dependencies d + WHERE d.is_active = true + AND NOT EXISTS ( + SELECT 1 FROM compliance.canonical_controls c + WHERE c.id = d.source_control_id AND c.release_state = 'draft' + ) + """)).scalar() + # Some orphans OK (pointing to deprecated/duplicate controls) + assert orphans < 1000, f"Too many orphan dependencies: {orphans}" + + +@pytest.mark.skipif( + not os.getenv("DATABASE_URL"), + reason="DATABASE_URL not set" +) +class TestQualityMetrics: + """Quality metrics must stay within target ranges.""" + + def test_duplicate_rate(self, db_session): + from sqlalchemy import text + total = db_session.execute(text( + "SELECT COUNT(DISTINCT generation_metadata->>'merge_group_hint') " + "FROM compliance.canonical_controls " + "WHERE release_state = 'draft' AND decomposition_method = 'pass0b' " + "AND generation_metadata->>'merge_group_hint' IS NOT NULL" + )).scalar() + dups = db_session.execute(text(""" + SELECT COUNT(*) FROM ( + SELECT generation_metadata->>'merge_group_hint', COUNT(*) + FROM compliance.canonical_controls + WHERE release_state = 'draft' AND decomposition_method = 'pass0b' + AND generation_metadata->>'merge_group_hint' IS NOT NULL + GROUP BY generation_metadata->>'merge_group_hint' + HAVING COUNT(*) > 1 + ) sub + """)).scalar() + rate = dups / max(total, 1) * 100 + assert rate < 5, f"Duplicate merge_key rate {rate:.1f}% exceeds 5% threshold"