From 869e7aeb1e0ef7819f2adfdf76552fd208c2c180 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Mon, 22 Jun 2026 00:13:14 +0200 Subject: [PATCH] fix(cookie): gate non-COOKIE_POLICY controls out of the cookie-policy scan The cookie agent loaded 100 controls, 11 of which have no COOKIE_POLICY in applicable_artifacts -- Security/TOM/Audit (PROCESS) or Banner-behaviour (BEHAVIOR) controls that produce nonsense findings against a cookie policy (e.g. "TOMs not documented"). Add a cookie classification gate (analogous to the DSE gate, keyed on COOKIE_POLICY, without the needs_review carve-out since the artifact signal is decisive and the set is inventory-verified). Controls are routed out, not deleted. Effect vs Opus-GT: FP 16->11, FN 179->159; the remaining FN=159 over-rescue is a separate (judge/criteria) question, not routing. Co-Authored-By: Claude Opus 4.7 --- .../cookie_policy/_classification_gate.py | 78 +++++++++++++++++++ .../cookie_policy/v3_engine.py | 10 +++ .../tests/test_cookie_classification_gate.py | 42 ++++++++++ 3 files changed, 130 insertions(+) create mode 100644 backend-compliance/compliance/services/specialist_agents/cookie_policy/_classification_gate.py create mode 100644 backend-compliance/tests/test_cookie_classification_gate.py diff --git a/backend-compliance/compliance/services/specialist_agents/cookie_policy/_classification_gate.py b/backend-compliance/compliance/services/specialist_agents/cookie_policy/_classification_gate.py new file mode 100644 index 00000000..760846e8 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/cookie_policy/_classification_gate.py @@ -0,0 +1,78 @@ +"""Applicability-Gate fuer den Cookie-Policy-Scan. + +Schliesst Controls aus dem Cookie-Findings-Scan aus, die laut +`compliance.control_classification` NICHT gegen eine Cookie-Policy laufen +('COOKIE_POLICY' nicht in applicable_artifacts). 
logger = logging.getLogger(__name__)


async def load_cookie_gate(db_url: str = "") -> dict[str, dict[str, Any]]:
    """Load the controls that must be excluded from the cookie-policy scan.

    Selects active rows of ``compliance.control_classification`` whose
    ``applicable_artifacts`` array does not contain ``'COOKIE_POLICY'``.

    Args:
        db_url: PostgreSQL DSN. When empty, ``DATABASE_URL`` and then
            ``COMPLIANCE_DATABASE_URL`` are read from the environment.

    Returns:
        ``{control_id: meta}`` where ``meta`` carries ``obligation_type``,
        ``check_intent`` and ``applicable_artifacts`` (as a list). An empty
        dict means "do not filter"; it is returned when no DSN is configured
        and when importing asyncpg, connecting, or querying raises.
    """
    dsn = (db_url or os.getenv("DATABASE_URL")
           or os.getenv("COMPLIANCE_DATABASE_URL") or "")
    if not dsn:
        return {}
    try:
        # Imported inside the try block so a missing driver fails open as well.
        import asyncpg

        conn = await asyncpg.connect(dsn)
        try:
            rows = await conn.fetch(
                """SELECT control_id, obligation_type, check_intent,
                          applicable_artifacts
                   FROM compliance.control_classification
                   WHERE is_active
                     AND NOT ('COOKIE_POLICY' = ANY(applicable_artifacts))""")
        finally:
            # Close the connection even when the query itself fails.
            await conn.close()
    except Exception as e:  # table missing / DB unreachable -> no filter
        logger.info("cookie classification gate inaktiv: %s", str(e)[:90])
        return {}
    return {
        r["control_id"]: {
            "obligation_type": r["obligation_type"],
            "check_intent": r["check_intent"],
            "applicable_artifacts": list(r["applicable_artifacts"] or []),
        }
        for r in rows
        if r["control_id"]
    }


def apply_gate(
    controls: list[dict[str, Any]],
    gate: dict[str, dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Split loaded controls into ``(kept, routed_out)``.

    Args:
        controls: Control dicts; each is looked up by its ``control_id``.
        gate: Mapping as returned by ``load_cookie_gate``. An empty mapping
            keeps every control.

    Returns:
        A ``(kept, routed_out)`` tuple. ``kept`` holds the original control
        dicts that stay in the scan, in input order. ``routed_out`` holds one
        new dict per gated control with ``control_id``, ``title`` and the
        gate's classification metadata. Controls are routed, not deleted.
    """
    kept: list[dict[str, Any]] = []
    routed_out: list[dict[str, Any]] = []
    for c in controls:
        cid = c.get("control_id")
        # Key presence decides, not the truthiness of the metadata: a control
        # listed in the gate with empty (or None) metadata is still gated.
        if cid and cid in gate:
            routed_out.append(
                {"control_id": cid, "title": c.get("title"), **(gate[cid] or {})}
            )
        else:
            kept.append(c)
    return kept, routed_out
def test_apply_gate_splits_kept_and_routed():
    """Gated controls land in the routed list with title and metadata."""
    tom_meta = {
        "obligation_type": "TECHNICAL",
        "check_intent": "DIRECT_TECHNICAL",
        "applicable_artifacts": ["TOM", "AUDIT"],
    }
    banner_meta = {
        "obligation_type": "TECHNICAL",
        "check_intent": "DIRECT_TECHNICAL",
        "applicable_artifacts": ["COOKIE_BANNER", "SYSTEMSCAN"],
    }
    loaded = [
        {"control_id": "COOK-1", "title": "Kategorien"},
        {"control_id": "TOM-1", "title": "Verschlüsselung"},
        {"control_id": "BAN-1", "title": "Consent vor Setzen"},
    ]

    kept, routed = apply_gate(loaded, {"TOM-1": tom_meta, "BAN-1": banner_meta})

    kept_ids = [entry["control_id"] for entry in kept]
    routed_ids = {entry["control_id"] for entry in routed}
    assert kept_ids == ["COOK-1"]
    assert routed_ids == {"TOM-1", "BAN-1"}

    # Routed entries keep the title and the classification metadata so a
    # downstream router can use them.
    tom_entry = [entry for entry in routed if entry["control_id"] == "TOM-1"][0]
    assert tom_entry["title"] == "Verschlüsselung"
    assert tom_entry["applicable_artifacts"] == ["TOM", "AUDIT"]


def test_apply_gate_empty_gate_keeps_all():
    """An empty gate filters nothing."""
    loaded = [{"control_id": "A"}, {"control_id": "B"}]
    kept, routed = apply_gate(loaded, {})
    assert len(kept) == 2
    assert routed == []


@pytest.mark.asyncio
async def test_load_cookie_gate_no_dsn_is_failsafe(monkeypatch):
    """With no DSN argument and no DSN in the environment the gate is empty."""
    for var in ("DATABASE_URL", "COMPLIANCE_DATABASE_URL"):
        monkeypatch.delenv(var, raising=False)
    gate = await load_cookie_gate("")
    assert gate == {}