feat(consent-tester): Phase E — self-improving CMP library

cmp_discovery_log.py:
- sqlite log at /data/cmp_discoveries.db: every LLM-discovered CMP
  pattern is recorded with domain, strategy, value, and sample text
- Auto-promote (user-chosen 'voll automatisch', i.e. fully automatic, mode):
  when the LLM returns strategy=url AND the extracted text is >= 800 words,
  write a new module /data/auto_cmp/auto_<slug>.py with a derived regex
  matcher + reconstruct
- record_discovery() called from dsi_discovery._try_llm_cascade on success

cmp_library/_registry.py:
- Loads both hand-written modules from services/cmp_library/ AND
  auto-promoted modules from /data/auto_cmp/ (CMP_AUTO_DIR env)
- Auto modules use importlib.util.spec_from_file_location, no package
  install needed; restart consent-tester to pick up new ones

dsi_discovery.py:
- _try_llm_cascade now calls record_discovery() on every successful
  LLM analysis (cached AND fresh)

main.py:
- GET /cmp-discoveries — admin endpoint listing all logged discoveries
- DELETE /cmp-discoveries/{id} — rollback (unlinks auto_*.py)

This closes the self-improving loop: first encounter with a new CMP fires
the LLM (cost) → discovery is auto-promoted → all future runs against the
same vendor pattern hit Phase B (Named CMP) at <50ms with no LLM call.
This commit is contained in:
Benjamin Admin
2026-05-16 23:09:23 +02:00
parent 2400aa6a9e
commit 5f2da1de88
4 changed files with 298 additions and 12 deletions
@@ -17,7 +17,9 @@ A consent-tester restart picks up new auto_*.py files automatically.
from __future__ import annotations
import importlib
import importlib.util
import logging
import os
import pkgutil
from pathlib import Path
from typing import Callable
@@ -27,12 +29,22 @@ logger = logging.getLogger(__name__)
# Shape of one registry entry: (cmp_name, url_pattern, reconstruct_fn).
# The url_pattern slot holds whatever a CMP module exposes as MATCHER, hence
# the loose "object" type.
Registry = list[tuple[str, "object", Callable[[dict], str]]]
# Phase E: persistent auto-promoted modules live in a writable volume
# (separate from the source tree so deploys do not wipe them).
# The location can be overridden with the CMP_AUTO_DIR environment variable;
# it defaults to /data/auto_cmp.
AUTO_DIR = Path(os.getenv("CMP_AUTO_DIR", "/data/auto_cmp"))
def load_all() -> Registry:
    """Build the CMP registry from the package modules and from AUTO_DIR.

    Returns:
        A new list of ``(cmp_name, url_pattern, reconstruct_fn)`` entries.
        Entries from the hand-written package modules are added first,
        entries from the auto-promoted modules in AUTO_DIR after them.
    """
    registry: Registry = []
    # Each loader appends to the list in place, so call order fixes the
    # order of entries in the returned registry.
    _load_from_package(registry)
    _load_from_auto_dir(registry)
    return registry
def _load_from_package(registry: Registry) -> None:
"""Import the hand-written modules in services/cmp_library/."""
import services.cmp_library as pkg # type: ignore[import-not-found]
pkg_path = Path(pkg.__file__).parent
for module_info in pkgutil.iter_modules([str(pkg_path)]):
name = module_info.name
@@ -40,16 +52,35 @@ def load_all() -> Registry:
continue
try:
module = importlib.import_module(f"services.cmp_library.{name}")
matcher = getattr(module, "MATCHER", None)
reconstruct = getattr(module, "reconstruct", None)
if matcher is None or not callable(reconstruct):
logger.warning(
"CMP module %s missing MATCHER or reconstruct() — skipped", name,
)
continue
registry.append((name, matcher, reconstruct))
logger.info("CMP loaded: %s", name)
_register(registry, name, module)
except Exception as e:
logger.warning("CMP module %s failed to load: %s", name, e)
return registry
def _load_from_auto_dir(registry: Registry) -> None:
    """Load every ``auto_*.py`` file found in AUTO_DIR into *registry*.

    Does nothing when AUTO_DIR does not exist. Files are processed in sorted
    path order and imported directly from their file path. A file whose
    import or registration raises is logged as a warning and skipped, so one
    broken module does not stop the remaining ones from loading.

    Args:
        registry: List that receives the loaded entries (mutated in place).
    """
    if not AUTO_DIR.exists():
        return
    for module_path in sorted(AUTO_DIR.glob("auto_*.py")):
        module_name = module_path.stem
        try:
            spec = importlib.util.spec_from_file_location(module_name, module_path)
            # Without a spec and a loader the file cannot be executed; it is
            # skipped without a log entry.
            if spec and spec.loader:
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                _register(registry, module_name, module)
        except Exception as exc:
            logger.warning("Auto CMP module %s failed to load: %s", module_name, exc)
def _register(registry: Registry, name: str, module) -> None:
    """Append *module*'s CMP entry to *registry* if it is complete.

    A module is complete when it has a ``MATCHER`` attribute that is not
    ``None`` and a callable ``reconstruct`` attribute. Incomplete modules are
    logged as a warning and left out.

    Args:
        registry: List that receives ``(name, MATCHER, reconstruct)``.
        name: Name used for the registry entry and in log messages.
        module: Loaded module object to inspect.
    """
    matcher = getattr(module, "MATCHER", None)
    reconstruct = getattr(module, "reconstruct", None)
    if matcher is not None and callable(reconstruct):
        registry.append((name, matcher, reconstruct))
        logger.info("CMP loaded: %s", name)
    else:
        logger.warning(
            "CMP module %s missing MATCHER or reconstruct() — skipped", name,
        )