feat(control-pipeline): incremental dedup + ENISA CRA ingestion
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 43s
CI / test-python-voice (push) Successful in 33s
CI / test-bqas (push) Successful in 37s

BatchDedup since-Parameter (services/batch_dedup_runner.py + api):
- Neuer 'since: datetime' Param scoped Phase 1 + Phase 2 SQL auf created_at >= since.
- Phase 2 checkpoint wird beim scoped Lauf geloescht (verhindert Skip neuer Atomics
  deren control_id alphabetisch unter dem stale last_id liegt).
- 6-13x schneller fuer nachgeschobene Dokumente (19k statt 172k Atomics).
- Doku: control-pipeline/docs/incremental-dedup.md.

Neue Scripts:
- gpre1_object_groups_incremental.py: Append neuer Objects an object_groups via
  bge-m3 nearest-neighbor (threshold default 0.85, empfehlbar 0.78 fuer breiteres
  Synonym-Matching). Pure INSERT/UPDATE, kein DELETE.
- gpre2_master_controls_incremental.py: Non-destructive Master-Controls-Update.
  Existing MCs unangetastet (UUIDs + master_control_id bleiben), nur neue Members
  appended + neue MCs fuer Object-Groups die jetzt min-phases erreichen.
- ingest_enisa_cra.py: Ingestion der 8 CRA-relevanten ENISA-Dokumente
  (Standards Mapping, EUCC-Implementation, NIS2 TIG, SRP FAQ, EUCC Eval Methodology,
  CVD Policies, Threat Landscape 2025). chunk_strategy=legal,
  requirement_strength=guidance|consultation_draft|evidentiary.

Quelldaten: legal-sources/enisa/enisa_cra_single_reporting_platform_faq.html
(PDFs sind .gitignore-gefiltert).

