breakpilot-core/control-pipeline/scripts/d6_citation_backfill.py
Benjamin Admin 118be3540d feat(pipeline): D6 citation backfill + E2/E3 law ingestion scripts
- d6_citation_backfill.py: 3-tier matching (hash/prefix/overlap),
  archives old citations; updated 3,651 controls (93.6% coverage)
- ingest_de_laws.py: 8 German laws ingested (ArbZG, MuSchG, NachwG,
  MiLoG, GmbHG, AktG, InsO, BUrlG — 1,629 chunks)
- ingest_eu_regulations.py: EUR-Lex ingestion (requires manually saved
  HTML due to AWS WAF). CSRD, CSDDD, EU Taxonomy, eIDAS 2.0, and Pay
  Transparency manually ingested (1,057 chunks)
- Updated session handover with current state

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-03 13:19:27 +02:00

499 lines
16 KiB
Python

#!/usr/bin/env python3
"""D6 Citation Backfill — update ~291k controls with section metadata from Qdrant chunks.

Archives the old source_citation in generation_metadata.old_citation.
Updates source_citation.article, .paragraph, .page from matched Qdrant chunks.

3-tier matching:
    Tier 1: sha256(source_original_text) → exact chunk text match
    Tier 2: Parse [section] prefix from source_original_text
    Tier 3: Best text overlap within same regulation_id

Usage:
    python3 control-pipeline/scripts/d6_citation_backfill.py --dry-run --limit 100
    python3 control-pipeline/scripts/d6_citation_backfill.py --batch-size 1000
"""
import argparse
import hashlib
import json
import logging
import os
import re
import time
from dataclasses import dataclass
from typing import Optional

import httpx
import psycopg2
import psycopg2.extras

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("d6-backfill")

DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot@localhost:5432/breakpilot_db")
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")

COLLECTIONS = [
    "bp_compliance_ce",
    "bp_compliance_gesetze",
    "bp_compliance_datenschutz",
    "bp_dsfa_corpus",
    "bp_legal_templates",
]

# Parse [§ 312k Title] or [AC-1 POLICY] prefix from chunk text
_SECTION_PREFIX_RE = re.compile(r'^\[([^\]]+)\]\s*')
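# Illustrative inputs this prefix regex is meant to capture (hypothetical):
#   "[§ 312k Kündigungsbutton] Der Unternehmer stellt ..." -> "§ 312k Kündigungsbutton"
#   "[AC-1 POLICY AND PROCEDURES] The organization ..."    -> "AC-1 POLICY AND PROCEDURES"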
@dataclass
class ChunkMeta:
    section: str
    section_title: str
    paragraph: str
    page: Optional[int]
    regulation_id: str


@dataclass
class Stats:
    total: int = 0
    already_correct: int = 0
    matched_hash: int = 0
    matched_prefix: int = 0
    matched_overlap: int = 0
    unmatched: int = 0
    updated: int = 0
    errors: int = 0
# -------------------------------------------------------------------
# Phase 1: Build Qdrant index
# -------------------------------------------------------------------
def build_qdrant_index(qdrant_url: str) -> tuple[dict, dict]:
    """Build hash index and regulation index from all Qdrant collections.

    Returns:
        hash_index: {sha256(chunk_text) → ChunkMeta}
        reg_index: {regulation_id → [ChunkMeta with text snippets]}
    """
    hash_index: dict[str, ChunkMeta] = {}
    reg_index: dict[str, list[tuple[str, ChunkMeta]]] = {}
    total_chunks = 0
    for coll in COLLECTIONS:
        offset = None
        coll_count = 0
        with httpx.Client(timeout=60.0) as c:
            while True:
                body = {
                    "limit": 250,
                    "with_payload": [
                        "chunk_text", "section", "section_title",
                        "paragraph", "page", "regulation_id",
                    ],
                    "with_vector": False,
                }
                if offset is not None:
                    body["offset"] = offset
                resp = c.post(
                    f"{qdrant_url}/collections/{coll}/points/scroll",
                    json=body,
                )
                resp.raise_for_status()
                data = resp.json()["result"]
                for pt in data["points"]:
                    p = pt.get("payload", {})
                    chunk_text = p.get("chunk_text", "")
                    if not chunk_text or len(chunk_text.strip()) < 30:
                        continue
                    meta = ChunkMeta(
                        section=p.get("section", "") or "",
                        section_title=p.get("section_title", "") or "",
                        paragraph=p.get("paragraph", "") or "",
                        page=p.get("page"),
                        regulation_id=p.get("regulation_id", "") or "",
                    )
                    # Hash index
                    h = hashlib.sha256(chunk_text.encode()).hexdigest()
                    if meta.section:  # only index chunks WITH section data
                        hash_index[h] = meta
                    # Regulation index (for text overlap matching)
                    if meta.regulation_id and meta.section:
                        reg_index.setdefault(meta.regulation_id, []).append(
                            (chunk_text[:500], meta)
                        )
                    coll_count += 1
                offset = data.get("next_page_offset")
                if offset is None:
                    break
        total_chunks += coll_count
        logger.info("  [%s] %d chunks indexed", coll, coll_count)
    logger.info("Qdrant index: %d total chunks, %d with section (hash), %d regulations",
                total_chunks, len(hash_index), len(reg_index))
    return hash_index, reg_index
# -------------------------------------------------------------------
# Phase 2: Load controls
# -------------------------------------------------------------------
def load_controls(db_url: str, limit: int = 0) -> list[dict]:
    """Load all controls needing citation update."""
    conn = psycopg2.connect(db_url)
    conn.set_session(autocommit=False)
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute("SET search_path TO compliance, core, public")
    query = """
        SELECT id, control_id, source_citation, source_original_text,
               generation_metadata, license_rule
        FROM canonical_controls
        WHERE license_rule IN (1, 2)
          AND source_citation IS NOT NULL
        ORDER BY control_id
    """
    if limit > 0:
        query += f" LIMIT {limit}"
    cur.execute(query)
    rows = cur.fetchall()
    conn.close()
    controls = []
    for row in rows:
        ctrl = dict(row)
        ctrl["id"] = str(ctrl["id"])
        for jf in ("source_citation", "generation_metadata"):
            val = ctrl.get(jf)
            if isinstance(val, str):
                try:
                    ctrl[jf] = json.loads(val)
                except (json.JSONDecodeError, TypeError):
                    ctrl[jf] = {}
            elif val is None:
                ctrl[jf] = {}
        controls.append(ctrl)
    return controls
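# Note: psycopg2 usually decodes json/jsonb columns to dicts on its own; the
# isinstance check above also covers setups where the columns come back as
# text, so either way each control ends up holding plain dicts.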
# -------------------------------------------------------------------
# Phase 3: Matching
# -------------------------------------------------------------------
def match_control(
    ctrl: dict,
    hash_index: dict[str, ChunkMeta],
    reg_index: dict[str, list[tuple[str, ChunkMeta]]],
) -> tuple[Optional[ChunkMeta], str]:
    """Match a control to a Qdrant chunk. Returns (meta, method) or (None, '')."""
    source_text = ctrl.get("source_original_text", "") or ""
    # Tier 1: Hash match
    if source_text:
        h = hashlib.sha256(source_text.encode()).hexdigest()
        meta = hash_index.get(h)
        if meta and meta.section:
            return meta, "hash"
    # Tier 2: Parse [section] prefix from source_original_text
    if source_text:
        m = _SECTION_PREFIX_RE.match(source_text)
        if m:
            prefix = m.group(1).strip()
            parsed = _parse_section_from_prefix(prefix)
            if parsed:
                return parsed, "prefix"
    # Tier 3: Text overlap within same regulation
    gen_meta = ctrl.get("generation_metadata") or {}
    reg_id = gen_meta.get("source_regulation", "")
    if reg_id and source_text and reg_id in reg_index:
        best = _find_best_overlap(source_text, reg_index[reg_id])
        if best:
            return best, "overlap"
    return None, ""
def _parse_section_from_prefix(prefix: str) -> Optional[ChunkMeta]:
    """Parse a section prefix like '§ 312k Kuendigungsbutton' or 'AC-1 POLICY'."""
    if not prefix:
        return None
    # § pattern
    m = re.match(r'(§\s*\d+[a-z]*)\s*(.*)', prefix)
    if m:
        return ChunkMeta(
            section=m.group(1).strip(),
            section_title=m.group(2).strip(),
            paragraph="", page=None, regulation_id="",
        )
    # Art./Artikel pattern
    m = re.match(r'(Art(?:ikel|\.)\s*\d+)\s*(.*)', prefix, re.IGNORECASE)
    if m:
        return ChunkMeta(
            section=m.group(1).strip(),
            section_title=m.group(2).strip(),
            paragraph="", page=None, regulation_id="",
        )
    # NIST control pattern (AC-1, AU-2, etc.)
    m = re.match(r'([A-Z]{2,4}-\d+(?:\(\d+\))?)\s*(.*)', prefix)
    if m:
        return ChunkMeta(
            section=m.group(1).strip(),
            section_title=m.group(2).strip(),
            paragraph="", page=None, regulation_id="",
        )
    # Numbered section (3.1 Title)
    m = re.match(r'(\d+(?:\.\d+)+)\s*(.*)', prefix)
    if m:
        return ChunkMeta(
            section=m.group(1).strip(),
            section_title=m.group(2).strip(),
            paragraph="", page=None, regulation_id="",
        )
    # ALL-CAPS heading (fallback — use as section_title)
    if prefix == prefix.upper() and len(prefix) > 3:
        return ChunkMeta(
            section="", section_title=prefix,
            paragraph="", page=None, regulation_id="",
        )
    return None
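# Expected parses (illustrative):
#   "§ 312k Kündigungsbutton"    -> section="§ 312k", section_title="Kündigungsbutton"
#   "AC-1 POLICY AND PROCEDURES" -> section="AC-1",   section_title="POLICY AND PROCEDURES"
#   "3.1 Risk Assessment"        -> section="3.1",    section_title="Risk Assessment"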
def _find_best_overlap(source_text: str, chunks: list[tuple[str, ChunkMeta]]) -> Optional[ChunkMeta]:
    """Find chunk with best text overlap (simple word-set Jaccard)."""
    source_words = set(source_text.lower().split())
    if len(source_words) < 5:
        return None
    best_score = 0.0
    best_meta = None
    for chunk_text, meta in chunks:
        chunk_words = set(chunk_text.lower().split())
        if not chunk_words:
            continue
        intersection = len(source_words & chunk_words)
        union = len(source_words | chunk_words)
        jaccard = intersection / union if union > 0 else 0
        if jaccard > best_score and jaccard > 0.3:  # 30% threshold
            best_score = jaccard
            best_meta = meta
    return best_meta
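# Scale check: two 100-word texts sharing 50 distinct words give
# Jaccard = 50 / 150 ≈ 0.33, which just clears the 0.3 bar; loosely
# paraphrased chunks tend to land below it.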
# -------------------------------------------------------------------
# Phase 4: Update controls
# -------------------------------------------------------------------
def update_controls(
    db_url: str,
    controls: list[dict],
    hash_index: dict[str, ChunkMeta],
    reg_index: dict[str, list[tuple[str, ChunkMeta]]],
    dry_run: bool = True,
    batch_size: int = 1000,
) -> Stats:
    """Match and update all controls."""
    stats = Stats(total=len(controls))
    conn = psycopg2.connect(db_url)
    conn.set_session(autocommit=False)
    cur = conn.cursor()
    cur.execute("SET search_path TO compliance, core, public")
    updates = []
    for i, ctrl in enumerate(controls):
        if i > 0 and i % 5000 == 0:
            logger.info("Progress: %d/%d (hash=%d prefix=%d overlap=%d unmatched=%d)",
                        i, stats.total, stats.matched_hash, stats.matched_prefix,
                        stats.matched_overlap, stats.unmatched)
        citation = ctrl.get("source_citation") or {}
        old_article = citation.get("article", "")
        gen_meta = ctrl.get("generation_metadata") or {}
        # Match
        meta, method = match_control(ctrl, hash_index, reg_index)
        if not meta or not meta.section:
            # No match — check if existing article is already good
            if old_article:
                stats.already_correct += 1
            else:
                stats.unmatched += 1
            continue
        # Check if update is needed
        if old_article == meta.section:
            stats.already_correct += 1
            continue
        # Track method
        if method == "hash":
            stats.matched_hash += 1
        elif method == "prefix":
            stats.matched_prefix += 1
        elif method == "overlap":
            stats.matched_overlap += 1
        # Archive old citation
        if old_article or citation.get("paragraph"):
            gen_meta["old_citation"] = {
                "article": old_article,
                "paragraph": citation.get("paragraph", ""),
                "page": citation.get("page"),
                "archived_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            }
        # Update citation
        citation["article"] = meta.section
        if meta.paragraph:
            citation["paragraph"] = meta.paragraph
        if meta.page is not None:
            citation["page"] = meta.page
        # Update generation_metadata
        gen_meta["source_article"] = meta.section
        if meta.paragraph:
            gen_meta["source_paragraph"] = meta.paragraph
        if meta.page is not None:
            gen_meta["source_page"] = meta.page
        gen_meta["backfill_method"] = method
        gen_meta["backfill_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        updates.append((
            json.dumps(citation, ensure_ascii=False),
            json.dumps(gen_meta, ensure_ascii=False, default=str),
            ctrl["id"],
        ))
        # Batch commit
        if len(updates) >= batch_size and not dry_run:
            _execute_batch(cur, updates)
            conn.commit()
            stats.updated += len(updates)
            logger.info("Committed batch: %d updates (total %d)", len(updates), stats.updated)
            updates = []
    # Final batch
    if updates and not dry_run:
        _execute_batch(cur, updates)
        conn.commit()
        stats.updated += len(updates)
        logger.info("Committed final batch: %d updates (total %d)", len(updates), stats.updated)
    elif updates and dry_run:
        stats.updated = len(updates)  # would-be updates
    conn.close()
    return stats
def _execute_batch(cur, updates: list[tuple]):
    """Execute batch UPDATE statements."""
    for citation_json, meta_json, ctrl_id in updates:
        cur.execute(
            """UPDATE canonical_controls
               SET source_citation = %s::jsonb,
                   generation_metadata = %s::jsonb,
                   updated_at = NOW()
               WHERE id = %s::uuid""",
            (citation_json, meta_json, ctrl_id),
        )
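# Note: one round-trip per row is fine at this scale; if it ever becomes a
# bottleneck, psycopg2.extras.execute_batch(cur, sql, updates) would be a
# drop-in way to batch the statements.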
# -------------------------------------------------------------------
# Main
# -------------------------------------------------------------------
def main():
    parser = argparse.ArgumentParser(description="D6 Citation Backfill")
    parser.add_argument("--dry-run", action="store_true", help="Don't write to DB")
    parser.add_argument("--limit", type=int, default=0, help="Limit controls (0=all)")
    parser.add_argument("--batch-size", type=int, default=1000)
    parser.add_argument("--db-url", default=DB_URL)
    parser.add_argument("--qdrant-url", default=QDRANT_URL)
    args = parser.parse_args()

    logger.info("=" * 60)
    logger.info("D6 Citation Backfill")
    logger.info("  DB:      %s", args.db_url.split("@")[-1])
    logger.info("  Qdrant:  %s", args.qdrant_url)
    logger.info("  Dry run: %s", args.dry_run)
    logger.info("  Limit:   %s", args.limit or "ALL")
    logger.info("=" * 60)

    # Phase 1: Build Qdrant index
    logger.info("\nPhase 1: Building Qdrant index...")
    t0 = time.time()
    hash_index, reg_index = build_qdrant_index(args.qdrant_url)
    logger.info("Index built in %.1fs", time.time() - t0)

    # Phase 2: Load controls
    logger.info("\nPhase 2: Loading controls...")
    controls = load_controls(args.db_url, args.limit)
    logger.info("Loaded %d controls", len(controls))
    if not controls:
        logger.info("No controls to process")
        return

    # Phase 3+4: Match and update
    logger.info("\nPhase 3+4: Matching and updating...")
    t0 = time.time()
    stats = update_controls(
        args.db_url, controls, hash_index, reg_index,
        dry_run=args.dry_run, batch_size=args.batch_size,
    )
    elapsed = time.time() - t0

    # Summary
    logger.info("\n" + "=" * 60)
    logger.info("RESULTS")
    logger.info("=" * 60)
    logger.info("  Total controls:    %d", stats.total)
    logger.info("  Already correct:   %d (%.1f%%)", stats.already_correct,
                stats.already_correct / max(stats.total, 1) * 100)
    logger.info("  Matched (hash):    %d (%.1f%%)", stats.matched_hash,
                stats.matched_hash / max(stats.total, 1) * 100)
    logger.info("  Matched (prefix):  %d (%.1f%%)", stats.matched_prefix,
                stats.matched_prefix / max(stats.total, 1) * 100)
    logger.info("  Matched (overlap): %d (%.1f%%)", stats.matched_overlap,
                stats.matched_overlap / max(stats.total, 1) * 100)
    logger.info("  Unmatched:         %d (%.1f%%)", stats.unmatched,
                stats.unmatched / max(stats.total, 1) * 100)
    logger.info("  Updated:           %d", stats.updated)
    logger.info("  Errors:            %d", stats.errors)
    logger.info("  Time: %.1fs (%.0f controls/sec)", elapsed,
                stats.total / max(elapsed, 1))
    if args.dry_run:
        logger.info("\nDRY RUN — no changes written. Run without --dry-run to apply.")


if __name__ == "__main__":
    main()