Files
Benjamin Admin 5f2da1de88 feat(consent-tester): Phase E — self-improving CMP library
cmp_discovery_log.py:
- sqlite log at /data/cmp_discoveries.db: every LLM-discovered CMP
  pattern recorded with domain, strategy, value, sample text
- Auto-promote (user-chosen 'voll automatisch' mode): when LLM returns
  strategy=url AND extracted text >= 800 words, write a new module
  /data/auto_cmp/auto_<slug>.py with derived regex matcher + reconstruct
- record_discovery() called from dsi_discovery._try_llm_cascade on success

cmp_library/_registry.py:
- Loads both hand-written modules from services/cmp_library/ AND
  auto-promoted modules from /data/auto_cmp/ (CMP_AUTO_DIR env)
- Auto modules use importlib.util.spec_from_file_location, no package
  install needed; restart consent-tester to pick up new ones

dsi_discovery.py:
- _try_llm_cascade now calls record_discovery() on every successful
  LLM analysis (cached AND fresh)

main.py:
- GET /cmp-discoveries — admin endpoint listing all logged discoveries
- DELETE /cmp-discoveries/{id} — rollback (unlinks auto_*.py)

This closes the self-improving loop: first encounter with a new CMP fires
the LLM (cost) → discovery is auto-promoted → all future runs against the
same vendor pattern hit Phase B (Named CMP) at <50ms with no LLM call.
2026-05-16 23:09:23 +02:00

227 lines
7.3 KiB
Python

"""
CMP Discovery Log + Auto-Promotion (Phase E).
When the LLM cascade (Phase C+D) discovers how to extract a previously
unknown CMP, we want that knowledge to persist so the SAME LLM call never
has to fire again — even after a container restart.
Two persistence layers:
1. SQLite log at /data/cmp_discoveries.db — every successful LLM discovery
recorded with domain, strategy, suggested value, sample text. Visible via
admin endpoint GET /cmp-discoveries.
2. Auto-promoted Python modules in /data/auto_cmp/ — when the LLM's hint is
a "url" strategy with a derivable URL pattern AND the reconstruction
yields >=800 words, we write a new file `auto_<slug>.py` with a regex
matcher + reconstruct() function (using the generic walker). The CMP library
registry loads these modules on the next restart (there is no live reload).
User policy (from plan): "voll automatisch" (fully automatic) — the risk of
LLM hallucinations is accepted; we mitigate by requiring
extracted_words >= 800 before promoting.
"""
from __future__ import annotations
import logging
import os
import re
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
logger = logging.getLogger(__name__)

# Location of the SQLite discovery log; override with CMP_DISCOVERY_DB.
DB_PATH = os.getenv("CMP_DISCOVERY_DB", "/data/cmp_discoveries.db")

# Directory that receives auto-promoted modules. CMP_AUTO_DIR wins when set
# (the registry loader is documented to read the same variable, so writer and
# loader must agree); otherwise fall back to "auto_cmp" next to the database.
AUTO_DIR = Path(os.getenv("CMP_AUTO_DIR") or Path(DB_PATH).parent / "auto_cmp")

