#!/usr/bin/env python3 """Re-upload NIST/BSI/ENISA docs with chunk_strategy='legal' for section metadata. The docs were already uploaded with 'recursive' strategy (no section detection). This script re-uploads with 'legal' strategy, then deletes old recursive chunks. Usage (on Mac Mini): python3 control-pipeline/scripts/reupload_legal_strategy.py python3 control-pipeline/scripts/reupload_legal_strategy.py --dry-run """ import argparse import io import json import re import sys import time import unicodedata import httpx import pdfplumber RAG_URL = "https://localhost:8097" QDRANT_URL = "http://localhost:6333" UPLOAD_TIMEOUT = 1800.0 # ---- Documents to process ---- DOCS = [ # 4 NIST docs already extracted at /tmp/nist_*.txt { "regulation_id": "nist_sp800_53r5", "collection": "bp_compliance_datenschutz", "upload_filename": "NIST_SP_800_53r5.txt", "local_txt": "/tmp/nist_nist_sp800_53r5.txt", "minio_pdf": None, # already extracted "extra_metadata": { "regulation_id": "nist_sp800_53r5", "source_id": "nist", "doc_type": "controls_catalog", "guideline_name": "NIST SP 800-53 Rev. 5", "license": "public_domain_us_gov", "source": "nist.gov", }, }, { "regulation_id": "nist_sp_800_82r3", "collection": "bp_compliance_ce", "upload_filename": "nist_sp_800_82r3.txt", "local_txt": "/tmp/nist_nist_sp_800_82r3.txt", "minio_pdf": None, "extra_metadata": { "regulation_id": "nist_sp_800_82r3", "regulation_short": "NIST SP 800-82", "license": "public_domain_us", "source": "nist.gov", }, }, { "regulation_id": "nist_sp_800_160v1r1", "collection": "bp_compliance_ce", "upload_filename": "nist_sp_800_160v1r1.txt", "local_txt": "/tmp/nist_160.txt", "minio_pdf": None, "extra_metadata": { "regulation_id": "nist_sp_800_160v1r1", "regulation_short": "NIST SP 800-160", "license": "public_domain_us", "source": "nist.gov", }, }, { "regulation_id": "nist_sp800_207", "collection": "bp_compliance_datenschutz", "upload_filename": "NIST_SP_800_207.txt", "local_txt": None, # needs extraction "minio_pdf": "compliance/bund/compliance/2026/NIST_SP_800_207.pdf", "extra_metadata": { "regulation_id": "nist_sp800_207", "source_id": "nist", "doc_type": "architecture", "guideline_name": "NIST SP 800-207 Zero Trust Architecture", "license": "public_domain_us_gov", "source": "nist.gov", }, }, # Additional low-quality docs (need extraction from MinIO) { "regulation_id": "nist_csf_2_0", "collection": "bp_compliance_datenschutz", "upload_filename": "nist_csf_2_0.txt", "local_txt": None, "minio_pdf": "compliance/bund/compliance/2026/nist_csf_2_0.pdf", "extra_metadata": { "regulation_id": "nist_csf_2_0", "license": "public_domain_us", "source": "nist.gov", }, }, { "regulation_id": "nistir_8259a", "collection": "bp_compliance_datenschutz", "upload_filename": "nistir_8259a.txt", "local_txt": None, "minio_pdf": "compliance/bund/compliance/2026/nistir_8259a.pdf", "extra_metadata": { "regulation_id": "nistir_8259a", "license": "public_domain_us", "source": "nist.gov", }, }, { "regulation_id": "nist_ai_rmf", "collection": "bp_compliance_datenschutz", "upload_filename": "nist_ai_rmf.txt", "local_txt": None, "minio_pdf": "compliance/bund/compliance/2026/nist_ai_rmf.pdf", "extra_metadata": { "regulation_id": "nist_ai_rmf", "license": "public_domain_us", "source": "nist.gov", }, }, { "regulation_id": "nist_sp_800_30r1", "collection": "bp_compliance_ce", "upload_filename": "nist_sp_800_30r1.txt", "local_txt": None, "minio_pdf": "compliance/bund/compliance/2026/nist_sp_800_30r1.pdf", "extra_metadata": { "regulation_id": "nist_sp_800_30r1", "license": "public_domain_us", "source": "nist.gov", }, }, { "regulation_id": "cisa_secure_by_design", "collection": "bp_compliance_ce", "upload_filename": "cisa_secure_by_design.txt", "local_txt": None, "minio_pdf": "compliance/bund/compliance/2026/cisa_secure_by_design.pdf", "extra_metadata": { "regulation_id": "cisa_secure_by_design", "license": "public_domain_us", "source": "cisa.gov", }, }, { "regulation_id": "cvss_v4_0", "collection": "bp_compliance_ce", "upload_filename": "cvss_v4_0.txt", "local_txt": None, "minio_pdf": "compliance/bund/compliance/2026/cvss_v4_0.pdf", "extra_metadata": { "regulation_id": "cvss_v4_0", "license": "public_domain_us", "source": "first.org", }, }, { "regulation_id": "enisa_ics_scada_dependencies", "collection": "bp_compliance_ce", "upload_filename": "enisa_ics_scada.txt", "local_txt": None, "minio_pdf": "compliance/bund/compliance/2026/enisa_ics_scada.pdf", "extra_metadata": { "regulation_id": "enisa_ics_scada_dependencies", "license": "reuse_with_attribution", "source": "enisa.europa.eu", }, }, { "regulation_id": "enisa_threat_landscape_supply_chain", "collection": "bp_compliance_ce", "upload_filename": "enisa_supply_chain_security.txt", "local_txt": None, "minio_pdf": "compliance/bund/compliance/2026/enisa_supply_chain_security.pdf", "extra_metadata": { "regulation_id": "enisa_threat_landscape_supply_chain", "license": "reuse_with_attribution", "source": "enisa.europa.eu", }, }, { "regulation_id": "enisa_supply_chain_good_practices", "collection": "bp_compliance_ce", "upload_filename": "enisa_supply_chain_good_practices.txt", "local_txt": None, "minio_pdf": "compliance/bund/compliance/2026/enisa_supply_chain_good_practices.pdf", "extra_metadata": { "regulation_id": "enisa_supply_chain_good_practices", "license": "reuse_with_attribution", "source": "enisa.europa.eu", }, }, ] def normalize_pdf_text(text): text = unicodedata.normalize('NFKC', text) text = text.replace('\u00ad', '').replace('\u200b', '') prev = None while prev != text: prev = text text = re.sub(r'(\d+)\s+\.\s+(\d+)', r'\1.\2', text) text = re.sub(r'\b([A-Z]{2,4})\s+-\s+(\d+)\b', r'\1-\2', text) text = re.sub( r'\b([A-Z]{2})\s*\.\s*([A-Z]{2})\s*-\s*(\d{2})\b', r'\1.\2-\3', text ) text = re.sub(r'\(\s+(\d+)\s+\)', r'(\1)', text) text = re.sub(r'[^\S\n]{2,}', ' ', text) return text def get_text(doc): """Get document text: from local file or extract from MinIO PDF.""" if doc["local_txt"]: print(f" Reading local: {doc['local_txt']}") with open(doc["local_txt"], encoding="utf-8") as f: return f.read() print(f" Downloading from MinIO: {doc['minio_pdf']}") with httpx.Client(timeout=60, verify=False) as c: resp = c.get(f"{RAG_URL}/api/v1/documents/download/{doc['minio_pdf']}") resp.raise_for_status() url = resp.json()["url"] with httpx.Client(timeout=300, verify=False) as c: pdf_bytes = c.get(url).content print(f" Downloaded {len(pdf_bytes) / 1024 / 1024:.1f} MB") print(" Extracting with pdfplumber...") parts = [] with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf: for i, page in enumerate(pdf.pages): t = page.extract_text(x_tolerance=3, y_tolerance=4) if t: parts.append(t) if (i + 1) % 50 == 0: print(f" {i + 1}/{len(pdf.pages)} pages...") text = "\n\n".join(parts) text = normalize_pdf_text(text) print(f" Extracted {len(text):,} chars") return text def get_old_doc_ids(collection, regulation_id): """Get all document_ids for existing chunks.""" doc_ids = set() offset = None with httpx.Client(timeout=60) as c: while True: body = { "filter": {"must": [ {"key": "regulation_id", "match": {"value": regulation_id}} ]}, "limit": 100, "with_payload": ["document_id"], } if offset is not None: body["offset"] = offset resp = c.post( f"{QDRANT_URL}/collections/{collection}/points/scroll", json=body, ) resp.raise_for_status() data = resp.json()["result"] for pt in data["points"]: did = pt.get("payload", {}).get("document_id") if did: doc_ids.add(did) offset = data.get("next_page_offset") if offset is None: break return doc_ids def upload_text_legal(text, filename, collection, extra_metadata): """Upload with chunk_strategy='legal'.""" form_data = { "collection": collection, "data_type": "compliance", "bundesland": "bund", "use_case": "compliance", "year": "2026", "chunk_strategy": "legal", "chunk_size": "1500", "chunk_overlap": "100", "metadata_json": json.dumps(extra_metadata, ensure_ascii=False), } with httpx.Client(timeout=UPLOAD_TIMEOUT, verify=False) as c: resp = c.post( f"{RAG_URL}/api/v1/documents/upload", files={"file": (filename, text.encode("utf-8"), "text/plain")}, data=form_data, ) resp.raise_for_status() return resp.json() def delete_by_doc_ids(collection, doc_ids): """Delete chunks matching specific document_ids.""" with httpx.Client(timeout=30) as c: for did in doc_ids: c.post( f"{QDRANT_URL}/collections/{collection}/points/delete", json={"filter": {"must": [ {"key": "document_id", "match": {"value": did}} ]}}, ).raise_for_status() def count_chunks(collection, regulation_id): with httpx.Client(timeout=30) as c: resp = c.post( f"{QDRANT_URL}/collections/{collection}/points/count", json={"filter": {"must": [ {"key": "regulation_id", "match": {"value": regulation_id}} ]}, "exact": True}, ) resp.raise_for_status() return resp.json()["result"]["count"] def check_section_rate(collection, regulation_id): total = 0 with_sec = 0 offset = None with httpx.Client(timeout=60) as c: while True: body = { "filter": {"must": [ {"key": "regulation_id", "match": {"value": regulation_id}} ]}, "limit": 100, "with_payload": ["section"], } if offset is not None: body["offset"] = offset resp = c.post( f"{QDRANT_URL}/collections/{collection}/points/scroll", json=body, ) resp.raise_for_status() data = resp.json()["result"] for pt in data["points"]: total += 1 s = pt.get("payload", {}).get("section", "") if s and s.strip(): with_sec += 1 offset = data.get("next_page_offset") if offset is None: break return total, with_sec def main(): parser = argparse.ArgumentParser() parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() print("=" * 60) print("Re-upload with chunk_strategy='legal'") print(f"Documents: {len(DOCS)}, Dry run: {args.dry_run}") print("=" * 60) results = [] for i, doc in enumerate(DOCS, 1): reg_id = doc["regulation_id"] coll = doc["collection"] print(f"\n[{i}/{len(DOCS)}] {doc['upload_filename']} → {coll}") # 1. Check existing old_count = count_chunks(coll, reg_id) old_doc_ids = get_old_doc_ids(coll, reg_id) if old_count > 0 else set() print(f" Old: {old_count} chunks, {len(old_doc_ids)} doc_ids") if args.dry_run: print(" DRY RUN — skipping") results.append({"file": doc["upload_filename"], "old": old_count, "new": "?", "sect": "?"}) continue # 2. Get text text = get_text(doc) # 3. Upload with legal strategy print(" Uploading with strategy='legal'...") result = upload_text_legal( text, doc["upload_filename"], coll, doc["extra_metadata"]) new_chunks = result.get("chunks_count", 0) new_doc_id = result.get("document_id", "") print(f" New: {new_chunks} chunks (doc_id={new_doc_id})") if new_chunks == 0: print(" ERROR: 0 chunks — keeping old!") results.append({"file": doc["upload_filename"], "old": old_count, "new": 0, "sect": 0}) continue # 4. Delete old chunks (safe: new ones already exist) if old_doc_ids: # Exclude the new document_id just in case old_doc_ids.discard(new_doc_id) if old_doc_ids: print(f" Deleting {len(old_doc_ids)} old doc_ids...") delete_by_doc_ids(coll, old_doc_ids) # 5. Check section rate total, with_sec = check_section_rate(coll, reg_id) pct = (with_sec / total * 100) if total > 0 else 0 print(f" Section rate: {with_sec}/{total} ({pct:.0f}%)") results.append({"file": doc["upload_filename"], "old": old_count, "new": new_chunks, "sect": round(pct, 1)}) if i < len(DOCS): time.sleep(2) # Summary print("\n" + "=" * 60) print("RESULTS") print("=" * 60) for r in results: print(f" {r['file']:<45} old={r['old']:<6} new={r['new']:<6} sect={r['sect']}%") total_new = sum(r["new"] for r in results if isinstance(r["new"], int)) print(f"\nTotal new chunks: {total_new}") if __name__ == "__main__": main()