diff --git a/consent-tester/main.py b/consent-tester/main.py index 8e965cbf..dd66d008 100644 --- a/consent-tester/main.py +++ b/consent-tester/main.py @@ -344,3 +344,20 @@ async def dsi_discovery(req: DSIDiscoveryRequest): errors=result.errors, scanned_at=datetime.now(timezone.utc).isoformat(), ) + + +# ── Admin: CMP discoveries (Phase E) ──────────────────────────────── + +@app.get("/cmp-discoveries") +async def cmp_discoveries(limit: int = 200): + """List LLM-discovered CMP patterns (Phase E auto-promote log).""" + from services.cmp_discovery_log import list_discoveries + return {"discoveries": list_discoveries(limit=limit)} + + +@app.delete("/cmp-discoveries/{disc_id}") +async def cmp_discovery_delete(disc_id: int): + """Delete a discovery + its auto-promoted module (rollback).""" + from services.cmp_discovery_log import delete_discovery + ok = delete_discovery(disc_id) + return {"deleted": ok, "id": disc_id} diff --git a/consent-tester/services/cmp_discovery_log.py b/consent-tester/services/cmp_discovery_log.py new file mode 100644 index 00000000..6f6eaef8 --- /dev/null +++ b/consent-tester/services/cmp_discovery_log.py @@ -0,0 +1,226 @@ +""" +CMP Discovery Log + Auto-Promotion (Phase E). + +When the LLM cascade (Phase C+D) discovers how to extract a previously +unknown CMP, we want that knowledge to persist so the SAME LLM call never +has to fire again — even after a container restart. + +Two persistence layers: + +1. SQLite log at /data/cmp_discoveries.db — every successful LLM discovery + recorded with domain, strategy, suggested value, sample text. Visible via + admin endpoint GET /cmp-discoveries. + +2. Auto-promoted Python modules in /data/auto_cmp/ — when the LLM's hint is + a "url" strategy with a derivable URL pattern AND the reconstruction + yields >=800 words, we write a new file `auto_.py` with a regex + matcher + reconstruct() function (using generic walker). The CMP library + registry hot-loads these on next restart. + +User policy (from plan): voll automatisch — risk of LLM hallucinations is +accepted; we mitigate by requiring extracted_words >= 800 before promoting. +""" + +from __future__ import annotations + +import logging +import os +import re +import sqlite3 +from datetime import datetime, timezone +from pathlib import Path +from urllib.parse import urlparse + +logger = logging.getLogger(__name__) + + +DB_PATH = os.getenv("CMP_DISCOVERY_DB", "/data/cmp_discoveries.db") +AUTO_DIR = Path(DB_PATH).parent / "auto_cmp" +PROMOTION_MIN_WORDS = 800 + + +# ── DB schema ─────────────────────────────────────────────────────── + +def _ensure_db() -> None: + """Create the sqlite file + schema on first use.""" + Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True) + with sqlite3.connect(DB_PATH) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS discoveries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts TEXT NOT NULL, + domain TEXT NOT NULL, + llm_used TEXT, + strategy TEXT, + value TEXT, + reconstructed_words INTEGER, + sample_text TEXT, + promoted_to TEXT + ) + """) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_disc_domain ON discoveries(domain)" + ) + conn.commit() + + +# ── Record + auto-promote ─────────────────────────────────────────── + +def record_discovery( + domain: str, + llm_used: str, + strategy: str, + value: str, + extracted_text: str, +) -> str | None: + """Log a successful LLM discovery and (if eligible) auto-promote it to + a named CMP module in /data/auto_cmp/. + + Returns the path of the promoted module (or None if not promoted). + """ + try: + _ensure_db() + except Exception as e: + logger.warning("CMP DB init failed: %s", e) + return None + + word_count = len(extracted_text.split()) if extracted_text else 0 + promoted_path: str | None = None + + if word_count >= PROMOTION_MIN_WORDS: + try: + promoted_path = _promote(domain, strategy, value) + except Exception as e: + logger.warning("Auto-promote failed for %s: %s", domain, e) + + sample = (extracted_text or "")[:500] + try: + with sqlite3.connect(DB_PATH) as conn: + conn.execute( + "INSERT INTO discoveries " + "(ts, domain, llm_used, strategy, value, " + " reconstructed_words, sample_text, promoted_to) " + "VALUES (?,?,?,?,?,?,?,?)", + ( + datetime.now(timezone.utc).isoformat(), + domain, llm_used, strategy, value, + word_count, sample, promoted_path, + ), + ) + conn.commit() + except Exception as e: + logger.warning("CMP log insert failed: %s", e) + + return promoted_path + + +def _promote(domain: str, strategy: str, value: str) -> str | None: + """Write an auto_*.py file the registry will pick up on next restart.""" + if strategy != "url": + # Selector/text strategies are too page-specific for a global module + return None + + parsed = urlparse(value) + if not parsed.netloc: + return None + + AUTO_DIR.mkdir(parents=True, exist_ok=True) + slug = _slugify(domain) + target = AUTO_DIR / f"auto_{slug}.py" + + # Derive a fairly tight URL pattern: same netloc + same path-prefix up + # to the last "" or "" path segment. Be conservative. + pattern = _url_to_regex(value) + + target.write_text(_AUTO_TEMPLATE.format( + domain=domain, + source_url=value, + pattern=pattern.replace('"', '\\"'), + )) + logger.info("Auto-promoted CMP module for %s -> %s", domain, target) + return str(target) + + +_SLUG_RE = re.compile(r"[^a-z0-9]+") + + +def _slugify(s: str) -> str: + return _SLUG_RE.sub("_", s.lower()).strip("_") or "unknown" + + +_NUM_OR_HEX_SEG = re.compile(r"^[a-f0-9\-_~]{8,}$", re.I) + + +def _url_to_regex(url: str) -> str: + """Build a regex that matches the same JSON endpoint shape. + + Replaces variable-looking segments (hashes, UUIDs, version IDs) with + `[^/]+` so the pattern keeps matching when the CMP rotates its config. + Preserves stable segments (literal paths). + """ + p = urlparse(url) + parts = p.path.split("/") + generalised = [] + for seg in parts: + if not seg: + generalised.append(seg) + continue + # If segment contains language-locale (de_DE) we keep alphabetic + if _NUM_OR_HEX_SEG.match(seg) or any(c.isdigit() for c in seg): + generalised.append(r"[^/]+") + else: + generalised.append(re.escape(seg)) + pattern_path = "/".join(generalised) + host_escaped = re.escape(p.netloc) + return rf"{host_escaped}{pattern_path}(\?|$)" + + +_AUTO_TEMPLATE = '''"""Auto-generated CMP matcher for {domain} (Phase E discovery). + +Derived from LLM-suggested URL: {source_url} +Reconstruction uses the generic walker — safe default, may produce a less +polished output than a hand-written CMP module. +""" + +import re + +MATCHER = re.compile(r"{pattern}", re.I) + + +def reconstruct(d: dict) -> str: + from services.cmp_heuristic import reconstruct_generic + return reconstruct_generic(d) +''' + + +# ── Read API for admin endpoint ───────────────────────────────────── + +def list_discoveries(limit: int = 200) -> list[dict]: + """Return recent discoveries as a list of dicts for the admin endpoint.""" + _ensure_db() + with sqlite3.connect(DB_PATH) as conn: + conn.row_factory = sqlite3.Row + rows = conn.execute( + "SELECT * FROM discoveries ORDER BY id DESC LIMIT ?", (limit,), + ).fetchall() + return [dict(r) for r in rows] + + +def delete_discovery(disc_id: int) -> bool: + """Delete a discovery + its auto-promoted module (if any).""" + _ensure_db() + with sqlite3.connect(DB_PATH) as conn: + row = conn.execute( + "SELECT promoted_to FROM discoveries WHERE id=?", (disc_id,), + ).fetchone() + if not row: + return False + promoted = row[0] + conn.execute("DELETE FROM discoveries WHERE id=?", (disc_id,)) + conn.commit() + if promoted: + try: + Path(promoted).unlink(missing_ok=True) + except Exception as e: + logger.warning("Could not unlink %s: %s", promoted, e) + return True diff --git a/consent-tester/services/cmp_library/_registry.py b/consent-tester/services/cmp_library/_registry.py index 9ef6e944..deca1137 100644 --- a/consent-tester/services/cmp_library/_registry.py +++ b/consent-tester/services/cmp_library/_registry.py @@ -17,7 +17,9 @@ A consent-tester restart picks up new auto_*.py files automatically. from __future__ import annotations import importlib +import importlib.util import logging +import os import pkgutil from pathlib import Path from typing import Callable @@ -27,12 +29,22 @@ logger = logging.getLogger(__name__) # (cmp_name, url_pattern, reconstruct_fn) Registry = list[tuple[str, "object", Callable[[dict], str]]] +# Phase E: persistent auto-promoted modules live in a writable volume +# (separate from the source tree so deploys do not wipe them). +AUTO_DIR = Path(os.getenv("CMP_AUTO_DIR", "/data/auto_cmp")) + def load_all() -> Registry: - """Import every module in this package and collect MATCHER + reconstruct.""" - import services.cmp_library as pkg # type: ignore[import-not-found] + """Import every module in this package and from AUTO_DIR.""" registry: Registry = [] + _load_from_package(registry) + _load_from_auto_dir(registry) + return registry + +def _load_from_package(registry: Registry) -> None: + """Import the hand-written modules in services/cmp_library/.""" + import services.cmp_library as pkg # type: ignore[import-not-found] pkg_path = Path(pkg.__file__).parent for module_info in pkgutil.iter_modules([str(pkg_path)]): name = module_info.name @@ -40,16 +52,35 @@ def load_all() -> Registry: continue try: module = importlib.import_module(f"services.cmp_library.{name}") - matcher = getattr(module, "MATCHER", None) - reconstruct = getattr(module, "reconstruct", None) - if matcher is None or not callable(reconstruct): - logger.warning( - "CMP module %s missing MATCHER or reconstruct() — skipped", name, - ) - continue - registry.append((name, matcher, reconstruct)) - logger.info("CMP loaded: %s", name) + _register(registry, name, module) except Exception as e: logger.warning("CMP module %s failed to load: %s", name, e) - return registry + +def _load_from_auto_dir(registry: Registry) -> None: + """Import auto-promoted modules from the runtime volume.""" + if not AUTO_DIR.exists(): + return + for path in sorted(AUTO_DIR.glob("auto_*.py")): + name = path.stem + try: + spec = importlib.util.spec_from_file_location(name, path) + if not spec or not spec.loader: + continue + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + _register(registry, name, module) + except Exception as e: + logger.warning("Auto CMP module %s failed to load: %s", name, e) + + +def _register(registry: Registry, name: str, module) -> None: + matcher = getattr(module, "MATCHER", None) + reconstruct = getattr(module, "reconstruct", None) + if matcher is None or not callable(reconstruct): + logger.warning( + "CMP module %s missing MATCHER or reconstruct() — skipped", name, + ) + return + registry.append((name, matcher, reconstruct)) + logger.info("CMP loaded: %s", name) diff --git a/consent-tester/services/dsi_discovery.py b/consent-tester/services/dsi_discovery.py index dffb87c9..24c1134c 100644 --- a/consent-tester/services/dsi_discovery.py +++ b/consent-tester/services/dsi_discovery.py @@ -836,6 +836,18 @@ async def _try_llm_cascade( if wc >= 300: await cache_set(netloc, hint) logger.info("LLM cached for %s (%s): %d words", netloc, hint.get("_tier"), wc) + # Phase E: log discovery + (if eligible) auto-promote to named CMP + try: + from services.cmp_discovery_log import record_discovery + record_discovery( + domain=netloc, + llm_used=hint.get("_tier", "unknown"), + strategy=hint.get("strategy", ""), + value=hint.get("value", ""), + extracted_text=text, + ) + except Exception as e: + logger.debug("CMP discovery log failed: %s", e) return text, wc