5f2da1de88
cmp_discovery_log.py:
- sqlite log at /data/cmp_discoveries.db: every LLM-discovered CMP
pattern recorded with domain, strategy, value, sample text
- Auto-promote (user-chosen 'voll automatisch' mode): when LLM returns
strategy=url AND extracted text >= 800 words, write a new module
/data/auto_cmp/auto_<slug>.py with derived regex matcher + reconstruct
- record_discovery() called from dsi_discovery._try_llm_cascade on success
cmp_library/_registry.py:
- Loads both hand-written modules from services/cmp_library/ AND
auto-promoted modules from /data/auto_cmp/ (CMP_AUTO_DIR env)
- Auto modules use importlib.util.spec_from_file_location, no package
install needed; restart consent-tester to pick up new ones
dsi_discovery.py:
- _try_llm_cascade now calls record_discovery() on every successful
LLM analysis (cached AND fresh)
main.py:
- GET /cmp-discoveries — admin endpoint listing all logged discoveries
- DELETE /cmp-discoveries/{id} — rollback (unlinks auto_*.py)
This closes the self-improving loop: first encounter with a new CMP fires
the LLM (cost) → discovery is auto-promoted → all future runs against the
same vendor pattern hit Phase B (Named CMP) at <50ms with no LLM call.
87 lines
3.0 KiB
Python
87 lines
3.0 KiB
Python
"""
|
|
CMP Library Registry — discovers and registers all CMP modules.
|
|
|
|
Each CMP module exports two module-level symbols:
|
|
- MATCHER: a compiled regex matching the JSON endpoint URL
|
|
- reconstruct(data: dict) -> str: builds the cookie-policy text from JSON
|
|
|
|
The registry auto-discovers:
|
|
1. Hand-written modules: epaas, onetrust, cookiebot, usercentrics,
|
|
didomi, trustarc
|
|
2. Auto-promoted modules: any file matching `auto_*.py` in AUTO_DIR, i.e. $CMP_AUTO_DIR or /data/auto_cmp by default
|
|
(created by Phase E when an LLM successfully discovers a new pattern)
|
|
|
|
A consent-tester restart picks up new auto_*.py files automatically.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import importlib
|
|
import importlib.util
|
|
import logging
|
|
import os
|
|
import pkgutil
|
|
from pathlib import Path
|
|
from typing import Callable
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# (cmp_name, url_pattern, reconstruct_fn)
|
|
# Type alias for the list that load_all() returns and _register() appends to.
Registry = list[tuple[str, "object", Callable[[dict], str]]]
|
|
|
|
# Phase E: persistent auto-promoted modules live in a writable volume
|
|
# (separate from the source tree so deploys do not wipe them).
|
|
# Resolved once at import time from the CMP_AUTO_DIR environment variable;
# falls back to /data/auto_cmp when the variable is unset.
AUTO_DIR = Path(os.getenv("CMP_AUTO_DIR", "/data/auto_cmp"))
|
|
|
|
|
|
def load_all() -> Registry:
    """Build a fresh registry from the package modules and from AUTO_DIR.

    Returns:
        A new list of ``(name, matcher, reconstruct)`` entries. Modules from
        the package are registered first, followed by the modules found in
        AUTO_DIR.
    """
    entries: Registry = []
    # Order matters for the resulting list: package modules come first.
    for loader in (_load_from_package, _load_from_auto_dir):
        loader(entries)
    return entries
|
|
|
|
|
|
def _load_from_package(registry: Registry) -> None:
    """Import the hand-written modules in services/cmp_library/.

    Every non-underscore module found on the package's search path is
    imported and handed to ``_register``. A module that fails to import (or
    to register) is logged and skipped so one broken module cannot prevent
    the others from loading.

    Args:
        registry: List that successfully loaded modules are appended to.
    """
    import services.cmp_library as pkg  # type: ignore[import-not-found]

    # Use the package's own search path instead of deriving a directory from
    # pkg.__file__, which may be None (e.g. namespace-style layouts) and
    # would make Path(...) raise before any module is loaded.
    for module_info in pkgutil.iter_modules(pkg.__path__):
        name = module_info.name
        # Underscore-prefixed modules (such as this registry) are private
        # helpers, not CMP implementations.
        if name.startswith("_"):
            continue
        try:
            module = importlib.import_module(f"services.cmp_library.{name}")
            _register(registry, name, module)
        except Exception as e:
            # Deliberately broad: loading is best-effort per module.
            logger.warning("CMP module %s failed to load: %s", name, e)
|
|
|
|
|
|
def _load_from_auto_dir(registry: Registry) -> None:
    """Import auto-promoted modules from the runtime volume.

    Each ``auto_*.py`` file in AUTO_DIR is executed as a standalone module
    (named after the file stem) and handed to ``_register``. Files are
    processed in sorted path order. A file that cannot be loaded is logged
    and skipped; it never aborts the remaining files.

    Args:
        registry: List that successfully loaded modules are appended to.
    """
    # NOTE(security): every matching file in AUTO_DIR is executed as Python
    # code. The directory must only be writable by trusted components.
    if not AUTO_DIR.is_dir():
        # Missing directory (or a non-directory path) means nothing to load.
        return
    for path in sorted(AUTO_DIR.glob("auto_*.py")):
        name = path.stem
        try:
            spec = importlib.util.spec_from_file_location(name, path)
            if not spec or not spec.loader:
                # Report the skip instead of dropping the file silently.
                logger.warning(
                    "Auto CMP module %s has no import spec or loader — skipped",
                    name,
                )
                continue
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            _register(registry, name, module)
        except Exception as e:
            # Deliberately broad: generated modules may fail in arbitrary
            # ways and loading is best-effort per file.
            logger.warning("Auto CMP module %s failed to load: %s", name, e)
|
|
|
|
|
|
def _register(registry: Registry, name: str, module) -> None:
    """Append ``module`` to ``registry`` when it exposes the CMP interface.

    A module qualifies when it has a non-None ``MATCHER`` attribute and a
    callable ``reconstruct`` attribute. Qualifying modules are appended as
    ``(name, MATCHER, reconstruct)`` and logged at info level; anything else
    is logged as a warning and left out.

    Args:
        registry: List to append the entry to.
        name: Name stored in the entry and used in log messages.
        module: Imported module object to inspect.
    """
    url_pattern = getattr(module, "MATCHER", None)
    rebuild = getattr(module, "reconstruct", None)

    if url_pattern is not None and callable(rebuild):
        registry.append((name, url_pattern, rebuild))
        logger.info("CMP loaded: %s", name)
        return

    logger.warning(
        "CMP module %s missing MATCHER or reconstruct() — skipped", name,
    )
|