feat(controls): Zitierfaehigkeit — Embedding-Re-Link + Atom-Vererbung

citation_backfill Tier-1 von totem sha256-Hash auf Semantik-Suche gegen die
re-ingestierten, article_label-tragenden Chunks umgestellt (Fundstelle aus
article_label); rag_client reicht article_label durch (additiv, Default-Feld).
NEU: scripts/atom_citation_inheritance.py vererbt source_citation parent->atom
(license_rule != 3), iterativ. macmini-Apply verifiziert: Zitierfaehigkeit
6.9%->61.3% (+171.765 Atome), Stichprobe korrekt (Atom == Parent-Fundstelle).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-21 14:17:57 +02:00
parent ff4a743558
commit de542633e2
3 changed files with 238 additions and 111 deletions
@@ -0,0 +1,145 @@
#!/usr/bin/env python3
"""Inherit source_citation from parent to atom controls.
Background
==========
citation_backfill.py fills source_citation on the *source-bearing* controls
(those with source_original_text — ~2-7 %) by re-linking them to the
re-ingested, article_label-bearing chunks. The remaining ~93 % are "atom"
controls (decompositions) that carry a parent_control_uuid but no own citation.
They cite the SAME norm as their parent, so the citation can be inherited —
no re-matching needed.
Self-written controls (license_rule = 3) are skipped (no external source).
Runs in idempotent iterations (atom -> master -> grandmaster) and prints
per-stage counts before any write. Safe to rerun — only fills rows whose
source_citation lacks an 'article'.
Usage::
python3 scripts/atom_citation_inheritance.py --db-host 100.80.114.48 \\
--db-password breakpilot123 --dry-run
python3 scripts/atom_citation_inheritance.py --db-host 100.80.114.48 \\
--db-password breakpilot123 --apply
"""
from __future__ import annotations
import argparse
import os
import sys
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot@localhost:5432/breakpilot_db")
# A row "needs" a citation when it has no article yet.
_NEEDS = (
"(cc.source_citation IS NULL "
" OR cc.source_citation->>'article' IS NULL "
" OR cc.source_citation->>'article' = '')"
)
# A parent can supply one when it carries a real article.
_PARENT_HAS = (
"p.source_citation IS NOT NULL "
"AND p.source_citation->>'article' IS NOT NULL "
"AND p.source_citation->>'article' <> ''"
)
SQL_REPORT = f"""
SET search_path TO compliance, public;
SELECT
CASE WHEN cc.parent_control_uuid IS NULL THEN 'no_parent'
WHEN ({_PARENT_HAS.replace('p.', 'p2.')}) THEN 'parent_has_article'
ELSE 'parent_no_article' END AS bucket,
COUNT(*) AS n
FROM canonical_controls cc
LEFT JOIN canonical_controls p2 ON cc.parent_control_uuid = p2.id
WHERE {_NEEDS}
AND cc.license_rule IS DISTINCT FROM 3
GROUP BY 1 ORDER BY 2 DESC;
"""
SQL_INHERIT = f"""
SET search_path TO compliance, public;
UPDATE canonical_controls cc
SET source_citation = p.source_citation, updated_at = NOW()
FROM canonical_controls p
WHERE cc.parent_control_uuid = p.id
AND {_NEEDS}
AND {_PARENT_HAS}
AND cc.license_rule IS DISTINCT FROM 3;
"""
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("--db-url", default=DB_URL,
help="Postgres URL (default: $DATABASE_URL)")
p.add_argument("--max-iterations", type=int, default=6,
help="Cap on inheritance iterations to avoid loops")
g = p.add_mutually_exclusive_group(required=True)
g.add_argument("--dry-run", action="store_true")
g.add_argument("--apply", action="store_true")
return p.parse_args()
def print_bucket(rows, label: str) -> None:
print(f"\n## {label}")
total = 0
for bucket, n in rows:
print(f" {bucket:20} {n:>8}")
total += n
print(f" {'TOTAL':20} {total:>8}")
def main() -> int:
args = parse_args()
try:
import psycopg2
except ImportError:
print("error: psycopg2 not installed", file=sys.stderr)
return 2
conn = psycopg2.connect(args.db_url)
conn.autocommit = False
cur = conn.cursor()
print("=" * 60)
print(" Atom citation inheritance — source_citation via parent")
print(f" Mode: {'DRY-RUN' if args.dry_run else 'APPLY'}")
print("=" * 60)
cur.execute(SQL_REPORT)
print_bucket(cur.fetchall(), "Controls without article (need citation)")
if args.dry_run:
cur.execute(
"SET search_path TO compliance, public; "
f"SELECT COUNT(*) FROM canonical_controls cc "
f"JOIN canonical_controls p ON cc.parent_control_uuid = p.id "
f"WHERE {_NEEDS} AND {_PARENT_HAS} AND cc.license_rule IS DISTINCT FROM 3;"
)
print(f"\n## First inherit-pass would fill: {cur.fetchone()[0]} rows")
print("\nNo writes performed. Use --apply to execute.")
conn.rollback()
return 0
total = 0
for i in range(1, args.max_iterations + 1):
cur.execute(SQL_INHERIT)
updated = cur.rowcount
total += updated
print(f"\n iteration {i}: {updated} rows inherited")
if updated == 0:
break
conn.commit()
print(f"\n✓ Total atoms inherited: {total}")
cur.execute(SQL_REPORT)
print_bucket(cur.fetchall(), "Remaining without article")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+90 -111
View File
@@ -7,7 +7,6 @@ Citation Backfill Service — enrich existing controls with article/paragraph pr
Tier 3 — Ollama LLM: ask local LLM to identify article/paragraph from text
"""
import hashlib
import json
import logging
import os
@@ -28,12 +27,13 @@ OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
OLLAMA_MODEL = os.getenv("CONTROL_GEN_OLLAMA_MODEL", "qwen3.5:35b-a3b")
LLM_TIMEOUT = float(os.getenv("CONTROL_GEN_LLM_TIMEOUT", "180"))
ALL_COLLECTIONS = [
"bp_compliance_ce",
# Tier-1 semantic re-link: min cosine for a source_original_text → chunk match.
EMBED_THRESHOLD = float(os.getenv("CITATION_EMBED_THRESHOLD", "0.80"))
# Collections that carry re-ingested, article_label-bearing chunks.
RELINK_COLLECTIONS = [
"bp_compliance_gesetze",
"bp_compliance_datenschutz",
"bp_dsfa_corpus",
"bp_legal_templates",
"bp_compliance_ce",
]
BACKFILL_SYSTEM_PROMPT = (
@@ -51,13 +51,14 @@ _SOURCE_ARTICLE_RE = re.compile(
class MatchResult:
article: str
paragraph: str
method: str # "hash", "regex", "llm"
method: str # "embed", "regex", "llm"
source: str = "" # regulation short/name (embed tier sets the cleaned source)
@dataclass
class BackfillResult:
total_controls: int = 0
matched_hash: int = 0
matched_embed: int = 0
matched_regex: int = 0
matched_llm: int = 0
unmatched: int = 0
@@ -71,7 +72,6 @@ class CitationBackfill:
def __init__(self, db: Session, rag_client: ComplianceRAGClient):
self.db = db
self.rag = rag_client
self._rag_index: dict[str, RAGSearchResult] = {}
async def run(self, dry_run: bool = True, limit: int = 0) -> BackfillResult:
"""Main entry: iterate controls missing article/paragraph, match to RAG, update."""
@@ -85,20 +85,10 @@ class CitationBackfill:
if not controls:
return result
# Collect hashes we need to find — only build index for controls with source text
needed_hashes: set[str] = set()
for ctrl in controls:
src = ctrl.get("source_original_text")
if src:
needed_hashes.add(hashlib.sha256(src.encode()).hexdigest())
if needed_hashes:
# Build targeted RAG index — only scroll collections that our controls reference
logger.info("Building targeted RAG hash index for %d source texts...", len(needed_hashes))
await self._build_rag_index_targeted(controls)
logger.info("RAG index built: %d chunks indexed, %d hashes needed", len(self._rag_index), len(needed_hashes))
else:
logger.info("No source_original_text found — skipping RAG index build")
# Tier-1 = per-control semantic search against the re-ingested, labeled chunks.
# (The old sha256(chunk.text) hash index died with re-chunking and is gone.)
with_source = sum(1 for c in controls if c.get("source_original_text"))
logger.info("Embedding-relink candidates (with source_original_text): %d", with_source)
# Process each control
for i, ctrl in enumerate(controls):
@@ -108,8 +98,8 @@ class CitationBackfill:
try:
match = await self._match_control(ctrl)
if match:
if match.method == "hash":
result.matched_hash += 1
if match.method == "embed":
result.matched_embed += 1
elif match.method == "regex":
result.matched_regex += 1
elif match.method == "llm":
@@ -139,8 +129,8 @@ class CitationBackfill:
result.errors.append(f"Commit failed: {e}")
logger.info(
"Backfill complete: %d total, hash=%d regex=%d llm=%d unmatched=%d updated=%d",
result.total_controls, result.matched_hash, result.matched_regex,
"Backfill complete: %d total, embed=%d regex=%d llm=%d unmatched=%d updated=%d",
result.total_controls, result.matched_embed, result.matched_regex,
result.matched_llm, result.unmatched, result.updated,
)
return result
@@ -178,93 +168,13 @@ class CitationBackfill:
controls.append(ctrl)
return controls
async def _build_rag_index_targeted(self, controls: list[dict]):
"""Build RAG index by scrolling only collections relevant to our controls.
Uses regulation codes from generation_metadata to identify which collections
to search, falling back to all collections only if needed.
"""
# Determine which collections are relevant based on regulation codes
regulation_to_collection = self._map_regulations_to_collections(controls)
collections_to_search = set(regulation_to_collection.values()) or set(ALL_COLLECTIONS)
logger.info("Targeted index: searching %d collections: %s",
len(collections_to_search), ", ".join(collections_to_search))
for collection in collections_to_search:
offset = None
page = 0
seen_offsets: set[str] = set()
while True:
chunks, next_offset = await self.rag.scroll(
collection=collection, offset=offset, limit=200,
)
if not chunks:
break
for chunk in chunks:
if chunk.text and len(chunk.text.strip()) >= 50:
h = hashlib.sha256(chunk.text.encode()).hexdigest()
self._rag_index[h] = chunk
page += 1
if page % 50 == 0:
logger.info("Indexing %s: page %d (%d chunks so far)",
collection, page, len(self._rag_index))
if not next_offset:
break
if next_offset in seen_offsets:
logger.warning("Scroll loop in %s at page %d — stopping", collection, page)
break
seen_offsets.add(next_offset)
offset = next_offset
logger.info("Indexed collection %s: %d pages", collection, page)
def _map_regulations_to_collections(self, controls: list[dict]) -> dict[str, str]:
"""Map regulation codes from controls to likely Qdrant collections."""
# Heuristic: regulation code prefix → collection
collection_map = {
"eu_": "bp_compliance_gesetze",
"dsgvo": "bp_compliance_datenschutz",
"bdsg": "bp_compliance_gesetze",
"ttdsg": "bp_compliance_gesetze",
"nist_": "bp_compliance_ce",
"owasp": "bp_compliance_ce",
"bsi_": "bp_compliance_ce",
"enisa": "bp_compliance_ce",
"at_": "bp_compliance_recht",
"fr_": "bp_compliance_recht",
"es_": "bp_compliance_recht",
}
result: dict[str, str] = {}
for ctrl in controls:
meta = ctrl.get("generation_metadata") or {}
reg = meta.get("source_regulation", "")
if not reg:
continue
for prefix, coll in collection_map.items():
if reg.startswith(prefix):
result[reg] = coll
break
else:
# Unknown regulation — search all
for coll in ALL_COLLECTIONS:
result[f"_all_{coll}"] = coll
return result
async def _match_control(self, ctrl: dict) -> Optional[MatchResult]:
"""3-tier matching: hash → regex → LLM."""
# Tier 1: Hash match against RAG index
source_text = ctrl.get("source_original_text")
if source_text:
h = hashlib.sha256(source_text.encode()).hexdigest()
chunk = self._rag_index.get(h)
if chunk and (chunk.article or chunk.paragraph):
return MatchResult(
article=chunk.article or "",
paragraph=chunk.paragraph or "",
method="hash",
)
# Tier 1: Semantic search against the re-ingested, labeled chunks
embed = await self._embedding_match(ctrl)
if embed:
return embed
# Tier 2: Regex parse concatenated source
citation = ctrl.get("source_citation") or {}
@@ -278,11 +188,60 @@ class CitationBackfill:
)
# Tier 3: Ollama LLM
if source_text:
if ctrl.get("source_original_text"):
return await self._llm_match(ctrl)
return None
async def _embedding_match(self, ctrl: dict) -> Optional[MatchResult]:
"""Tier 1: semantic-search source_original_text against the labeled chunks.
Takes the top hit (cosine >= EMBED_THRESHOLD) that carries a real article
and turns its article_label into a precise citation.
"""
source_text = ctrl.get("source_original_text")
if not source_text:
return None
query = source_text.strip()[:512]
best: Optional[RAGSearchResult] = None
for collection in self._collections_for(ctrl):
try:
hits = await self.rag.search(query, collection=collection, top_k=3)
except Exception as e:
logger.debug("embed search failed (%s): %s", collection, e)
hits = []
if hits and (best is None or hits[0].score > best.score):
best = hits[0]
if not best or best.score < EMBED_THRESHOLD:
return None
article = _article_part(best)
if not article:
return None
return MatchResult(
article=article,
paragraph=best.paragraph or "",
method="embed",
source=best.regulation_short or best.regulation_name or "",
)
def _collections_for(self, ctrl: dict) -> list[str]:
"""Likely collection(s) for a control's regulation; falls back to all three."""
meta = ctrl.get("generation_metadata") or {}
reg = (meta.get("source_regulation") or "").lower()
prefix_map = {
"eu_": "bp_compliance_gesetze", "bdsg": "bp_compliance_gesetze",
"de_": "bp_compliance_gesetze", "at_": "bp_compliance_gesetze",
"ch_": "bp_compliance_gesetze", "dsgvo": "bp_compliance_gesetze",
"trgs": "bp_compliance_ce", "trbs": "bp_compliance_ce", "asr": "bp_compliance_ce",
"nist": "bp_compliance_ce", "owasp": "bp_compliance_ce", "enisa": "bp_compliance_ce",
"edpb": "bp_compliance_datenschutz", "dsk": "bp_compliance_datenschutz",
"bfdi": "bp_compliance_datenschutz",
}
for prefix, coll in prefix_map.items():
if reg.startswith(prefix):
return [coll]
return list(RELINK_COLLECTIONS)
async def _llm_match(self, ctrl: dict) -> Optional[MatchResult]:
"""Use Ollama to identify article/paragraph from source text."""
citation = ctrl.get("source_citation") or {}
@@ -331,6 +290,9 @@ Bei deutschen Gesetzen mit § verwende: "§ XX" statt "Art. XX"."""
if parsed:
citation["source"] = parsed["name"]
# Embed tier carries the cleaned regulation name → prefer it as source.
if match.source:
citation["source"] = match.source
# Add separate article/paragraph fields
citation["article"] = match.article
citation["paragraph"] = match.paragraph
@@ -359,6 +321,23 @@ Bei deutschen Gesetzen mit § verwende: "§ XX" statt "Art. XX"."""
)
def _article_part(chunk: RAGSearchResult) -> str:
"""Precise article from a chunk: article_label minus the regulation name.
'BDSG § 38' -> '§ 38'; 'Art. 39 DSGVO' -> 'Art. 39'; 'NIST SP 800-53r5 SA-12' -> 'SA-12'.
Falls back to the bare article field. Returns '' if only a doc-level name is present.
"""
label = (chunk.article_label or "").strip()
reg = (chunk.regulation_short or "").strip()
if label:
part = label
if reg and reg in label:
part = label.replace(reg, "").strip(" ,;-")
if part and part != reg:
return part
return (chunk.article or "").strip()
def _parse_concatenated_source(source: str) -> Optional[dict]:
"""Parse 'DSGVO Art. 35'{name: 'DSGVO', article: 'Art. 35'}.
+3
View File
@@ -33,6 +33,7 @@ class RAGSearchResult:
paragraph: str
source_url: str
score: float
article_label: str = ""
collection: str = ""
page: Optional[int] = None
@@ -90,6 +91,7 @@ class ComplianceRAGClient:
regulation_short=r.get("regulation_short", ""),
category=r.get("category", ""),
article=r.get("article", ""),
article_label=r.get("article_label", ""),
paragraph=r.get("paragraph", ""),
source_url=r.get("source_url", ""),
score=r.get("score", 0.0),
@@ -171,6 +173,7 @@ class ComplianceRAGClient:
regulation_short=r.get("regulation_short", ""),
category=r.get("category", ""),
article=r.get("article", ""),
article_label=r.get("article_label", ""),
paragraph=r.get("paragraph", ""),
source_url=r.get("source_url", ""),
score=0.0,