breakpilot-compliance/consent-tester/services/cmp_library/_registry.py
Benjamin Admin 5f2da1de88 feat(consent-tester): Phase E — self-improving CMP library
cmp_discovery_log.py:
- sqlite log at /data/cmp_discoveries.db: every LLM-discovered CMP
  pattern recorded with domain, strategy, value, sample text
- Auto-promote (user-chosen fully automatic mode, 'voll automatisch'):
  when the LLM returns strategy=url AND the extracted text is >= 800 words,
  write a new module /data/auto_cmp/auto_<slug>.py with a derived regex
  matcher and a reconstruct() function
- record_discovery() called from dsi_discovery._try_llm_cascade on success
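
The logging and promotion rule described above could be sketched as follows. This is a minimal illustration, not the code from cmp_discovery_log.py: the table layout, the `record_discovery` signature, the 500-character sample cut-off and the `should_auto_promote` helper are all assumptions. Only the logged fields (domain, strategy, value, sample text) and the promotion condition (strategy=url and at least 800 words) come from the commit message.

```python
import sqlite3

# Hypothetical schema: the commit names the logged fields, not the column layout.
SCHEMA = """
CREATE TABLE IF NOT EXISTS cmp_discoveries (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    domain TEXT NOT NULL,
    strategy TEXT NOT NULL,
    value TEXT NOT NULL,
    sample_text TEXT NOT NULL
)
"""

MIN_WORDS = 800  # threshold stated in the commit message


def should_auto_promote(strategy: str, text: str) -> bool:
    """Promote only URL-strategy discoveries with enough extracted text."""
    return strategy == "url" and len(text.split()) >= MIN_WORDS


def record_discovery(conn: sqlite3.Connection, domain: str, strategy: str,
                     value: str, text: str) -> int:
    """Insert one discovery row and return its id (assumed signature)."""
    conn.execute(SCHEMA)
    cur = conn.execute(
        "INSERT INTO cmp_discoveries (domain, strategy, value, sample_text) "
        "VALUES (?, ?, ?, ?)",
        (domain, strategy, value, text[:500]),  # store a short sample only
    )
    conn.commit()
    return cur.lastrowid


# In-memory database for the demo; the real log lives at /data/cmp_discoveries.db.
conn = sqlite3.connect(":memory:")
long_text = "word " * 900
row_id = record_discovery(
    conn, "example.com", "url", "https://cdn.example.com/policy.json", long_text
)
promote = should_auto_promote("url", long_text)
```

Counting words with `str.split()` is a simplification; the real threshold check may tokenize differently.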

cmp_library/_registry.py:
- Loads both hand-written modules from services/cmp_library/ AND
  auto-promoted modules from /data/auto_cmp/ (CMP_AUTO_DIR env)
- Auto modules use importlib.util.spec_from_file_location, no package
  install needed; restart consent-tester to pick up new ones
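
As a rough illustration of the file-based loading mentioned above, the sketch below writes a made-up auto module into a temporary directory and loads it with importlib.util.spec_from_file_location. The module body, the file name auto_example.py and the helper `load_module_from_path` are hypothetical; the output of the real Phase E generator is not shown in this commit.

```python
import importlib.util
import tempfile
from pathlib import Path

# Hypothetical auto-promoted module body (raw string, so the regex escapes
# are written to disk exactly as shown).
AUTO_SOURCE = r'''
import re

MATCHER = re.compile(r"https://cdn\.example\.com/consent/.+\.json")


def reconstruct(data: dict) -> str:
    return "\n".join(item["text"] for item in data.get("sections", []))
'''


def load_module_from_path(path: Path):
    """Load a single .py file as a module without it being on sys.path."""
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot build an import spec for {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level code
    return module


with tempfile.TemporaryDirectory() as tmp:
    module_path = Path(tmp) / "auto_example.py"
    module_path.write_text(AUTO_SOURCE, encoding="utf-8")
    module = load_module_from_path(module_path)

url = "https://cdn.example.com/consent/policy.json"
matched = bool(module.MATCHER.search(url))
text = module.reconstruct(
    {"sections": [{"text": "We use cookies."}, {"text": "You can opt out."}]}
)
```

Because exec_module runs whatever the file contains, anything written to the auto directory is executed with the service's privileges on the next start.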

dsi_discovery.py:
- _try_llm_cascade now calls record_discovery() on every successful
  LLM analysis (cached AND fresh)

main.py:
- GET /cmp-discoveries — admin endpoint listing all logged discoveries
- DELETE /cmp-discoveries/{id} — rollback (unlinks auto_*.py)
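
The rollback behind the DELETE endpoint might look roughly like the sketch below, shown without the web framework. The `module_file` column, the table layout and the function name `rollback_discovery` are assumptions made for illustration; the commit only states that a rollback unlinks the auto_*.py file.

```python
import sqlite3
import tempfile
from pathlib import Path


def rollback_discovery(conn: sqlite3.Connection, auto_dir: Path,
                       discovery_id: int) -> bool:
    """Delete one discovery row and its auto module file, if any."""
    row = conn.execute(
        "SELECT module_file FROM cmp_discoveries WHERE id = ?", (discovery_id,)
    ).fetchone()
    if row is None:
        return False  # unknown id, nothing to roll back
    module_file = row[0]
    if module_file:
        # Use only the bare file name so a stored value cannot escape auto_dir.
        (auto_dir / Path(module_file).name).unlink(missing_ok=True)
    conn.execute("DELETE FROM cmp_discoveries WHERE id = ?", (discovery_id,))
    conn.commit()
    return True


# Demo with an in-memory database and a temporary auto directory.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cmp_discoveries (id INTEGER PRIMARY KEY, module_file TEXT)"
)
conn.execute(
    "INSERT INTO cmp_discoveries (id, module_file) VALUES (1, 'auto_example.py')"
)

with tempfile.TemporaryDirectory() as tmp:
    auto_dir = Path(tmp)
    (auto_dir / "auto_example.py").write_text("MATCHER = None\n", encoding="utf-8")
    first = rollback_discovery(conn, auto_dir, 1)
    file_left = (auto_dir / "auto_example.py").exists()
    second = rollback_discovery(conn, auto_dir, 1)  # id is already gone
```

Since the registry is built when the service loads, a removed module presumably stays active in a running process until consent-tester is restarted.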

This closes the self-improving loop: first encounter with a new CMP fires
the LLM (cost) → discovery is auto-promoted → all future runs against the
same vendor pattern hit Phase B (Named CMP) at <50ms with no LLM call.
2026-05-16 23:09:23 +02:00


"""
CMP Library Registry: discovers and registers all CMP modules.

Each CMP module exports two module-level symbols:
  - MATCHER: a compiled regex matching the JSON endpoint URL
  - reconstruct(data: dict) -> str: builds the cookie-policy text from JSON

The registry auto-discovers:
  1. Hand-written modules in this package: epaas, onetrust, cookiebot,
     usercentrics, didomi, trustarc
  2. Auto-promoted modules: any file matching `auto_*.py` in AUTO_DIR
     (CMP_AUTO_DIR env var, default /data/auto_cmp), created by Phase E
     when an LLM successfully discovers a new pattern

A consent-tester restart picks up new auto_*.py files automatically.
"""
from __future__ import annotations

import importlib
import importlib.util
import logging
import os
import pkgutil
from pathlib import Path
from typing import Callable

logger = logging.getLogger(__name__)

# (cmp_name, url_pattern, reconstruct_fn)
Registry = list[tuple[str, "object", Callable[[dict], str]]]

# Phase E: persistent auto-promoted modules live in a writable volume
# (separate from the source tree so deploys do not wipe them).
AUTO_DIR = Path(os.getenv("CMP_AUTO_DIR", "/data/auto_cmp"))


def load_all() -> Registry:
    """Import every module in this package and from AUTO_DIR."""
    registry: Registry = []
    _load_from_package(registry)
    _load_from_auto_dir(registry)
    return registry


def _load_from_package(registry: Registry) -> None:
    """Import the hand-written modules in services/cmp_library/."""
    import services.cmp_library as pkg  # type: ignore[import-not-found]

    pkg_path = Path(pkg.__file__).parent
    for module_info in pkgutil.iter_modules([str(pkg_path)]):
        name = module_info.name
        # Skip private helpers such as this _registry module.
        if name.startswith("_"):
            continue
        try:
            module = importlib.import_module(f"services.cmp_library.{name}")
            _register(registry, name, module)
        except Exception as e:
            logger.warning("CMP module %s failed to load: %s", name, e)


def _load_from_auto_dir(registry: Registry) -> None:
    """Import auto-promoted modules from the runtime volume."""
    if not AUTO_DIR.exists():
        return
    # Sorted so the load order is deterministic across restarts.
    for path in sorted(AUTO_DIR.glob("auto_*.py")):
        name = path.stem
        try:
            spec = importlib.util.spec_from_file_location(name, path)
            if not spec or not spec.loader:
                continue
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            _register(registry, name, module)
        except Exception as e:
            logger.warning("Auto CMP module %s failed to load: %s", name, e)


def _register(registry: Registry, name: str, module) -> None:
    """Append the module to the registry if it exports the required symbols."""
    matcher = getattr(module, "MATCHER", None)
    reconstruct = getattr(module, "reconstruct", None)
    if matcher is None or not callable(reconstruct):
        logger.warning(
            "CMP module %s missing MATCHER or reconstruct(), skipped", name,
        )
        return
    registry.append((name, matcher, reconstruct))
    logger.info("CMP loaded: %s", name)
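
How a caller might consume the registry is not part of this file. The following is a minimal sketch, assuming the caller walks the (cmp_name, url_pattern, reconstruct_fn) tuples in order and uses the first matcher that hits. The function `reconstruct_for_url`, the use of `search()` rather than `match()`, and the demo entry are hypothetical.

```python
import re
from typing import Callable, Optional

# Same tuple shape as the Registry alias: (cmp_name, url_pattern, reconstruct_fn)
RegistryEntry = tuple[str, re.Pattern, Callable[[dict], str]]


def reconstruct_for_url(
    registry: list[RegistryEntry], url: str, data: dict
) -> Optional[tuple[str, str]]:
    """Return (cmp_name, policy_text) for the first matching entry, else None."""
    for name, matcher, reconstruct in registry:
        if matcher.search(url):
            return name, reconstruct(data)
    return None


def _demo_reconstruct(data: dict) -> str:
    """Stand-in for a CMP module's reconstruct() function."""
    return " ".join(data.get("paragraphs", []))


# Hand-built registry for the demo; in the service this would come from load_all().
demo_registry: list[RegistryEntry] = [
    (
        "demo_cmp",
        re.compile(r"https://consent\.example\.com/.+\.json$"),
        _demo_reconstruct,
    ),
]

hit = reconstruct_for_url(
    demo_registry,
    "https://consent.example.com/site-42.json",
    {"paragraphs": ["We use cookies.", "Opt out any time."]},
)
miss = reconstruct_for_url(demo_registry, "https://other.example.org/banner.js", {})
```

With first-match semantics the load order matters: hand-written modules are registered before auto-promoted ones, so a hand-written matcher would win if both match the same URL.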