# Minimum number of whitespace-separated words the extracted text must have
# before a discovery is considered for auto-promotion.
PROMOTION_MIN_WORDS = 800
# ── DB schema ───────────────────────────────────────────────────────
def _ensure_db() -> None:
    """Create the SQLite file and schema on first use.

    Creates the parent directory of ``DB_PATH`` if it is missing, then
    creates the ``discoveries`` table and its domain index unless they
    already exist, so the function is safe to call before every access.

    Raises:
        OSError: if the parent directory cannot be created.
        sqlite3.Error: if the database cannot be opened or the DDL fails.
    """
    Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` only scopes the transaction (commit / rollback); it
        # does NOT close the connection, hence the explicit close() below.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS discoveries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ts TEXT NOT NULL,
                    domain TEXT NOT NULL,
                    llm_used TEXT,
                    strategy TEXT,
                    value TEXT,
                    reconstructed_words INTEGER,
                    sample_text TEXT,
                    promoted_to TEXT
                )
            """)
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_disc_domain ON discoveries(domain)"
            )
    finally:
        conn.close()
# ── Record + auto-promote ───────────────────────────────────────────
def record_discovery(
    domain: str,
    llm_used: str,
    strategy: str,
    value: str,
    extracted_text: str,
) -> str | None:
    """Log a successful LLM discovery and, if eligible, auto-promote it.

    The call is best-effort: database and promotion failures are logged as
    warnings and never propagate to the caller.

    Args:
        domain: Domain the discovery was made on.
        llm_used: Identifier of the LLM that produced the hint.
        strategy: Extraction strategy suggested by the LLM.
        value: Strategy payload (for the "url" strategy, the URL).
        extracted_text: Text reconstructed with the hint; may be empty/None.

    Returns:
        Path of the promoted module, or None when nothing was promoted
        (too few words, ineligible hint, or a failure along the way).
    """
    try:
        _ensure_db()
    except Exception as e:
        logger.warning("CMP DB init failed: %s", e)
        return None

    word_count = len(extracted_text.split()) if extracted_text else 0

    promoted_path: str | None = None
    if word_count >= PROMOTION_MIN_WORDS:
        try:
            promoted_path = _promote(domain, strategy, value)
        except Exception as e:
            logger.warning("Auto-promote failed for %s: %s", domain, e)

    # Only the first 500 characters are stored as a sample.
    sample = (extracted_text or "")[:500]
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            # ``with conn`` commits or rolls back the transaction; closing
            # the connection is done separately in the ``finally`` clause.
            with conn:
                conn.execute(
                    "INSERT INTO discoveries "
                    "(ts, domain, llm_used, strategy, value, "
                    " reconstructed_words, sample_text, promoted_to) "
                    "VALUES (?,?,?,?,?,?,?,?)",
                    (
                        datetime.now(timezone.utc).isoformat(),
                        domain, llm_used, strategy, value,
                        word_count, sample, promoted_path,
                    ),
                )
        finally:
            conn.close()
    except Exception as e:
        logger.warning("CMP log insert failed: %s", e)
    return promoted_path
def _promote(domain: str, strategy: str, value: str) -> str | None:
"""Write an auto_*.py file the registry will pick up on next restart."""
if strategy != "url":
# Selector/text strategies are too page-specific for a global module
return None
parsed = urlparse(value)
if not parsed.netloc:
return None
AUTO_DIR.mkdir(parents=True, exist_ok=True)
slug = _slugify(domain)
target = AUTO_DIR / f"auto_{slug}.py"
# Derive a fairly tight URL pattern: same netloc + same path-prefix up
# to the last "<id>" or "<config>" path segment. Be conservative.
pattern = _url_to_regex(value)
target.write_text(_AUTO_TEMPLATE.format(
domain=domain,
source_url=value,
pattern=pattern.replace('"', '\\"'),
))
logger.info("Auto-promoted CMP module for %s -> %s", domain, target)
return str(target)
_SLUG_RE = re.compile(r"[^a-z0-9]+")
def _slugify(s: str) -> str:
return _SLUG_RE.sub("_", s.lower()).strip("_") or "unknown"
_NUM_OR_HEX_SEG = re.compile(r"^[a-f0-9\-_~]{8,}$", re.I)
def _url_to_regex(url: str) -> str:
"""Build a regex that matches the same JSON endpoint shape.
Replaces variable-looking segments (hashes, UUIDs, version IDs) with
`[^/]+` so the pattern keeps matching when the CMP rotates its config.
Preserves stable segments (literal paths).
"""
p = urlparse(url)
parts = p.path.split("/")
generalised = []
for seg in parts:
if not seg:
generalised.append(seg)
continue
# If segment contains language-locale (de_DE) we keep alphabetic
if _NUM_OR_HEX_SEG.match(seg) or any(c.isdigit() for c in seg):
generalised.append(r"[^/]+")
else:
generalised.append(re.escape(seg))
pattern_path = "/".join(generalised)
host_escaped = re.escape(p.netloc)
return rf"{host_escaped}{pattern_path}(\?|$)"
_AUTO_TEMPLATE = '''"""Auto-generated CMP matcher for {domain} (Phase E discovery).
Derived from LLM-suggested URL: {source_url}
Reconstruction uses the generic walker — safe default, may produce a less
polished output than a hand-written CMP module.
"""
import re
MATCHER = re.compile(r"{pattern}", re.I)
def reconstruct(d: dict) -> str:
from services.cmp_heuristic import reconstruct_generic
return reconstruct_generic(d)
'''
# ── Read API for admin endpoint ─────────────────────────────────────
def list_discoveries(limit: int = 200) -> list[dict]:
    """Return the most recent discoveries, newest first.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        One dict per row of the ``discoveries`` table, keyed by column
        name, ordered by descending id.

    Raises:
        OSError, sqlite3.Error: if the database cannot be created or read.
    """
    _ensure_db()
    conn = sqlite3.connect(DB_PATH)
    try:
        # sqlite3.Row allows each row to be converted to a dict by column.
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            "SELECT * FROM discoveries ORDER BY id DESC LIMIT ?", (limit,),
        ).fetchall()
    finally:
        # The sqlite3 context manager would not close the connection, so
        # close it explicitly instead of leaking one handle per request.
        conn.close()
    return [dict(r) for r in rows]
def delete_discovery(disc_id: int) -> bool:
    """Delete a discovery row and its auto-promoted module, if any.

    Args:
        disc_id: Primary key of the row in the ``discoveries`` table.

    Returns:
        False when no row with that id exists, True once the row has been
        deleted. Failure to remove the module file is logged as a warning
        and does not change the result.

    Raises:
        OSError, sqlite3.Error: if the database cannot be created or used.
    """
    _ensure_db()
    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` commits (or rolls back) the delete; the connection
        # itself is closed in ``finally``, including on the early return.
        with conn:
            row = conn.execute(
                "SELECT promoted_to FROM discoveries WHERE id=?", (disc_id,),
            ).fetchone()
            if not row:
                return False
            promoted = row[0]
            conn.execute("DELETE FROM discoveries WHERE id=?", (disc_id,))
    finally:
        conn.close()

    if promoted:
        try:
            Path(promoted).unlink(missing_ok=True)
        except Exception as e:
            logger.warning("Could not unlink %s: %s", promoted, e)
    return True