feat(pipeline): D6 citation backfill + E2/E3 law ingestion scripts
- d6_citation_backfill.py: 3-tier matching (hash/prefix/overlap), archives old citations, updated 3.651 controls (93.6% coverage)
- ingest_de_laws.py: 8 German laws ingested (ArbZG, MuSchG, NachwG, MiLoG, GmbHG, AktG, InsO, BUrlG — 1.629 chunks)
- ingest_eu_regulations.py: EUR-Lex ingestion (needs manual HTML due to AWS WAF). CSRD, CSDDD, EU Taxonomy, eIDAS 2.0, Pay Transparency manually ingested (1.057 chunks)
- Updated session handover with current state

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,498 @@
|
||||
#!/usr/bin/env python3
|
||||
"""D6 Citation Backfill — update ~291k controls with section metadata from Qdrant chunks.
|
||||
|
||||
Archives old source_citation in generation_metadata.old_citation.
|
||||
Updates source_citation.article, .paragraph, .page from matched Qdrant chunks.
|
||||
|
||||
3-tier matching:
|
||||
Tier 1: sha256(source_original_text) → exact chunk text match
|
||||
Tier 2: Parse [section] prefix from source_original_text
|
||||
Tier 3: Best text overlap within same regulation_id
|
||||
|
||||
Usage:
|
||||
python3 control-pipeline/scripts/d6_citation_backfill.py --dry-run --limit 100
|
||||
python3 control-pipeline/scripts/d6_citation_backfill.py --batch-size 1000
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
|
||||
# Root logging config: timestamped INFO-level messages for run progress.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("d6-backfill")

# Connection targets; overridable via environment for non-local runs.
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot@localhost:5432/breakpilot_db")
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")

# Qdrant collections scanned when building the chunk index (Phase 1).
COLLECTIONS = [
    "bp_compliance_ce",
    "bp_compliance_gesetze",
    "bp_compliance_datenschutz",
    "bp_dsfa_corpus",
    "bp_legal_templates",
]

# Parse [§ 312k Title] or [AC-1 POLICY] prefix from chunk text
_SECTION_PREFIX_RE = re.compile(r'^\[([^\]]+)\]\s*')
|
||||
|
||||
|
||||
@dataclass
class ChunkMeta:
    """Citation metadata for one Qdrant chunk (or a parsed section prefix)."""

    section: str            # section identifier, e.g. "§ 312k" or "AC-1"
    section_title: str      # human-readable title following the identifier
    paragraph: str          # sub-paragraph reference; empty string if unknown
    page: Optional[int]     # source page number; None if unknown
    regulation_id: str      # owning regulation; empty for prefix-parsed metas
|
||||
|
||||
|
||||
@dataclass
class Stats:
    """Counters accumulated over one backfill run (logged in the summary)."""

    total: int = 0             # controls loaded from the database
    already_correct: int = 0   # article already matched, or unmatched but had one
    matched_hash: int = 0      # Tier 1: exact sha256 text match
    matched_prefix: int = 0    # Tier 2: [section] prefix parsed from source text
    matched_overlap: int = 0   # Tier 3: best Jaccard overlap within regulation
    unmatched: int = 0         # no tier matched and no prior article present
    updated: int = 0           # rows written (or would-be writes in dry-run)
    errors: int = 0            # NOTE(review): reserved — never incremented here
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Phase 1: Build Qdrant index
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def build_qdrant_index(qdrant_url: str) -> tuple[dict, dict]:
    """Scan every configured Qdrant collection and build two lookup tables.

    Pages through each collection's scroll API (250 points at a time) and
    keeps only chunks that are long enough to be matchable.

    Returns:
        hash_index: {sha256(chunk_text) -> ChunkMeta} for chunks WITH section data
        reg_index:  {regulation_id -> [(text snippet, ChunkMeta), ...]}
    """
    hash_index: dict[str, ChunkMeta] = {}
    reg_index: dict[str, list[tuple[str, ChunkMeta]]] = {}
    grand_total = 0

    for collection in COLLECTIONS:
        indexed_here = 0
        cursor = None
        with httpx.Client(timeout=60.0) as client:
            while True:
                request_body = {
                    "limit": 250,
                    "with_payload": [
                        "chunk_text", "section", "section_title",
                        "paragraph", "page", "regulation_id",
                    ],
                    "with_vector": False,
                }
                if cursor is not None:
                    request_body["offset"] = cursor
                response = client.post(
                    f"{qdrant_url}/collections/{collection}/points/scroll",
                    json=request_body,
                )
                response.raise_for_status()
                result = response.json()["result"]

                for point in result["points"]:
                    payload = point.get("payload", {})
                    text = payload.get("chunk_text", "")
                    # Near-empty chunks are useless as match targets.
                    if not text or len(text.strip()) < 30:
                        continue

                    meta = ChunkMeta(
                        section=payload.get("section", "") or "",
                        section_title=payload.get("section_title", "") or "",
                        paragraph=payload.get("paragraph", "") or "",
                        page=payload.get("page"),
                        regulation_id=payload.get("regulation_id", "") or "",
                    )

                    # Tier-1 table: only chunks that carry section data.
                    if meta.section:
                        digest = hashlib.sha256(text.encode()).hexdigest()
                        hash_index[digest] = meta

                    # Tier-3 table, keyed by regulation (snippet capped at 500 chars).
                    if meta.regulation_id and meta.section:
                        reg_index.setdefault(meta.regulation_id, []).append(
                            (text[:500], meta)
                        )

                    indexed_here += 1

                cursor = result.get("next_page_offset")
                if cursor is None:
                    break

        grand_total += indexed_here
        logger.info(" [%s] %d chunks indexed", collection, indexed_here)

    logger.info("Qdrant index: %d total chunks, %d with section (hash), %d regulations",
                grand_total, len(hash_index), len(reg_index))
    return hash_index, reg_index
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Phase 2: Load controls
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def load_controls(db_url: str, limit: int = 0) -> list[dict]:
    """Load all controls needing citation update.

    Selects license_rule 1/2 controls that have a source_citation, ordered by
    control_id, and normalizes the JSONB columns to plain dicts.

    Args:
        db_url: psycopg2 connection string.
        limit: cap on rows fetched; 0 means no limit.

    Returns:
        List of control dicts with `id` stringified and `source_citation` /
        `generation_metadata` guaranteed to be dicts (never str/None).
    """
    conn = psycopg2.connect(db_url)
    conn.set_session(autocommit=False)
    try:
        # `with` closes the cursor even if the query raises.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SET search_path TO compliance, core, public")

            query = """
                SELECT id, control_id, source_citation, source_original_text,
                       generation_metadata, license_rule
                FROM canonical_controls
                WHERE license_rule IN (1, 2)
                  AND source_citation IS NOT NULL
                ORDER BY control_id
            """
            if limit > 0:
                # Bind LIMIT as a parameter instead of f-string interpolation.
                cur.execute(query + " LIMIT %s", (limit,))
            else:
                cur.execute(query)
            rows = cur.fetchall()
    finally:
        # Ensure the connection is released even when the query fails.
        conn.close()

    controls = []
    for row in rows:
        ctrl = dict(row)
        ctrl["id"] = str(ctrl["id"])
        # JSONB columns may arrive as str or None depending on the driver
        # configuration; normalize both to dicts so callers never branch.
        for jf in ("source_citation", "generation_metadata"):
            val = ctrl.get(jf)
            if isinstance(val, str):
                try:
                    ctrl[jf] = json.loads(val)
                except (json.JSONDecodeError, TypeError):
                    ctrl[jf] = {}
            elif val is None:
                ctrl[jf] = {}
        controls.append(ctrl)

    return controls
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Phase 3: Matching
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def match_control(
    ctrl: dict,
    hash_index: dict[str, ChunkMeta],
    reg_index: dict[str, list[tuple[str, ChunkMeta]]],
) -> tuple[Optional[ChunkMeta], str]:
    """Resolve a control to chunk metadata via the 3-tier strategy.

    Tier 1 hashes source_original_text for an exact lookup; Tier 2 parses a
    leading "[...]" section marker; Tier 3 falls back to word-overlap against
    chunks of the same regulation.

    Returns:
        (meta, method) with method in {"hash", "prefix", "overlap"},
        or (None, "") when nothing matched.
    """
    text = ctrl.get("source_original_text", "") or ""

    if text:
        # Tier 1: exact text identity via sha256.
        digest = hashlib.sha256(text.encode()).hexdigest()
        candidate = hash_index.get(digest)
        if candidate and candidate.section:
            return candidate, "hash"

        # Tier 2: leading "[section]" marker carried over from chunking.
        prefix_match = _SECTION_PREFIX_RE.match(text)
        if prefix_match:
            parsed = _parse_section_from_prefix(prefix_match.group(1).strip())
            if parsed:
                return parsed, "prefix"

    # Tier 3: fuzzy overlap restricted to the control's own regulation.
    regulation = (ctrl.get("generation_metadata") or {}).get("source_regulation", "")
    if regulation and text and regulation in reg_index:
        overlap_hit = _find_best_overlap(text, reg_index[regulation])
        if overlap_hit:
            return overlap_hit, "overlap"

    return None, ""
|
||||
|
||||
|
||||
def _parse_section_from_prefix(prefix: str) -> Optional[ChunkMeta]:
|
||||
"""Parse a section prefix like '§ 312k Kuendigungsbutton' or 'AC-1 POLICY'."""
|
||||
if not prefix:
|
||||
return None
|
||||
|
||||
# § pattern
|
||||
m = re.match(r'(§\s*\d+[a-z]*)\s*(.*)', prefix)
|
||||
if m:
|
||||
return ChunkMeta(
|
||||
section=m.group(1).strip(),
|
||||
section_title=m.group(2).strip(),
|
||||
paragraph="", page=None, regulation_id="",
|
||||
)
|
||||
|
||||
# Art./Artikel pattern
|
||||
m = re.match(r'(Art(?:ikel|\.)\s*\d+)\s*(.*)', prefix, re.IGNORECASE)
|
||||
if m:
|
||||
return ChunkMeta(
|
||||
section=m.group(1).strip(),
|
||||
section_title=m.group(2).strip(),
|
||||
paragraph="", page=None, regulation_id="",
|
||||
)
|
||||
|
||||
# NIST control pattern (AC-1, AU-2, etc.)
|
||||
m = re.match(r'([A-Z]{2,4}-\d+(?:\(\d+\))?)\s*(.*)', prefix)
|
||||
if m:
|
||||
return ChunkMeta(
|
||||
section=m.group(1).strip(),
|
||||
section_title=m.group(2).strip(),
|
||||
paragraph="", page=None, regulation_id="",
|
||||
)
|
||||
|
||||
# Numbered section (3.1 Title)
|
||||
m = re.match(r'(\d+(?:\.\d+)+)\s*(.*)', prefix)
|
||||
if m:
|
||||
return ChunkMeta(
|
||||
section=m.group(1).strip(),
|
||||
section_title=m.group(2).strip(),
|
||||
paragraph="", page=None, regulation_id="",
|
||||
)
|
||||
|
||||
# ALL-CAPS heading (fallback — use as section_title)
|
||||
if prefix == prefix.upper() and len(prefix) > 3:
|
||||
return ChunkMeta(
|
||||
section="", section_title=prefix,
|
||||
paragraph="", page=None, regulation_id="",
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _find_best_overlap(source_text: str, chunks: list[tuple[str, ChunkMeta]]) -> Optional[ChunkMeta]:
|
||||
"""Find chunk with best text overlap (simple word-set Jaccard)."""
|
||||
source_words = set(source_text.lower().split())
|
||||
if len(source_words) < 5:
|
||||
return None
|
||||
|
||||
best_score = 0.0
|
||||
best_meta = None
|
||||
|
||||
for chunk_text, meta in chunks:
|
||||
chunk_words = set(chunk_text.lower().split())
|
||||
if not chunk_words:
|
||||
continue
|
||||
intersection = len(source_words & chunk_words)
|
||||
union = len(source_words | chunk_words)
|
||||
jaccard = intersection / union if union > 0 else 0
|
||||
if jaccard > best_score and jaccard > 0.3: # 30% threshold
|
||||
best_score = jaccard
|
||||
best_meta = meta
|
||||
|
||||
return best_meta
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Phase 4: Update controls
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def update_controls(
    db_url: str,
    controls: list[dict],
    hash_index: dict[str, ChunkMeta],
    reg_index: dict[str, list[tuple[str, ChunkMeta]]],
    dry_run: bool = True,
    batch_size: int = 1000,
) -> Stats:
    """Match and update all controls.

    For each control: run the 3-tier matcher, archive the previous citation
    into generation_metadata.old_citation, rewrite the source_citation fields,
    and stamp backfill provenance. Writes go out in batches of *batch_size*
    with a commit per batch; with dry_run=True nothing is written and
    stats.updated reports the would-be update count instead.

    Returns:
        Stats with per-tier match counts and update totals.
        NOTE(review): stats.errors is never incremented in this function.
    """
    stats = Stats(total=len(controls))

    conn = psycopg2.connect(db_url)
    conn.set_session(autocommit=False)
    cur = conn.cursor()
    cur.execute("SET search_path TO compliance, core, public")

    updates = []  # pending (citation_json, gen_meta_json, id) tuples

    for i, ctrl in enumerate(controls):
        # Periodic progress heartbeat for long runs.
        if i > 0 and i % 5000 == 0:
            logger.info("Progress: %d/%d (hash=%d prefix=%d overlap=%d unmatched=%d)",
                        i, stats.total, stats.matched_hash, stats.matched_prefix,
                        stats.matched_overlap, stats.unmatched)

        citation = ctrl.get("source_citation") or {}
        old_article = citation.get("article", "")
        gen_meta = ctrl.get("generation_metadata") or {}

        # Match
        meta, method = match_control(ctrl, hash_index, reg_index)

        if not meta or not meta.section:
            # No match — check if existing article is already good
            if old_article:
                stats.already_correct += 1
            else:
                stats.unmatched += 1
            continue

        # Check if update is needed
        if old_article == meta.section:
            stats.already_correct += 1
            continue

        # Track method
        if method == "hash":
            stats.matched_hash += 1
        elif method == "prefix":
            stats.matched_prefix += 1
        elif method == "overlap":
            stats.matched_overlap += 1

        # Archive old citation (only when there was something to preserve).
        if old_article or citation.get("paragraph"):
            gen_meta["old_citation"] = {
                "article": old_article,
                "paragraph": citation.get("paragraph", ""),
                "page": citation.get("page"),
                "archived_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            }

        # Update citation — paragraph/page only overwritten when the match
        # actually provides them.
        citation["article"] = meta.section
        if meta.paragraph:
            citation["paragraph"] = meta.paragraph
        if meta.page is not None:
            citation["page"] = meta.page

        # Update generation_metadata with provenance of this backfill.
        gen_meta["source_article"] = meta.section
        if meta.paragraph:
            gen_meta["source_paragraph"] = meta.paragraph
        if meta.page is not None:
            gen_meta["source_page"] = meta.page
        gen_meta["backfill_method"] = method
        gen_meta["backfill_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

        updates.append((
            json.dumps(citation, ensure_ascii=False),
            json.dumps(gen_meta, ensure_ascii=False, default=str),
            ctrl["id"],
        ))

        # Batch commit (skipped entirely in dry-run so `updates` keeps growing).
        if len(updates) >= batch_size and not dry_run:
            _execute_batch(cur, updates)
            conn.commit()
            stats.updated += len(updates)
            logger.info("Committed batch: %d updates (total %d)", len(updates), stats.updated)
            updates = []

    # Final batch
    if updates and not dry_run:
        _execute_batch(cur, updates)
        conn.commit()
        stats.updated += len(updates)
        logger.info("Committed final batch: %d updates (total %d)", len(updates), stats.updated)
    elif updates and dry_run:
        # In dry-run nothing was flushed, so this is the full would-be count.
        stats.updated = len(updates)  # would-be updates

    conn.close()
    return stats
|
||||
|
||||
|
||||
def _execute_batch(cur, updates: list[tuple]):
|
||||
"""Execute batch UPDATE statements."""
|
||||
for citation_json, meta_json, ctrl_id in updates:
|
||||
cur.execute(
|
||||
"""UPDATE canonical_controls
|
||||
SET source_citation = %s::jsonb,
|
||||
generation_metadata = %s::jsonb,
|
||||
updated_at = NOW()
|
||||
WHERE id = %s::uuid""",
|
||||
(citation_json, meta_json, ctrl_id),
|
||||
)
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Main
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def main():
    """CLI entry point: build the Qdrant index, load controls, match, update,
    and log a summary. See module docstring for usage examples."""
    parser = argparse.ArgumentParser(description="D6 Citation Backfill")
    parser.add_argument("--dry-run", action="store_true", help="Don't write to DB")
    parser.add_argument("--limit", type=int, default=0, help="Limit controls (0=all)")
    parser.add_argument("--batch-size", type=int, default=1000)
    parser.add_argument("--db-url", default=DB_URL)
    parser.add_argument("--qdrant-url", default=QDRANT_URL)
    args = parser.parse_args()

    logger.info("=" * 60)
    logger.info("D6 Citation Backfill")
    # Split on "@" so credentials in the URL are not logged.
    logger.info(" DB: %s", args.db_url.split("@")[-1])
    logger.info(" Qdrant: %s", args.qdrant_url)
    logger.info(" Dry run: %s", args.dry_run)
    logger.info(" Limit: %s", args.limit or "ALL")
    logger.info("=" * 60)

    # Phase 1: Build Qdrant index
    logger.info("\nPhase 1: Building Qdrant index...")
    t0 = time.time()
    hash_index, reg_index = build_qdrant_index(args.qdrant_url)
    logger.info("Index built in %.1fs", time.time() - t0)

    # Phase 2: Load controls
    logger.info("\nPhase 2: Loading controls...")
    controls = load_controls(args.db_url, args.limit)
    logger.info("Loaded %d controls", len(controls))

    if not controls:
        logger.info("No controls to process")
        return

    # Phase 3+4: Match and update
    logger.info("\nPhase 3+4: Matching and updating...")
    t0 = time.time()
    stats = update_controls(
        args.db_url, controls, hash_index, reg_index,
        dry_run=args.dry_run, batch_size=args.batch_size,
    )
    elapsed = time.time() - t0

    # Summary — percentages are relative to total; max(..., 1) avoids
    # division by zero when the control set is empty.
    logger.info("\n" + "=" * 60)
    logger.info("RESULTS")
    logger.info("=" * 60)
    logger.info(" Total controls: %d", stats.total)
    logger.info(" Already correct: %d (%.1f%%)", stats.already_correct,
                stats.already_correct / max(stats.total, 1) * 100)
    logger.info(" Matched (hash): %d (%.1f%%)", stats.matched_hash,
                stats.matched_hash / max(stats.total, 1) * 100)
    logger.info(" Matched (prefix): %d (%.1f%%)", stats.matched_prefix,
                stats.matched_prefix / max(stats.total, 1) * 100)
    logger.info(" Matched (overlap): %d (%.1f%%)", stats.matched_overlap,
                stats.matched_overlap / max(stats.total, 1) * 100)
    logger.info(" Unmatched: %d (%.1f%%)", stats.unmatched,
                stats.unmatched / max(stats.total, 1) * 100)
    logger.info(" Updated: %d", stats.updated)
    logger.info(" Errors: %d", stats.errors)
    logger.info(" Time: %.1fs (%.0f controls/sec)", elapsed,
                stats.total / max(elapsed, 1))

    if args.dry_run:
        logger.info("\nDRY RUN — no changes written. Run without --dry-run to apply.")


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user