docs: session handover — D2-D5 complete, quality report, NIST plan

Major session achievements:
- Structural metadata end-to-end (D2-D4)
- 430 docs re-ingested with new chunking
- HTML stripping + charset detection (0% → 97.6%)
- 20 EU regulations from EUR-Lex HTML (DSGVO: 0% → 92%)
- Quality report script (500 controls: 13% fully correct)
- Frontend requirements.map fix

Open: NIST/ENISA text normalization, citation backfill,
D5 script safety (upload-before-delete), BEG IV ingestion.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-02 22:22:55 +02:00
parent 3009f3d13a
commit ff21bc258a
2 changed files with 423 additions and 87 deletions
+303
View File
@@ -0,0 +1,303 @@
#!/usr/bin/env python3
"""
E2E Quality Report: Verify controls have correct source citations.
Loads N random controls from PostgreSQL, cross-references with Qdrant chunks,
and reports mismatches between source_citation and actual chunk metadata.
Usage:
# Against Mac Mini
python3 scripts/quality_report.py --db-host macmini --qdrant-url http://macmini:6333
# Smaller sample
python3 scripts/quality_report.py --db-host macmini --sample 100
"""
import argparse
import json
import logging
import sys
import httpx
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("quality-report")
COLLECTIONS = [
"bp_compliance_ce", "bp_compliance_gesetze", "bp_compliance_datenschutz",
"bp_dsfa_corpus", "bp_legal_templates",
]
def load_controls(db_url: str, sample_size: int) -> list[dict]:
"""Load random controls with source_citation from PostgreSQL."""
engine = create_engine(db_url)
Session = sessionmaker(bind=engine)
with Session() as db:
rows = db.execute(text("""
SELECT id::text, control_id, title,
source_citation::text, source_original_text,
generation_metadata::text, release_state
FROM compliance.canonical_controls
WHERE source_citation IS NOT NULL
AND source_original_text IS NOT NULL
AND release_state = 'draft'
ORDER BY RANDOM()
LIMIT :n
"""), {"n": sample_size}).fetchall()
controls = []
for row in rows:
citation = json.loads(row[3]) if row[3] else {}
metadata = json.loads(row[5]) if row[5] else {}
controls.append({
"id": row[0],
"control_id": row[1],
"title": row[2],
"citation": citation,
"source_text": row[4],
"metadata": metadata,
"release_state": row[6],
})
return controls
def build_qdrant_index(qdrant_url: str) -> dict:
"""Build regulation_id → list[chunk] index from Qdrant.
Controls were generated from OLD chunks (512 chars). Qdrant now has
NEW chunks (1500 chars). Hash matching won't work — use regulation +
section matching instead.
"""
logger.info("Building Qdrant chunk index by regulation_id...")
index = {} # regulation_id → [{"section": ..., "text_snippet": ..., ...}]
client = httpx.Client(timeout=60.0)
for coll in COLLECTIONS:
offset = None
for _ in range(600):
body = {"limit": 250, "with_payload": True, "with_vector": False}
if offset:
body["offset"] = offset
r = client.post(f"{qdrant_url}/collections/{coll}/points/scroll", json=body)
if r.status_code != 200:
break
data = r.json()["result"]
for pt in data["points"]:
reg_id = pt["payload"].get("regulation_id", "")
if not reg_id:
continue
chunk = {
"section": pt["payload"].get("section", ""),
"section_title": pt["payload"].get("section_title", ""),
"paragraph": pt["payload"].get("paragraph", ""),
"text_snippet": pt["payload"].get("chunk_text", "")[:200],
"filename": pt["payload"].get("filename", ""),
"collection": coll,
}
index.setdefault(reg_id, []).append(chunk)
offset = data.get("next_page_offset")
if not offset:
break
client.close()
total = sum(len(v) for v in index.values())
logger.info("Qdrant index: %d regulations, %d chunks", len(index), total)
return index
def check_control(ctrl: dict, qdrant_index: dict) -> dict:
"""Check a single control's source_citation against Qdrant chunks.
Strategy: Find chunks by regulation_id from generation_metadata,
then check if any chunk has a matching section/article.
"""
result = {
"control_id": ctrl["control_id"],
"title": (ctrl["title"] or "")[:60],
"citation_source": ctrl["citation"].get("source", ""),
"citation_article": ctrl["citation"].get("article", ""),
"citation_paragraph": ctrl["citation"].get("paragraph", ""),
"citation_page": ctrl["citation"].get("page"),
"issues": [],
}
# Get regulation_id from generation_metadata
reg_code = ctrl["metadata"].get("source_regulation", "")
citation_article = ctrl["citation"].get("article", "")
# Check 1: Does the control have a regulation reference?
if not reg_code:
result["issues"].append("NO_REGULATION_CODE")
return result
# Check 2: Does this regulation exist in Qdrant?
chunks = qdrant_index.get(reg_code, [])
if not chunks:
result["issues"].append(f"REGULATION_NOT_IN_QDRANT: {reg_code}")
result["reg_found"] = False
return result
result["reg_found"] = True
result["reg_chunks"] = len(chunks)
# Check 3: Does the control have an article citation?
if not citation_article:
result["issues"].append("NO_ARTICLE_IN_CITATION")
# Still check if chunks have section metadata at all
has_section = any(c["section"] for c in chunks)
if has_section:
result["issues"].append("CHUNKS_HAVE_SECTIONS_BUT_CONTROL_MISSING")
return result
# Check 4: Is the cited article found in any chunk's section?
norm_article = citation_article.strip().lower()
matching_chunks = [
c for c in chunks
if c["section"] and (
norm_article == c["section"].strip().lower()
or norm_article in c["section"].strip().lower()
or c["section"].strip().lower() in norm_article
)
]
if matching_chunks:
result["article_match"] = True
result["matched_section"] = matching_chunks[0]["section"]
else:
# Check if ANY chunk has sections (the article might just not match)
sections_in_regulation = sorted(set(c["section"] for c in chunks if c["section"]))
if sections_in_regulation:
result["issues"].append(
f"ARTICLE_NOT_FOUND_IN_CHUNKS: '{citation_article}' not in {sections_in_regulation[:5]}"
)
else:
result["issues"].append("NO_SECTIONS_IN_REGULATION_CHUNKS")
# Check 5: Does source_original_text contain the cited article?
source_text = ctrl["source_text"] or ""
if citation_article and source_text:
if citation_article.lower() not in source_text.lower():
if f"[{citation_article}" not in source_text:
result["issues"].append("ARTICLE_NOT_IN_SOURCE_TEXT")
if not result["issues"]:
result["issues"] = ["OK"]
return result
def generate_report(results: list[dict]):
"""Print the quality report."""
total = len(results)
ok = sum(1 for r in results if r["issues"] == ["OK"])
chunk_found = sum(1 for r in results if r.get("chunk_found", False))
no_chunk = sum(1 for r in results if "CHUNK_NOT_FOUND" in r["issues"])
no_article = sum(1 for r in results if "NO_ARTICLE_IN_CITATION" in r["issues"])
no_section = sum(1 for r in results if "NO_SECTION_IN_CHUNK" in r["issues"])
mismatch = sum(1 for r in results if any("MISMATCH" in i for i in r["issues"]))
not_in_text = sum(1 for r in results if "ARTICLE_NOT_IN_SOURCE_TEXT" in r["issues"])
print("\n" + "=" * 100)
print("QUALITAETSREPORT: CONTROL SOURCE CITATION VERIFICATION")
print("=" * 100)
print(f"\nStichprobe: {total} Controls")
print(f"\n{'Metrik':<45} {'Anzahl':>8} {'Anteil':>8}")
print("-" * 65)
print(f"{'OK (keine Probleme)':<45} {ok:>8} {ok*100//max(total,1):>7}%")
print(f"{'Chunk in Qdrant gefunden':<45} {chunk_found:>8} {chunk_found*100//max(total,1):>7}%")
print(f"{'Chunk NICHT gefunden':<45} {no_chunk:>8} {no_chunk*100//max(total,1):>7}%")
print(f"{'Kein article in source_citation':<45} {no_article:>8} {no_article*100//max(total,1):>7}%")
print(f"{'Kein section im Qdrant-Chunk':<45} {no_section:>8} {no_section*100//max(total,1):>7}%")
print(f"{'Article/Section Mismatch':<45} {mismatch:>8} {mismatch*100//max(total,1):>7}%")
print(f"{'Article nicht im Source-Text':<45} {not_in_text:>8} {not_in_text*100//max(total,1):>7}%")
# Show sample mismatches
mismatches = [r for r in results if any("MISMATCH" in i for i in r["issues"])]
if mismatches:
print("\n=== MISMATCHES (erste 10) ===\n")
for r in mismatches[:10]:
issues = [i for i in r["issues"] if "MISMATCH" in i]
print(f" {r['control_id']:20s} {r['title'][:40]:40s}")
for i in issues:
print(f"{i}")
# Show sample NOT_FOUND
not_found = [r for r in results if "CHUNK_NOT_FOUND" in r["issues"]]
if not_found:
print("\n=== CHUNK NOT FOUND (erste 10) ===\n")
for r in not_found[:10]:
src = r.get("citation_source", "?")
art = r.get("citation_article", "?")
print(f" {r['control_id']:20s} {src[:25]:25s} {art}")
# Distribution by source
print("\n=== NACH QUELLE ===\n")
source_stats = {}
for r in results:
src = r.get("citation_source", "?")[:30]
if src not in source_stats:
source_stats[src] = {"total": 0, "ok": 0, "no_chunk": 0, "no_section": 0}
source_stats[src]["total"] += 1
if r["issues"] == ["OK"]:
source_stats[src]["ok"] += 1
if "CHUNK_NOT_FOUND" in r["issues"]:
source_stats[src]["no_chunk"] += 1
if "NO_SECTION_IN_CHUNK" in r["issues"]:
source_stats[src]["no_section"] += 1
print(f" {'Quelle':<32} {'Total':>6} {'OK':>6} {'OK%':>6} {'NoChunk':>8} {'NoSect':>8}")
print(f" {'-'*72}")
for src in sorted(source_stats.keys(), key=lambda s: -source_stats[s]["total"]):
s = source_stats[src]
pct = s["ok"] * 100 // max(s["total"], 1)
print(f" {src:<32} {s['total']:>6} {s['ok']:>6} {pct:>5}% {s['no_chunk']:>8} {s['no_section']:>8}")
print(f"\n{'='*100}")
verdict = "PASS" if ok * 100 // max(total, 1) >= 50 else "NEEDS IMPROVEMENT"
print(f"ERGEBNIS: {verdict}{ok}/{total} Controls ({ok*100//max(total,1)}%) vollstaendig korrekt")
print(f"{'='*100}")
def main():
parser = argparse.ArgumentParser(description="Control Source Citation Quality Report")
parser.add_argument("--db-host", default="macmini")
parser.add_argument("--db-port", type=int, default=5432)
parser.add_argument("--db-name", default="breakpilot_db")
parser.add_argument("--db-user", default="breakpilot")
parser.add_argument("--db-pass", default="breakpilot123")
parser.add_argument("--qdrant-url", default="http://macmini:6333")
parser.add_argument("--sample", type=int, default=500)
args = parser.parse_args()
db_url = f"postgresql://{args.db_user}:{args.db_pass}@{args.db_host}:{args.db_port}/{args.db_name}"
# Load controls
logger.info("Loading %d random controls from DB...", args.sample)
controls = load_controls(db_url, args.sample)
logger.info("Loaded %d controls with source_citation", len(controls))
if not controls:
print("ERROR: No controls found with source_citation")
sys.exit(1)
# Build Qdrant index
qdrant_index = build_qdrant_index(args.qdrant_url)
# Check each control
logger.info("Checking %d controls against Qdrant...", len(controls))
results = []
for ctrl in controls:
result = check_control(ctrl, qdrant_index)
results.append(result)
# Report
generate_report(results)
if __name__ == "__main__":
main()