Ergebnis dieser Pipeline-Iteration:
- 1.296 neue CRA-Controls + 19.652 atomare Children
- +362 neue Master-Controls, 10.017 existing erweitert
- Total: 13.950 MCs, 620 CRA-MCs (vorher 566), 1.304 CRA-Atomics (vorher 841)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-18 18:21:46 +02:00
parent 47d7beeb52
commit 9783657da3
7 changed files with 1895 additions and 15 deletions
@@ -1553,6 +1553,7 @@ async def get_repair_backfill_status(backfill_id: str):
class BatchDedupRequest(BaseModel):
dry_run: bool = True
hint_filter: Optional[str] = None # Only process groups matching this hint prefix
since: Optional[str] = None # ISO datetime — scope to controls created at/after this
_batch_dedup_status: dict = {}
@@ -1567,7 +1568,15 @@ async def _run_batch_dedup(req: BatchDedupRequest, dedup_id: str):
runner = BatchDedupRunner(db)
_batch_dedup_status[dedup_id] = {"status": "running", "phase": "starting"}
stats = await runner.run(dry_run=req.dry_run, hint_filter=req.hint_filter)
since_dt = None
if req.since:
from datetime import datetime
since_dt = datetime.fromisoformat(req.since.replace("Z", "+00:00"))
stats = await runner.run(
dry_run=req.dry_run,
hint_filter=req.hint_filter,
since=since_dt,
)
_batch_dedup_status[dedup_id] = {
"status": "completed",
+101
View File
@@ -0,0 +1,101 @@
# Incremental BatchDedup für nachgeschobene Dokumente
Eingefuehrt am 2026-05-18. Pattern fuer alle zukuenftigen Einzeldokument-Ingestionen.
## Problem
Der Default-BatchDedup-Runner lief gegen ALLE `pass0b` Atomics ohne Filter
(WHERE decomposition_method = 'pass0b' AND release_state NOT IN ('deprecated','duplicate')).
Das sind bei uns ~172k Controls. Pace ~5k/h → 25-40h Laufzeit. Bei jedem
hinzugefuegten Dokument der gleiche volle Lauf — auch wenn das neue Dokument
nur 1-2k Atomics erzeugt.
Zusaetzliches Risiko: Phase 1 schreibt master_controls erst am Ende. Ein
Container-Crash mitten im Lauf (z.B. via Qdrant-Timeout) verwirft 100%
des In-Memory-Fortschritts.
## Loesung — `since` Parameter
`POST /v1/canonical/generate/batch-dedup` akzeptiert jetzt:
```json
{
"dry_run": false,
"since": "2026-05-18T02:53:00+00:00"
}
```
Effekt:
- Phase 1 (intra-group dedup) laedt nur Controls mit `created_at >= since`
- Phase 2 (cross-group dedup) filtert ebenfalls auf `created_at >= since`
- Phase 2 Checkpoint wird vor Lauf-Start geloescht (sonst skippt stale
`last_control_id` neu erzeugte Atomics deren control_id alphabetisch
davor liegt)
Phase 2 sucht weiter im **vollen** Qdrant-Index `atomic_controls_dedup`,
findet also Matches zu alten Master Controls und verlinkt korrekt.
## Wann verwenden
| Szenario | Empfehlung |
|---|---|
| Einzelnes neues Dokument ingestiert + Pass 0a + Pass 0b durchgelaufen | `since` setzen auf Zeitpunkt vor Pass 0b |
| Mehrere kleine Updates seit letztem Full-Dedup | `since` setzen auf Zeitpunkt nach letztem Full-Dedup |
| Initial-Setup oder Pipeline-Major-Update | KEIN `since` — full run |
| Verdacht auf Drift / Quality-Regression | KEIN `since` — full run |
## Workflow nach Einzeldokument-Ingestion
```bash
# 1. Pass 0a auf neue Controls (Obligations extrahieren)
curl -X POST .../v1/canonical/generate/run-pass0a -d '{...}'
# 2. Pass 0b Decomposition Submit (Atomics erzeugen)
curl -X POST .../v1/canonical/generate/submit-pass0b -d '{...}'
# 3. Wenn Anthropic Batch durch: process-batch
curl -X POST .../v1/canonical/generate/process-batch -d '{
"batch_id": "msgbatch_...",
"pass_type": "0b"
}'
# 4. Inkrementell deduppen (NEU, statt 25h full run)
curl -X POST .../v1/canonical/generate/batch-dedup -d '{
"dry_run": false,
"since": "<ISO-Datetime kurz vor Pass-0b-Start>"
}'
```
## Pace-Beobachtung (CRA-Lauf 2026-05-18)
- Total neue Atomics: 19.423
- Phase 1 multi-groups: 568 (Rest 18.101 sind Singletons → direkt Master)
- Phase 2 Cross-Group: ~3-4h erwartet
- Vergleich: Full-Run waere 25-40h gewesen, scoped 6-13x schneller.
## Implementation-Details (fuer Wartung)
Geaenderte Dateien:
- `services/batch_dedup_runner.py``run()` + `_load_merge_groups()` +
`_run_cross_group_pass()` SQL-Queries
- `api/control_generator_routes.py``BatchDedupRequest.since` Feld +
Handler reicht durch
Backwards-kompatibel: ohne `since` aequivalent zum alten Verhalten.
## Bekannte Limits
1. **Phase 2 Checkpoint wird beim scoped Lauf geloescht.** Wenn waehrend
eines `since`-Laufs ein voller Run dazwischen geschoben werden soll
(sollte nicht passieren), muss neu starten.
2. **Phase 1 commit-Granularitaet nicht angefasst.** Bei Crash mitten in
Phase 1 ohne `since` bleibt der Verlust gleich. Aber: scoped Phase 1
ist so kurz (Minuten), dass das praktisch egal ist.
3. **Singleton-Atomics werden direkt Master ohne Cross-Check.** Wenn ein
neues Singleton-Atomic semantisch identisch zu einem alten Master
ist, faengt das nur Phase 2 (via Qdrant). Funktioniert solange Phase 2
nicht uebersprungen wird (dry_run=false ist Pflicht).
## Memory-Eintrag
Siehe `~/.claude/projects/-Users-benjaminadmin-Projekte-breakpilot-core/memory/feedback_incremental_dedup.md`
@@ -0,0 +1,203 @@
#!/usr/bin/env python3
"""
G-pre1 INCREMENTAL: Append new objects to object_groups via embedding similarity.
Non-destructive alternative to gpre1_object_clustering.py (which DELETEs and
rebuilds all groups via K-Means). This script:
- Finds objects referenced in atomic controls that are NOT yet in
object_groups.members
- Embeds each unmatched object via bge-m3 (local embedding-service)
- Nearest-neighbor search against existing object_groups.canonical_name
- Cosine >= --threshold (default 0.85) APPEND to existing group's members
- Cosine < --threshold CREATE new object_group with next free group_id
Existing groups stay; only members get appended and new groups get added.
Usage (inside control-pipeline container):
python3 /app/scripts/gpre1_object_groups_incremental.py --since 2026-05-18T02:53:00+00:00 --dry-run
python3 /app/scripts/gpre1_object_groups_incremental.py --since 2026-05-18T02:53:00+00:00
python3 /app/scripts/gpre1_object_groups_incremental.py --since 2026-05-18T02:53:00+00:00 --threshold 0.82
"""
import argparse
import json
import logging
import os
from datetime import datetime
import httpx
import numpy as np
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("gpre1_inc")
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db")
EMBEDDING_URL = os.getenv("EMBEDDING_URL", "http://embedding-service:8087")
BATCH_SIZE = 64
def embed_batch(texts: list[str]) -> np.ndarray:
"""Embed a list of strings via bge-m3 embedding-service."""
with httpx.Client(timeout=120.0) as c:
resp = c.post(f"{EMBEDDING_URL}/embed", json={"texts": texts, "normalize": True})
resp.raise_for_status()
return np.array(resp.json()["embeddings"], dtype=np.float32)
def embed_many(texts: list[str], label: str = "") -> np.ndarray:
"""Embed many strings in batches."""
n = len(texts)
out = np.zeros((n, 1024), dtype=np.float32)
for i in range(0, n, BATCH_SIZE):
batch = texts[i:i + BATCH_SIZE]
out[i:i + len(batch)] = embed_batch(batch)
if (i // BATCH_SIZE) % 20 == 0:
logger.info(" %s: %d/%d embedded", label, i + len(batch), n)
return out
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--since", required=True, help="ISO datetime — consider atomics from this date onwards")
parser.add_argument("--threshold", type=float, default=0.85,
help="Cosine threshold for appending to existing group (default 0.85)")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
since_dt = datetime.fromisoformat(args.since.replace("Z", "+00:00"))
logger.info("Incremental object_groups update since %s, threshold=%.2f, dry_run=%s",
since_dt.isoformat(), args.threshold, args.dry_run)
engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})
# 1. Load existing object_groups (id, canonical_name, members)
with engine.connect() as c:
rows = c.execute(text("""
SELECT group_id, canonical_name, members FROM object_groups
""")).fetchall()
existing_groups = [(r[0], r[1], json.loads(r[2]) if isinstance(r[2], str) else r[2]) for r in rows]
logger.info("Loaded %d existing object_groups", len(existing_groups))
existing_members: set[str] = set()
for _, _, members in existing_groups:
for m in members:
existing_members.add(m)
logger.info("Existing union of members: %d distinct strings", len(existing_members))
# 2. Find unmatched objects from atomics since `since`
from services.control_dedup import normalize_object
with engine.connect() as c:
rows = c.execute(text("""
SELECT DISTINCT split_part(generation_metadata->>'merge_group_hint', ':', 2) AS obj
FROM canonical_controls
WHERE decomposition_method = 'pass0b'
AND created_at >= :since
AND generation_metadata->>'merge_group_hint' IS NOT NULL
AND generation_metadata->>'merge_group_hint' != ''
AND release_state NOT IN ('deprecated', 'rejected', 'duplicate')
"""), {"since": since_dt}).fetchall()
new_objects_raw = [r[0] for r in rows if r[0]]
logger.info("Distinct objects in new atomics: %d", len(new_objects_raw))
# Normalize each + dedupe; track originals → normalized
normed_to_originals: dict[str, set[str]] = {}
for obj in new_objects_raw:
normed = normalize_object(obj)
if not normed:
continue
if normed in existing_members or obj in existing_members:
continue # already in some group
normed_to_originals.setdefault(normed, set()).update([normed, obj])
unmatched_normed = list(normed_to_originals.keys())
logger.info("Unmatched normalized objects: %d", len(unmatched_normed))
if not unmatched_normed:
logger.info("Nothing to do — all objects already mapped.")
return
# 3. Embed existing canonical_names + unmatched objects
logger.info("Embedding %d existing canonical_names...", len(existing_groups))
existing_emb = embed_many([g[1] for g in existing_groups], label="existing")
logger.info("Embedding %d unmatched objects...", len(unmatched_normed))
unmatched_emb = embed_many(unmatched_normed, label="unmatched")
# 4. Nearest-neighbor: for each unmatched, find best existing match
# cosine = dot product (both already L2-normalized)
logger.info("Computing nearest-neighbor matches...")
sims = unmatched_emb @ existing_emb.T # (N_unmatched, N_existing)
best_idx = sims.argmax(axis=1)
best_score = sims.max(axis=1)
appends: dict[int, list[str]] = {} # group_id → list of new members
new_groups: list[tuple[str, list[str]]] = [] # (canonical_name, members)
for i, normed in enumerate(unmatched_normed):
originals = sorted(normed_to_originals[normed])
if best_score[i] >= args.threshold:
gid = existing_groups[int(best_idx[i])][0]
appends.setdefault(gid, []).extend(originals)
else:
# Create a new group with this object as canonical
new_groups.append((normed, originals))
# Stats
distinct_groups_to_extend = len(appends)
total_appends = sum(len(v) for v in appends.values())
logger.info("Plan: extend %d existing groups (+%d members), create %d new groups",
distinct_groups_to_extend, total_appends, len(new_groups))
if args.dry_run:
logger.info("DRY RUN — no writes")
# Sample
if appends:
sample = list(appends.items())[:5]
for gid, members in sample:
gname = next((g[1] for g in existing_groups if g[0] == gid), "?")
logger.info(" Extend group_id=%d (%s) with: %s", gid, gname, members[:3])
if new_groups:
for name, members in new_groups[:5]:
logger.info(" NEW group: %s — members=%s", name, members[:3])
return
# 5. Write — pure INSERT/UPDATE
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
# UPDATE existing groups (append to members JSONB)
for gid, new_members in appends.items():
c.execute(text("""
UPDATE object_groups
SET members = (
SELECT jsonb_agg(DISTINCT m)
FROM jsonb_array_elements_text(members || CAST(:new_members AS jsonb)) AS x(m)
),
member_count = (
SELECT count(DISTINCT m)
FROM jsonb_array_elements_text(members || CAST(:new_members AS jsonb)) AS x(m)
)
WHERE group_id = :gid
"""), {"gid": gid, "new_members": json.dumps(new_members)})
# INSERT new groups with next free group_id
next_gid_row = c.execute(text("SELECT COALESCE(MAX(group_id), 0) + 1 FROM object_groups")).fetchone()
next_gid = next_gid_row[0] if next_gid_row else 1
for name, members in new_groups:
c.execute(text("""
INSERT INTO object_groups (group_id, canonical_name, member_count, members, top_controls_count)
VALUES (:gid, :name, :count, CAST(:members AS jsonb), 0)
"""), {
"gid": next_gid,
"name": name[:200],
"count": len(members),
"members": json.dumps(members),
})
next_gid += 1
logger.info("DONE — extended %d existing groups (+%d members), created %d new groups",
distinct_groups_to_extend, total_appends, len(new_groups))
if __name__ == "__main__":
main()
@@ -0,0 +1,267 @@
#!/usr/bin/env python3
"""
G-pre2 INCREMENTAL: Add new atomic controls to Master Controls without rebuild.
Unlike gpre2_master_controls.py which DELETEs and rebuilds the entire
master_controls table, this script is non-destructive:
- Existing master_controls stay untouched (same UUIDs, same MC-IDs)
- For each object_group that gained new atomic controls:
* If MC exists: append new members + update total_controls/phase_counts
* If MC missing AND group now has >= min_phases: create new MC + all members
Usage:
python3 /app/scripts/gpre2_master_controls_incremental.py --since 2026-05-18T02:53:00+00:00
python3 /app/scripts/gpre2_master_controls_incremental.py --since 2026-05-18T02:53:00+00:00 --dry-run
python3 /app/scripts/gpre2_master_controls_incremental.py --since 2026-05-18T02:53:00+00:00 --min-phases 2
"""
import argparse
import json
import logging
import os
from collections import defaultdict
from datetime import datetime
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("gpre2_incremental")
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--since", required=True, help="ISO datetime — only consider atomics created at/after this")
parser.add_argument("--min-phases", type=int, default=2, help="Min distinct phases to form a new MC (default 2)")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
since_dt = datetime.fromisoformat(args.since.replace("Z", "+00:00"))
logger.info("Incremental run since %s, min_phases=%d, dry_run=%s",
since_dt.isoformat(), args.min_phases, args.dry_run)
engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})
# Step 1: object → group_id reverse index
object_to_group = {}
with engine.connect() as c:
groups = c.execute(text("SELECT group_id, canonical_name, members FROM object_groups")).fetchall()
for gid, canonical, members_json in groups:
members = json.loads(members_json) if isinstance(members_json, str) else members_json
for member in members:
object_to_group[member] = (gid, canonical)
logger.info("Reverse index: %d objects → %d groups", len(object_to_group), len(groups))
# Step 2: Load ALL atomics with merge_group_hint (we need full picture)
with engine.connect() as c:
all_rows = c.execute(text("""
SELECT id, control_id,
generation_metadata->>'merge_group_hint' AS hint,
title,
created_at
FROM canonical_controls
WHERE generation_metadata->>'merge_group_hint' IS NOT NULL
AND generation_metadata->>'merge_group_hint' != ''
AND release_state NOT IN ('deprecated', 'rejected', 'duplicate')
""")).fetchall()
logger.info("Loaded %d atomic controls total", len(all_rows))
# Step 3: Build group_phases (gid → phase → [(uuid, control_id, action, title, is_new)])
from services.control_dedup import normalize_object
group_phases: dict[int, dict[str, list]] = defaultdict(lambda: defaultdict(list))
group_names: dict[int, str] = {}
new_atomic_count = 0
new_groups_touched: set[int] = set()
unmatched = 0
for uuid, control_id, hint, title, created_at in all_rows:
parts = hint.split(":", 2)
if len(parts) < 2:
continue
action = parts[0]
obj = parts[1]
phase = parts[2] if len(parts) > 2 else "implementation"
normed = normalize_object(obj)
if normed in object_to_group:
gid, canonical = object_to_group[normed]
elif obj in object_to_group:
gid, canonical = object_to_group[obj]
else:
unmatched += 1
continue
is_new = created_at >= since_dt
group_phases[gid][phase].append((str(uuid), control_id, action, title, is_new))
group_names[gid] = canonical
if is_new:
new_atomic_count += 1
new_groups_touched.add(gid)
logger.info("Total: %d new atomics across %d object_groups (%d unmatched)",
new_atomic_count, len(new_groups_touched), unmatched)
if not new_groups_touched:
logger.info("Nothing to do — no new atomics matched to any object_group.")
return
# Step 4: For each touched object_group, decide action
stats = {
"groups_examined": len(new_groups_touched),
"mcs_existing_updated": 0,
"mcs_new_created": 0,
"members_inserted": 0,
"members_skipped_existing": 0,
"groups_skipped_below_min_phases": 0,
"groups_skipped_no_member_change": 0,
}
# Load existing master_controls index: master_control_id → uuid
with engine.connect() as c:
mc_index = {row[1]: (str(row[0]), row[2]) for row in c.execute(text(
"SELECT id, master_control_id, total_controls FROM master_controls"
)).fetchall()}
logger.info("Existing master_controls: %d", len(mc_index))
# Load existing members for touched MCs (avoid duplicate inserts)
touched_mc_ids = ["MC-%d" % gid for gid in new_groups_touched]
existing_members: dict[str, set[str]] = defaultdict(set)
with engine.connect() as c:
for mc_id_str in touched_mc_ids:
mc_uuid_info = mc_index.get(mc_id_str)
if not mc_uuid_info:
continue
mc_uuid = mc_uuid_info[0]
for row in c.execute(text(
"SELECT control_uuid FROM master_control_members WHERE master_control_uuid = CAST(:u AS uuid)"
), {"u": mc_uuid}).fetchall():
existing_members[mc_id_str].add(str(row[0]))
# Build INSERT/UPDATE plans
inserts_new_mcs = []
inserts_members = []
updates_mcs = []
PHASE_ORDER = {
"scope": 0, "definition": 1, "governance": 1, "design": 2,
"implementation": 3, "configuration": 3, "operation": 4, "training": 4,
"monitoring": 5, "testing": 6, "review": 7, "assessment": 8,
"remediation": 8, "validation": 9, "reporting": 10, "evidence": 11,
}
for gid in new_groups_touched:
mc_id_str = "MC-%d" % gid
phases = group_phases[gid]
canonical = group_names[gid]
all_phases = sorted(phases.keys(), key=lambda p: PHASE_ORDER.get(p, 99))
phase_counts = {p: len(ctrls) for p, ctrls in phases.items()}
total = sum(phase_counts.values())
existing_mc = mc_index.get(mc_id_str)
if existing_mc:
# MC exists — append only NEW atomics that aren't already members
mc_uuid = existing_mc[0]
existing_set = existing_members[mc_id_str]
added_for_this_mc = 0
for phase, controls in phases.items():
for ctrl_uuid, ctrl_id, action, title, is_new in controls:
if ctrl_uuid in existing_set:
stats["members_skipped_existing"] += 1
continue
inserts_members.append({
"mc_uuid": mc_uuid, "control_uuid": ctrl_uuid,
"phase": phase, "action": action,
})
stats["members_inserted"] += 1
added_for_this_mc += 1
if added_for_this_mc > 0:
updates_mcs.append({
"mc_uuid": mc_uuid,
"phases_covered": json.dumps(all_phases),
"phase_control_count": json.dumps(phase_counts),
"total_controls": total,
})
stats["mcs_existing_updated"] += 1
else:
stats["groups_skipped_no_member_change"] += 1
else:
# MC missing — create only if group now meets min_phases threshold
if len(phases) < args.min_phases:
stats["groups_skipped_below_min_phases"] += 1
continue
inserts_new_mcs.append({
"master_control_id": mc_id_str,
"object_group_id": gid,
"canonical_name": canonical,
"phases_covered": json.dumps(all_phases),
"phase_control_count": json.dumps(phase_counts),
"total_controls": total,
"_members": [
{"control_uuid": c[0], "phase": p, "action": c[2]}
for p, ctrls in phases.items() for c in ctrls
],
})
stats["mcs_new_created"] += 1
logger.info("Plan summary: %s", stats)
if args.dry_run:
logger.info("DRY RUN — no writes")
# Show first few examples
if inserts_new_mcs:
logger.info("Sample NEW MCs (up to 5):")
for mc in inserts_new_mcs[:5]:
logger.info(" %s: %s — total=%d, phases=%s",
mc["master_control_id"], mc["canonical_name"],
mc["total_controls"], mc["phases_covered"])
if updates_mcs:
logger.info("Updates to existing MCs: %d", len(updates_mcs))
return
# Step 5: WRITE — strictly INSERT/UPDATE, no DELETE
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
# 5a: Insert new MCs + their members
for mc in inserts_new_mcs:
new_uuid_row = c.execute(text("""
INSERT INTO master_controls
(master_control_id, object_group_id, canonical_name,
phases_covered, phase_control_count, total_controls)
VALUES (:master_control_id, :object_group_id, :canonical_name,
CAST(:phases_covered AS jsonb), CAST(:phase_control_count AS jsonb),
:total_controls)
RETURNING id
"""), {k: v for k, v in mc.items() if k != "_members"}).fetchone()
new_mc_uuid = str(new_uuid_row[0])
for mem in mc["_members"]:
c.execute(text("""
INSERT INTO master_control_members
(master_control_uuid, control_uuid, phase, action)
VALUES (CAST(:mc_uuid AS uuid), CAST(:control_uuid AS uuid), :phase, :action)
"""), {"mc_uuid": new_mc_uuid, **mem})
# 5b: Append new members to existing MCs
for mem in inserts_members:
c.execute(text("""
INSERT INTO master_control_members
(master_control_uuid, control_uuid, phase, action)
VALUES (CAST(:mc_uuid AS uuid), CAST(:control_uuid AS uuid), :phase, :action)
"""), mem)
# 5c: Update phase counts / totals on touched existing MCs
for upd in updates_mcs:
c.execute(text("""
UPDATE master_controls
SET phases_covered = CAST(:phases_covered AS jsonb),
phase_control_count = CAST(:phase_control_count AS jsonb),
total_controls = :total_controls
WHERE id = CAST(:mc_uuid AS uuid)
"""), upd)
logger.info("DONE — wrote %d new MCs, updated %d existing MCs, %d members inserted",
stats["mcs_new_created"], stats["mcs_existing_updated"], stats["members_inserted"])
if __name__ == "__main__":
main()
@@ -0,0 +1,414 @@
#!/usr/bin/env python3
"""Ingest CRA-relevant ENISA documents into the RAG (collection `bp_compliance_ce`).
Source files live under `legal-sources/enisa/` in this repo. The script extracts
PDF text with pdfplumber (HTML for the SRP FAQ), normalizes it, and uploads via
the RAG service with `chunk_strategy='legal'` so that section metadata is
attached to every chunk.
Each document carries a `requirement_strength` field so downstream consumers
can distinguish normative material from guidance and consultation drafts:
- mandatory binding (none in this batch; CRA itself is the law)
- guidance official ENISA / EUCC guidance, citable
- consultation_draft public-consultation drafts (use with caveat)
Usage (run on Mac Mini after copying the legal-sources/enisa/ folder, or via SSH
with the repo mounted):
python3 control-pipeline/scripts/ingest_enisa_cra.py --dry-run
python3 control-pipeline/scripts/ingest_enisa_cra.py
"""
import argparse
import json
import re
import sys
import time
import unicodedata
from html.parser import HTMLParser
from pathlib import Path
import httpx
import pdfplumber
RAG_URL = "https://localhost:8097"
QDRANT_URL = "http://localhost:6333"
UPLOAD_TIMEOUT = 1800.0
COLLECTION = "bp_compliance_ce"
REPO_ROOT = Path(__file__).resolve().parents[2]
SOURCE_DIR = REPO_ROOT / "legal-sources" / "enisa"
DOCS = [
{
"regulation_id": "enisa_cra_requirements_standards_mapping",
"filename": "enisa_cra_requirements_standards_mapping.pdf",
"upload_filename": "enisa_cra_requirements_standards_mapping.txt",
"extra_metadata": {
"regulation_id": "enisa_cra_requirements_standards_mapping",
"regulation_short": "ENISA CRA Standards Mapping",
"guideline_name": "Cyber Resilience Act Requirements Standards Mapping",
"doc_type": "standards_mapping",
"requirement_strength": "guidance",
"publication_year": "2024",
"license": "reuse_with_attribution",
"source": "enisa.europa.eu",
"attribution": "ENISA, CC BY 4.0",
},
},
{
"regulation_id": "enisa_cra_implementation_via_eucc",
"filename": "enisa_cra_implementation_via_eucc.pdf",
"upload_filename": "enisa_cra_implementation_via_eucc.txt",
"extra_metadata": {
"regulation_id": "enisa_cra_implementation_via_eucc",
"regulation_short": "ENISA CRA via EUCC",
"guideline_name": "CRA Implementation via EUCC and its Applicable Technical Elements",
"doc_type": "certification_guidance",
"requirement_strength": "guidance",
"license": "reuse_with_attribution",
"source": "enisa.europa.eu",
"attribution": "ENISA, CC BY 4.0",
},
},
{
"regulation_id": "enisa_cra_implementation_via_eucc_annex",
"filename": "enisa_cra_implementation_via_eucc_annex.pdf",
"upload_filename": "enisa_cra_implementation_via_eucc_annex.txt",
"extra_metadata": {
"regulation_id": "enisa_cra_implementation_via_eucc_annex",
"regulation_short": "ENISA CRA via EUCC (Annex)",
"guideline_name": "Annex — CRA Implementation via EUCC",
"doc_type": "certification_guidance_annex",
"requirement_strength": "guidance",
"license": "reuse_with_attribution",
"source": "enisa.europa.eu",
"attribution": "ENISA, CC BY 4.0",
},
},
{
"regulation_id": "enisa_eucc_vulnerability_management_disclosure",
"filename": "enisa_eucc_vulnerability_management_disclosure.pdf",
"upload_filename": "enisa_eucc_vulnerability_management_disclosure.txt",
"extra_metadata": {
"regulation_id": "enisa_eucc_vulnerability_management_disclosure",
"regulation_short": "EUCC Vuln Management & Disclosure",
"guideline_name": "EUCC Guidelines — Vulnerability Management and Disclosure v1.1",
"doc_type": "vulnerability_guidance",
"requirement_strength": "guidance",
"license": "reuse_with_attribution",
"source": "enisa.europa.eu",
"attribution": "ENISA, CC BY 4.0",
},
},
{
"regulation_id": "enisa_eccg_opinion_vulnerability_management",
"filename": "enisa_eccg_opinion_vulnerability_management.pdf",
"upload_filename": "enisa_eccg_opinion_vulnerability_management.txt",
"extra_metadata": {
"regulation_id": "enisa_eccg_opinion_vulnerability_management",
"regulation_short": "ECCG Opinion Vuln Management",
"guideline_name": "Final ECCG Opinion — Guidance on Vulnerability Management",
"doc_type": "eccg_opinion",
"requirement_strength": "guidance",
"license": "reuse_with_attribution",
"source": "enisa.europa.eu",
"attribution": "ENISA, CC BY 4.0",
},
},
{
"regulation_id": "enisa_nis2_technical_implementation_guidance",
"filename": "enisa_nis2_technical_implementation_guidance.pdf",
"upload_filename": "enisa_nis2_technical_implementation_guidance.txt",
"extra_metadata": {
"regulation_id": "enisa_nis2_technical_implementation_guidance",
"regulation_short": "ENISA NIS2 TIG v1.0",
"guideline_name": "ENISA Technical Implementation Guidance on Cybersecurity Risk Management Measures v1.0",
"doc_type": "technical_guidance",
"requirement_strength": "guidance",
"publication_year": "2025",
"license": "reuse_with_attribution",
"source": "enisa.europa.eu",
"attribution": "ENISA, CC BY 4.0",
},
},
{
"regulation_id": "enisa_nis2_security_measures_consultation",
"filename": "enisa_nis2_security_measures_implementation_guidance_consultation.pdf",
"upload_filename": "enisa_nis2_security_measures_consultation.txt",
"extra_metadata": {
"regulation_id": "enisa_nis2_security_measures_consultation",
"regulation_short": "ENISA NIS2 Security Measures (Draft)",
"guideline_name": "Implementation Guidance on Security Measures — Public Consultation Draft",
"doc_type": "consultation_draft",
"requirement_strength": "consultation_draft",
"license": "reuse_with_attribution",
"source": "enisa.europa.eu",
"attribution": "ENISA, CC BY 4.0",
},
},
{
"regulation_id": "enisa_cra_single_reporting_platform_faq",
"filename": "enisa_cra_single_reporting_platform_faq.html",
"upload_filename": "enisa_cra_single_reporting_platform_faq.txt",
"extra_metadata": {
"regulation_id": "enisa_cra_single_reporting_platform_faq",
"regulation_short": "ENISA SRP FAQ",
"guideline_name": "CRA Single Reporting Platform (SRP) FAQ",
"doc_type": "faq",
"requirement_strength": "guidance",
"license": "reuse_with_attribution",
"source": "enisa.europa.eu",
"attribution": "ENISA, CC BY 4.0",
},
},
{
"regulation_id": "enisa_eucc_evaluation_methodology_product_series",
"filename": "enisa_eucc_evaluation_methodology_product_series.pdf",
"upload_filename": "enisa_eucc_evaluation_methodology_product_series.txt",
"extra_metadata": {
"regulation_id": "enisa_eucc_evaluation_methodology_product_series",
"regulation_short": "EUCC Eval Methodology Product Series",
"guideline_name": "EUCC Guidelines — Evaluation Methodology for Product Series v1.0",
"doc_type": "evaluation_methodology",
"requirement_strength": "guidance",
"publication_year": "2025",
"license": "reuse_with_attribution",
"source": "enisa.europa.eu",
"attribution": "ENISA, CC BY 4.0",
},
},
{
"regulation_id": "enisa_threat_landscape_2025",
"filename": "enisa_threat_landscape_2025.pdf",
"upload_filename": "enisa_threat_landscape_2025.txt",
"extra_metadata": {
"regulation_id": "enisa_threat_landscape_2025",
"regulation_short": "ENISA Threat Landscape 2025",
"guideline_name": "ENISA Threat Landscape 2025 v1.2",
"doc_type": "threat_landscape",
"requirement_strength": "evidentiary",
"publication_year": "2025",
"license": "reuse_with_attribution",
"source": "enisa.europa.eu",
"attribution": "ENISA, CC BY 4.0",
},
},
{
"regulation_id": "enisa_cvd_policies_eu_2022",
"filename": "enisa_cvd_policies_eu_2022.pdf",
"upload_filename": "enisa_cvd_policies_eu_2022.txt",
"extra_metadata": {
"regulation_id": "enisa_cvd_policies_eu_2022",
"regulation_short": "ENISA CVD Policies EU 2022",
"guideline_name": "Coordinated Vulnerability Disclosure Policies in the EU (2022)",
"doc_type": "policy_study",
"requirement_strength": "guidance",
"publication_year": "2022",
"license": "reuse_with_attribution",
"source": "enisa.europa.eu",
"attribution": "ENISA, CC BY 4.0",
},
},
]
def normalize_text(text: str) -> str:
text = unicodedata.normalize("NFKC", text)
text = text.replace("­", "").replace("", "")
prev = None
while prev != text:
prev = text
text = re.sub(r"(\d+)\s+\.\s+(\d+)", r"\1.\2", text)
text = re.sub(r"\b([A-Z]{2,4})\s+-\s+(\d+)\b", r"\1-\2", text)
text = re.sub(r"\(\s+(\d+)\s+\)", r"(\1)", text)
text = re.sub(r"[^\S\n]{2,}", " ", text)
return text
class _HTMLToText(HTMLParser):
SKIP = {"script", "style", "nav", "header", "footer", "noscript"}
BLOCK = {"p", "div", "li", "br", "h1", "h2", "h3", "h4", "h5", "h6", "tr", "section"}
def __init__(self) -> None:
super().__init__()
self._buf: list[str] = []
self._skip_depth = 0
def handle_starttag(self, tag, attrs):
if tag in self.SKIP:
self._skip_depth += 1
if tag in self.BLOCK:
self._buf.append("\n")
def handle_endtag(self, tag):
if tag in self.SKIP and self._skip_depth > 0:
self._skip_depth -= 1
if tag in self.BLOCK:
self._buf.append("\n")
def handle_data(self, data):
if self._skip_depth == 0:
self._buf.append(data)
def text(self) -> str:
raw = "".join(self._buf)
raw = re.sub(r"\n{3,}", "\n\n", raw)
return raw.strip()
def extract_pdf(path: Path) -> str:
print(f" Extracting PDF: {path.name}")
parts: list[str] = []
with pdfplumber.open(path) as pdf:
for i, page in enumerate(pdf.pages):
t = page.extract_text(x_tolerance=3, y_tolerance=4)
if t:
parts.append(t)
if (i + 1) % 50 == 0:
print(f" {i + 1}/{len(pdf.pages)} pages...")
return normalize_text("\n\n".join(parts))
def extract_html(path: Path) -> str:
print(f" Extracting HTML: {path.name}")
html = path.read_text(encoding="utf-8", errors="replace")
parser = _HTMLToText()
parser.feed(html)
return normalize_text(parser.text())
def get_text(doc) -> str:
path = SOURCE_DIR / doc["filename"]
if not path.exists():
raise FileNotFoundError(path)
if path.suffix.lower() == ".pdf":
text = extract_pdf(path)
elif path.suffix.lower() in {".html", ".htm"}:
text = extract_html(path)
else:
raise ValueError(f"Unsupported file type: {path.suffix}")
print(f" Extracted {len(text):,} chars")
return text
def upload_text_legal(text: str, filename: str, extra_metadata: dict) -> dict:
form_data = {
"collection": COLLECTION,
"data_type": "compliance",
"bundesland": "bund",
"use_case": "compliance",
"year": "2026",
"chunk_strategy": "legal",
"chunk_size": "1500",
"chunk_overlap": "100",
"metadata_json": json.dumps(extra_metadata, ensure_ascii=False),
}
with httpx.Client(timeout=UPLOAD_TIMEOUT, verify=False) as c:
resp = c.post(
f"{RAG_URL}/api/v1/documents/upload",
files={"file": (filename, text.encode("utf-8"), "text/plain")},
data=form_data,
)
resp.raise_for_status()
return resp.json()
def count_chunks(regulation_id: str) -> int:
with httpx.Client(timeout=30) as c:
resp = c.post(
f"{QDRANT_URL}/collections/{COLLECTION}/points/count",
json={
"filter": {
"must": [
{"key": "regulation_id", "match": {"value": regulation_id}}
]
},
"exact": True,
},
)
resp.raise_for_status()
return resp.json()["result"]["count"]
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true",
help="Extract text and report sizes, but do not upload.")
parser.add_argument("--only", action="append", default=[],
help="Limit run to one or more regulation_ids.")
args = parser.parse_args()
if not SOURCE_DIR.exists():
print(f"ERROR: source dir not found: {SOURCE_DIR}")
return 2
docs = DOCS
if args.only:
wanted = set(args.only)
docs = [d for d in DOCS if d["regulation_id"] in wanted]
missing = wanted - {d["regulation_id"] for d in docs}
if missing:
print(f"ERROR: unknown regulation_id(s): {sorted(missing)}")
return 2
print("=" * 70)
print(f"ENISA CRA ingestion → collection={COLLECTION}")
print(f"Source dir: {SOURCE_DIR}")
print(f"Documents: {len(docs)} Dry run: {args.dry_run}")
print("=" * 70)
results = []
for i, doc in enumerate(docs, 1):
reg_id = doc["regulation_id"]
print(f"\n[{i}/{len(docs)}] {reg_id}")
existing = count_chunks(reg_id) if not args.dry_run else "?"
print(f" Existing chunks in Qdrant: {existing}")
try:
text = get_text(doc)
except Exception as e:
print(f" ERROR extracting text: {e}")
results.append({"id": reg_id, "chars": 0, "new": 0,
"strength": doc["extra_metadata"]["requirement_strength"]})
continue
if args.dry_run:
results.append({"id": reg_id, "chars": len(text), "new": "?",
"strength": doc["extra_metadata"]["requirement_strength"]})
continue
if existing and existing > 0:
print(f" SKIP — {existing} chunks already present. "
f"Use Qdrant delete-by-filter before re-ingesting.")
results.append({"id": reg_id, "chars": len(text), "new": 0,
"strength": doc["extra_metadata"]["requirement_strength"]})
continue
print(" Uploading with chunk_strategy='legal'...")
result = upload_text_legal(
text, doc["upload_filename"], doc["extra_metadata"]
)
new_chunks = result.get("chunks_count", 0)
new_doc_id = result.get("document_id", "")
print(f" -> {new_chunks} chunks (doc_id={new_doc_id})")
results.append({"id": reg_id, "chars": len(text), "new": new_chunks,
"strength": doc["extra_metadata"]["requirement_strength"]})
if i < len(docs):
time.sleep(2)
print("\n" + "=" * 70)
print("SUMMARY")
print("=" * 70)
for r in results:
print(f" {r['id']:<55} chars={r['chars']:<9} new={r['new']:<5} "
f"strength={r['strength']}")
total_new = sum(r["new"] for r in results if isinstance(r["new"], int))
print(f"\nTotal new chunks: {total_new}")
return 0
if __name__ == "__main__":
sys.exit(main())
+40 -14
View File
@@ -22,6 +22,7 @@ import json
import logging
import time
from collections import defaultdict
from datetime import datetime
from sqlalchemy import text
@@ -108,24 +109,37 @@ class BatchDedupRunner:
self._progress_phase = ""
self._progress_count = 0
self._progress_total = 0
self._since = None # set by run() when scoped run requested
async def run(
self,
dry_run: bool = False,
hint_filter: str = None,
since: datetime = None,
) -> dict:
"""Run the full batch dedup pipeline.
Args:
dry_run: If True, compute stats but don't modify DB/Qdrant.
hint_filter: If set, only process groups matching this hint prefix.
since: If set, only process controls with created_at >= since.
Useful for incremental dedup after single-document ingestion.
Returns:
Stats dict with counts.
"""
start = time.monotonic()
logger.info("BatchDedup starting (dry_run=%s, hint_filter=%s)",
dry_run, hint_filter)
logger.info("BatchDedup starting (dry_run=%s, hint_filter=%s, since=%s)",
dry_run, hint_filter, since)
# Scoped runs reset checkpoint to avoid skipping new controls whose
# control_id sorts before the stale last_id of a previous full run.
self._since = since
if since and not dry_run:
self.db.execute(text(
"DELETE FROM canonical_generation_jobs WHERE status = 'dedup_phase2_checkpoint'"
))
self.db.commit()
if not dry_run:
await ensure_qdrant_collection(collection=self.collection)
@@ -133,7 +147,7 @@ class BatchDedupRunner:
# Phase 1: Intra-group dedup (same merge_group_hint)
# Optimization: skip singleton groups (they're automatically masters)
self._progress_phase = "phase1"
groups = self._load_merge_groups(hint_filter)
groups = self._load_merge_groups(hint_filter, since)
self._progress_total = self.stats["total_controls"]
multi_groups = [(h, c) for h, c in groups if len(c) > 1]
@@ -171,7 +185,7 @@ class BatchDedupRunner:
logger.info("BatchDedup completed in %.1fs: %s", elapsed, self.stats)
return self.stats
def _load_merge_groups(self, hint_filter: str = None) -> list:
def _load_merge_groups(self, hint_filter: str = None, since: datetime = None) -> list:
"""Load all Pass 0b controls grouped by merge_group_hint, largest first."""
conditions = [
"decomposition_method = 'pass0b'",
@@ -184,6 +198,10 @@ class BatchDedupRunner:
conditions.append("generation_metadata->>'merge_group_hint' LIKE :hf")
params["hf"] = f"{hint_filter}%"
if since:
conditions.append("created_at >= :since")
params["since"] = since
where = " AND ".join(conditions)
rows = self.db.execute(text(f"""
SELECT id::text, control_id, title, objective,
@@ -335,13 +353,15 @@ class BatchDedupRunner:
"""
logger.info("BatchDedup Phase 2: Cross-group pass starting...")
# Count total
total_row = self.db.execute(text("""
# Count total — respect scoped run if since is set
since_clause = " AND created_at >= :since" if self._since else ""
params = {"since": self._since} if self._since else {}
total_row = self.db.execute(text(f"""
SELECT COUNT(*) FROM canonical_controls
WHERE decomposition_method = 'pass0b'
AND release_state != 'duplicate'
AND release_state != 'deprecated'
""")).fetchone()
AND release_state != 'deprecated'{since_clause}
"""), params).fetchone()
total = total_row[0] if total_row else 0
self._progress_total = total
@@ -360,13 +380,16 @@ class BatchDedupRunner:
last_control_id = checkpoint_row[0] if checkpoint_row else ""
if last_control_id:
skip_row = self.db.execute(text("""
skip_params = {"last_id": last_control_id}
if self._since:
skip_params["since"] = self._since
skip_row = self.db.execute(text(f"""
SELECT COUNT(*) FROM canonical_controls
WHERE decomposition_method = 'pass0b'
AND release_state != 'duplicate'
AND release_state != 'deprecated'
AND control_id <= :last_id
"""), {"last_id": last_control_id}).fetchone()
AND control_id <= :last_id{since_clause}
"""), skip_params).fetchone()
skipped = skip_row[0] if skip_row else 0
self._progress_count = skipped
logger.info("BatchDedup Cross-group: RESUMING from %s (skipping %d already processed)",
@@ -382,17 +405,20 @@ class BatchDedupRunner:
total, last_control_id or "beginning")
while True:
rows = self.db.execute(text("""
page_params = {"last_id": last_control_id, "page_size": DB_PAGE}
if self._since:
page_params["since"] = self._since
rows = self.db.execute(text(f"""
SELECT id::text, control_id, title,
generation_metadata->>'merge_group_hint' as merge_group_hint
FROM canonical_controls
WHERE decomposition_method = 'pass0b'
AND release_state != 'duplicate'
AND release_state != 'deprecated'
AND control_id > :last_id
AND control_id > :last_id{since_clause}
ORDER BY control_id
LIMIT :page_size
"""), {"last_id": last_control_id, "page_size": DB_PAGE}).fetchall()
"""), page_params).fetchall()
if not rows:
break