fix(cookie): gate non-COOKIE_POLICY controls out of the cookie-policy scan

The cookie agent loaded 100 controls, 11 of which have no COOKIE_POLICY in
applicable_artifacts -- Security/TOM/Audit (PROCESS) or Banner-behaviour
(BEHAVIOR) controls that produce nonsense findings against a cookie policy
(e.g. "TOMs not documented"). Add a cookie classification gate (analogous to the
DSE gate, keyed on COOKIE_POLICY, without the needs_review carve-out since the
artifact signal is decisive and the set is inventory-verified). Controls are
routed out, not deleted. Effect vs Opus-GT: FP 16->11, FN 179->159; the
remaining FN=159 over-rescue is a separate (judge/criteria) question, not routing.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-22 00:13:14 +02:00
parent 38a347a82a
commit 869e7aeb1e
3 changed files with 130 additions and 0 deletions
@@ -0,0 +1,78 @@
"""Applicability-Gate fuer den Cookie-Policy-Scan.
Schliesst Controls aus dem Cookie-Findings-Scan aus, die laut
`compliance.control_classification` NICHT gegen eine Cookie-Policy laufen
('COOKIE_POLICY' nicht in applicable_artifacts). Diese gehoeren zu einem
anderen Artefakt/Pruefer — Banner (BEHAVIOR/Playwright), Security/TOM/Audit
(PROCESS) — und erzeugen sonst Unsinn-Findings (z.B. 'TOMs nicht dokumentiert'
gegen eine Cookie-Richtlinie). Sie werden NICHT geloescht, sondern als
Routing-Liste zurueckgegeben.
Anders als das DSE-Gate OHNE needs_review-Ausnahme: das Artefakt-Signal ist
hier entscheidend und per Inventar (2026-06-21) belegt; die mis-scopeten 11
sind geprueft. Fail-safe: fehlt die Tabelle / DB nicht erreichbar -> leeres
Dict -> es wird NICHT gefiltert (kein stiller Recall-Verlust).
"""
from __future__ import annotations
import logging
import os
from typing import Any
logger = logging.getLogger(__name__)
async def load_cookie_gate(db_url: str = "") -> dict[str, dict[str, Any]]:
    """Load the controls that must be excluded from the cookie findings scan.

    Queries ``compliance.control_classification`` for active rows whose
    ``applicable_artifacts`` does not contain ``'COOKIE_POLICY'``.

    Args:
        db_url: Connection string. When empty, the ``DATABASE_URL`` and then
            the ``COMPLIANCE_DATABASE_URL`` environment variables are tried.

    Returns:
        ``{control_id: meta}`` where ``meta`` holds ``obligation_type``,
        ``check_intent`` and ``applicable_artifacts`` (as a list). An empty
        dict means "do not filter": it is returned when no connection string
        is available or when importing the driver, connecting or querying
        raises.
    """
    # First non-empty candidate wins: explicit argument, then the env vars.
    dsn = ""
    for candidate in (
        db_url,
        os.getenv("DATABASE_URL"),
        os.getenv("COMPLIANCE_DATABASE_URL"),
    ):
        if candidate:
            dsn = candidate
            break
    if not dsn:
        return {}

    query = (
        "SELECT control_id, obligation_type, check_intent,\n"
        "applicable_artifacts\n"
        "FROM compliance.control_classification\n"
        "WHERE is_active\n"
        "AND NOT ('COOKIE_POLICY' = ANY(applicable_artifacts))"
    )

    try:
        # Imported lazily so a missing driver degrades to "no filter" too.
        import asyncpg

        connection = await asyncpg.connect(dsn)
        try:
            records = await connection.fetch(query)
        finally:
            await connection.close()
    except Exception as exc:  # table missing / DB unreachable -> no filter
        logger.info("cookie classification gate inaktiv: %s", str(exc)[:90])
        return {}

    excluded: dict[str, dict[str, Any]] = {}
    for record in records:
        control_id = record["control_id"]
        if not control_id:
            # Rows without a usable id cannot be matched against controls.
            continue
        excluded[control_id] = {
            "obligation_type": record["obligation_type"],
            "check_intent": record["check_intent"],
            "applicable_artifacts": list(record["applicable_artifacts"] or []),
        }
    return excluded
def apply_gate(
    controls: list[dict[str, Any]],
    gate: dict[str, dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Split loaded controls into ``(kept, routed_out)``.

    Args:
        controls: Control dicts; each is looked up by its ``control_id`` key.
        gate: ``{control_id: meta}`` of controls to take out of the scan.
            An empty gate keeps every control.

    Returns:
        A tuple ``(kept, routed_out)``:

        * ``kept`` -- the original control dicts that stay in the cookie
          scan, in input order.
        * ``routed_out`` -- one new dict per gated control holding
          ``control_id``, ``title`` and the gate's classification metadata,
          in input order.
    """
    kept: list[dict[str, Any]] = []
    routed_out: list[dict[str, Any]] = []
    for control in controls:
        cid = control.get("control_id")
        # Presence of the id in the gate is the routing signal. Testing the
        # metadata's truthiness instead would silently keep a gated control
        # whose metadata happens to be empty.
        if not cid or cid not in gate:
            kept.append(control)
            continue
        meta = gate[cid] or {}
        routed_out.append(
            {"control_id": cid, "title": control.get("title"), **meta}
        )
    return kept, routed_out
@@ -45,6 +45,15 @@ async def run_v3_pipeline(
controls = []
_normalize_criteria(controls)
controls, sector_dropped = _filter_sector(controls, business_scope)
# Artefakt-Gate: Controls ohne COOKIE_POLICY-Artefakt (Security/TOM/Audit,
# Banner) raus — sie gehoeren zu anderem Pruefer/Artefakt und erzeugen sonst
# Unsinn-Findings. Siehe _classification_gate.
routed_out: list[dict[str, Any]] = []
try:
from ._classification_gate import apply_gate, load_cookie_gate
controls, routed_out = apply_gate(controls, await load_cookie_gate(db_url))
except Exception as e:
logger.warning("cookie classification gate skipped: %s", e)
results: list[dict[str, Any]] = []
if controls:
try:
@@ -111,6 +120,7 @@ async def run_v3_pipeline(
"layer_0_boost_overrides": boost_overrides,
"total_mcs": len(results),
"sector_dropped": sector_dropped,
"artifact_gated": len(routed_out),
}
return results, telemetry