From de542633e2da4305d70bdba13be3cb5ffac04298 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 21 Jun 2026 14:17:57 +0200 Subject: [PATCH] =?UTF-8?q?feat(controls):=20Zitierfaehigkeit=20=E2=80=94?= =?UTF-8?q?=20Embedding-Re-Link=20+=20Atom-Vererbung?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit citation_backfill Tier-1 von totem sha256-Hash auf Semantik-Suche gegen die re-ingestierten, article_label-tragenden Chunks umgestellt (Fundstelle aus article_label); rag_client reicht article_label durch (additiv, Default-Feld). NEU: scripts/atom_citation_inheritance.py vererbt source_citation parent->atom (license_rule != 3), iterativ. macmini-Apply verifiziert: Zitierfaehigkeit 6.9%->61.3% (+171.765 Atome), Stichprobe korrekt (Atom == Parent-Fundstelle). Co-Authored-By: Claude Opus 4.7 --- .../scripts/atom_citation_inheritance.py | 145 +++++++++++++ .../services/citation_backfill.py | 201 ++++++++---------- control-pipeline/services/rag_client.py | 3 + 3 files changed, 238 insertions(+), 111 deletions(-) create mode 100644 control-pipeline/scripts/atom_citation_inheritance.py diff --git a/control-pipeline/scripts/atom_citation_inheritance.py b/control-pipeline/scripts/atom_citation_inheritance.py new file mode 100644 index 0000000..f9620ba --- /dev/null +++ b/control-pipeline/scripts/atom_citation_inheritance.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python3 +"""Inherit source_citation from parent to atom controls. + +Background +========== + +citation_backfill.py fills source_citation on the *source-bearing* controls +(those with source_original_text — ~2-7 %) by re-linking them to the +re-ingested, article_label-bearing chunks. The remaining ~93 % are "atom" +controls (decompositions) that carry a parent_control_uuid but no own citation. +They cite the SAME norm as their parent, so the citation can be inherited — +no re-matching needed. + +Self-written controls (license_rule = 3) are skipped (no external source). 
+ +Runs in idempotent iterations (atom -> master -> grandmaster) and prints +per-stage counts before any write. Safe to rerun — only fills rows whose +source_citation lacks an 'article'. + +Usage:: + + python3 scripts/atom_citation_inheritance.py \\ + --db-url "$DATABASE_URL" --dry-run + python3 scripts/atom_citation_inheritance.py \\ + --db-url "$DATABASE_URL" --apply +""" + +from __future__ import annotations + +import argparse +import os +import sys + +DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot@localhost:5432/breakpilot_db") + +# A row "needs" a citation when it has no article yet. +_NEEDS = ( + "(cc.source_citation IS NULL " + " OR cc.source_citation->>'article' IS NULL " + " OR cc.source_citation->>'article' = '')" +) +# A parent can supply one when it carries a real article. +_PARENT_HAS = ( + "p.source_citation IS NOT NULL " + "AND p.source_citation->>'article' IS NOT NULL " + "AND p.source_citation->>'article' <> ''" +) + +SQL_REPORT = f""" +SET search_path TO compliance, public; +SELECT + CASE WHEN cc.parent_control_uuid IS NULL THEN 'no_parent' + WHEN ({_PARENT_HAS.replace('p.', 'p2.')}) THEN 'parent_has_article' + ELSE 'parent_no_article' END AS bucket, + COUNT(*) AS n +FROM canonical_controls cc +LEFT JOIN canonical_controls p2 ON cc.parent_control_uuid = p2.id +WHERE {_NEEDS} + AND cc.license_rule IS DISTINCT FROM 3 +GROUP BY 1 ORDER BY 2 DESC; +""" + +SQL_INHERIT = f""" +SET search_path TO compliance, public; +UPDATE canonical_controls cc +SET source_citation = p.source_citation, updated_at = NOW() +FROM canonical_controls p +WHERE cc.parent_control_uuid = p.id + AND {_NEEDS} + AND {_PARENT_HAS} + AND cc.license_rule IS DISTINCT FROM 3; +""" + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description=__doc__) + p.add_argument("--db-url", default=DB_URL, + help="Postgres URL (default: $DATABASE_URL)") + p.add_argument("--max-iterations", 
type=int, default=6, + help="Cap on inheritance iterations to avoid loops") + g = p.add_mutually_exclusive_group(required=True) + g.add_argument("--dry-run", action="store_true") + g.add_argument("--apply", action="store_true") + return p.parse_args() + + +def print_bucket(rows, label: str) -> None: + print(f"\n## {label}") + total = 0 + for bucket, n in rows: + print(f" {bucket:20} {n:>8}") + total += n + print(f" {'TOTAL':20} {total:>8}") + + +def main() -> int: + args = parse_args() + try: + import psycopg2 + except ImportError: + print("error: psycopg2 not installed", file=sys.stderr) + return 2 + + conn = psycopg2.connect(args.db_url) + conn.autocommit = False + cur = conn.cursor() + + print("=" * 60) + print(" Atom citation inheritance — source_citation via parent") + print(f" Mode: {'DRY-RUN' if args.dry_run else 'APPLY'}") + print("=" * 60) + + cur.execute(SQL_REPORT) + print_bucket(cur.fetchall(), "Controls without article (need citation)") + + if args.dry_run: + cur.execute( + "SET search_path TO compliance, public; " + f"SELECT COUNT(*) FROM canonical_controls cc " + f"JOIN canonical_controls p ON cc.parent_control_uuid = p.id " + f"WHERE {_NEEDS} AND {_PARENT_HAS} AND cc.license_rule IS DISTINCT FROM 3;" + ) + print(f"\n## First inherit-pass would fill: {cur.fetchone()[0]} rows") + print("\nNo writes performed. 
Use --apply to execute.") + conn.rollback() + return 0 + + total = 0 + for i in range(1, args.max_iterations + 1): + cur.execute(SQL_INHERIT) + updated = cur.rowcount + total += updated + print(f"\n iteration {i}: {updated} rows inherited") + if updated == 0: + break + conn.commit() + print(f"\n✓ Total atoms inherited: {total}") + + cur.execute(SQL_REPORT) + print_bucket(cur.fetchall(), "Remaining without article") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/control-pipeline/services/citation_backfill.py b/control-pipeline/services/citation_backfill.py index 9222445..28c9fb8 100644 --- a/control-pipeline/services/citation_backfill.py +++ b/control-pipeline/services/citation_backfill.py @@ -7,7 +7,6 @@ Citation Backfill Service — enrich existing controls with article/paragraph pr Tier 3 — Ollama LLM: ask local LLM to identify article/paragraph from text """ -import hashlib import json import logging import os @@ -28,12 +27,13 @@ OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") OLLAMA_MODEL = os.getenv("CONTROL_GEN_OLLAMA_MODEL", "qwen3.5:35b-a3b") LLM_TIMEOUT = float(os.getenv("CONTROL_GEN_LLM_TIMEOUT", "180")) -ALL_COLLECTIONS = [ - "bp_compliance_ce", +# Tier-1 semantic re-link: min cosine for a source_original_text → chunk match. +EMBED_THRESHOLD = float(os.getenv("CITATION_EMBED_THRESHOLD", "0.80")) +# Collections that carry re-ingested, article_label-bearing chunks. 
+RELINK_COLLECTIONS = [ "bp_compliance_gesetze", "bp_compliance_datenschutz", - "bp_dsfa_corpus", - "bp_legal_templates", + "bp_compliance_ce", ] BACKFILL_SYSTEM_PROMPT = ( @@ -51,13 +51,14 @@ _SOURCE_ARTICLE_RE = re.compile( class MatchResult: article: str paragraph: str - method: str # "hash", "regex", "llm" + method: str # "embed", "regex", "llm" + source: str = "" # regulation short/name (embed tier sets the cleaned source) @dataclass class BackfillResult: total_controls: int = 0 - matched_hash: int = 0 + matched_embed: int = 0 matched_regex: int = 0 matched_llm: int = 0 unmatched: int = 0 @@ -71,7 +72,6 @@ class CitationBackfill: def __init__(self, db: Session, rag_client: ComplianceRAGClient): self.db = db self.rag = rag_client - self._rag_index: dict[str, RAGSearchResult] = {} async def run(self, dry_run: bool = True, limit: int = 0) -> BackfillResult: """Main entry: iterate controls missing article/paragraph, match to RAG, update.""" @@ -85,20 +85,10 @@ class CitationBackfill: if not controls: return result - # Collect hashes we need to find — only build index for controls with source text - needed_hashes: set[str] = set() - for ctrl in controls: - src = ctrl.get("source_original_text") - if src: - needed_hashes.add(hashlib.sha256(src.encode()).hexdigest()) - - if needed_hashes: - # Build targeted RAG index — only scroll collections that our controls reference - logger.info("Building targeted RAG hash index for %d source texts...", len(needed_hashes)) - await self._build_rag_index_targeted(controls) - logger.info("RAG index built: %d chunks indexed, %d hashes needed", len(self._rag_index), len(needed_hashes)) - else: - logger.info("No source_original_text found — skipping RAG index build") + # Tier-1 = per-control semantic search against the re-ingested, labeled chunks. + # (The old sha256(chunk.text) hash index died with re-chunking and is gone.) 
+ with_source = sum(1 for c in controls if c.get("source_original_text")) + logger.info("Embedding-relink candidates (with source_original_text): %d", with_source) # Process each control for i, ctrl in enumerate(controls): @@ -108,8 +98,8 @@ class CitationBackfill: try: match = await self._match_control(ctrl) if match: - if match.method == "hash": - result.matched_hash += 1 + if match.method == "embed": + result.matched_embed += 1 elif match.method == "regex": result.matched_regex += 1 elif match.method == "llm": @@ -139,8 +129,8 @@ class CitationBackfill: result.errors.append(f"Commit failed: {e}") logger.info( - "Backfill complete: %d total, hash=%d regex=%d llm=%d unmatched=%d updated=%d", - result.total_controls, result.matched_hash, result.matched_regex, + "Backfill complete: %d total, embed=%d regex=%d llm=%d unmatched=%d updated=%d", + result.total_controls, result.matched_embed, result.matched_regex, result.matched_llm, result.unmatched, result.updated, ) return result @@ -178,93 +168,13 @@ class CitationBackfill: controls.append(ctrl) return controls - async def _build_rag_index_targeted(self, controls: list[dict]): - """Build RAG index by scrolling only collections relevant to our controls. - - Uses regulation codes from generation_metadata to identify which collections - to search, falling back to all collections only if needed. 
- """ - # Determine which collections are relevant based on regulation codes - regulation_to_collection = self._map_regulations_to_collections(controls) - collections_to_search = set(regulation_to_collection.values()) or set(ALL_COLLECTIONS) - - logger.info("Targeted index: searching %d collections: %s", - len(collections_to_search), ", ".join(collections_to_search)) - - for collection in collections_to_search: - offset = None - page = 0 - seen_offsets: set[str] = set() - while True: - chunks, next_offset = await self.rag.scroll( - collection=collection, offset=offset, limit=200, - ) - if not chunks: - break - for chunk in chunks: - if chunk.text and len(chunk.text.strip()) >= 50: - h = hashlib.sha256(chunk.text.encode()).hexdigest() - self._rag_index[h] = chunk - page += 1 - if page % 50 == 0: - logger.info("Indexing %s: page %d (%d chunks so far)", - collection, page, len(self._rag_index)) - if not next_offset: - break - if next_offset in seen_offsets: - logger.warning("Scroll loop in %s at page %d — stopping", collection, page) - break - seen_offsets.add(next_offset) - offset = next_offset - - logger.info("Indexed collection %s: %d pages", collection, page) - - def _map_regulations_to_collections(self, controls: list[dict]) -> dict[str, str]: - """Map regulation codes from controls to likely Qdrant collections.""" - # Heuristic: regulation code prefix → collection - collection_map = { - "eu_": "bp_compliance_gesetze", - "dsgvo": "bp_compliance_datenschutz", - "bdsg": "bp_compliance_gesetze", - "ttdsg": "bp_compliance_gesetze", - "nist_": "bp_compliance_ce", - "owasp": "bp_compliance_ce", - "bsi_": "bp_compliance_ce", - "enisa": "bp_compliance_ce", - "at_": "bp_compliance_recht", - "fr_": "bp_compliance_recht", - "es_": "bp_compliance_recht", - } - result: dict[str, str] = {} - for ctrl in controls: - meta = ctrl.get("generation_metadata") or {} - reg = meta.get("source_regulation", "") - if not reg: - continue - for prefix, coll in collection_map.items(): - if 
reg.startswith(prefix): - result[reg] = coll - break - else: - # Unknown regulation — search all - for coll in ALL_COLLECTIONS: - result[f"_all_{coll}"] = coll - return result - async def _match_control(self, ctrl: dict) -> Optional[MatchResult]: """3-tier matching: hash → regex → LLM.""" - # Tier 1: Hash match against RAG index - source_text = ctrl.get("source_original_text") - if source_text: - h = hashlib.sha256(source_text.encode()).hexdigest() - chunk = self._rag_index.get(h) - if chunk and (chunk.article or chunk.paragraph): - return MatchResult( - article=chunk.article or "", - paragraph=chunk.paragraph or "", - method="hash", - ) + # Tier 1: Semantic search against the re-ingested, labeled chunks + embed = await self._embedding_match(ctrl) + if embed: + return embed # Tier 2: Regex parse concatenated source citation = ctrl.get("source_citation") or {} @@ -278,11 +188,60 @@ class CitationBackfill: ) # Tier 3: Ollama LLM - if source_text: + if ctrl.get("source_original_text"): return await self._llm_match(ctrl) return None + async def _embedding_match(self, ctrl: dict) -> Optional[MatchResult]: + """Tier 1: semantic-search source_original_text against the labeled chunks. + + Takes the top hit (cosine >= EMBED_THRESHOLD) that carries a real article + and turns its article_label into a precise citation. 
+ """ + source_text = ctrl.get("source_original_text") + if not source_text: + return None + query = source_text.strip()[:512] + best: Optional[RAGSearchResult] = None + for collection in self._collections_for(ctrl): + try: + hits = await self.rag.search(query, collection=collection, top_k=3) + except Exception as e: + logger.debug("embed search failed (%s): %s", collection, e) + hits = [] + if hits and (best is None or hits[0].score > best.score): + best = hits[0] + if not best or best.score < EMBED_THRESHOLD: + return None + article = _article_part(best) + if not article: + return None + return MatchResult( + article=article, + paragraph=best.paragraph or "", + method="embed", + source=best.regulation_short or best.regulation_name or "", + ) + + def _collections_for(self, ctrl: dict) -> list[str]: + """Likely collection(s) for a control's regulation; falls back to all three.""" + meta = ctrl.get("generation_metadata") or {} + reg = (meta.get("source_regulation") or "").lower() + prefix_map = { + "eu_": "bp_compliance_gesetze", "bdsg": "bp_compliance_gesetze", + "de_": "bp_compliance_gesetze", "at_": "bp_compliance_gesetze", + "ch_": "bp_compliance_gesetze", "dsgvo": "bp_compliance_gesetze", + "trgs": "bp_compliance_ce", "trbs": "bp_compliance_ce", "asr": "bp_compliance_ce", + "nist": "bp_compliance_ce", "owasp": "bp_compliance_ce", "enisa": "bp_compliance_ce", + "edpb": "bp_compliance_datenschutz", "dsk": "bp_compliance_datenschutz", + "bfdi": "bp_compliance_datenschutz", + } + for prefix, coll in prefix_map.items(): + if reg.startswith(prefix): + return [coll] + return list(RELINK_COLLECTIONS) + async def _llm_match(self, ctrl: dict) -> Optional[MatchResult]: """Use Ollama to identify article/paragraph from source text.""" citation = ctrl.get("source_citation") or {} @@ -331,6 +290,9 @@ Bei deutschen Gesetzen mit § verwende: "§ XX" statt "Art. 
XX".""" if parsed: citation["source"] = parsed["name"] + # Embed tier carries the cleaned regulation name → prefer it as source. + if match.source: + citation["source"] = match.source # Add separate article/paragraph fields citation["article"] = match.article citation["paragraph"] = match.paragraph @@ -359,6 +321,23 @@ Bei deutschen Gesetzen mit § verwende: "§ XX" statt "Art. XX".""" ) +def _article_part(chunk: RAGSearchResult) -> str: + """Precise article from a chunk: article_label minus the regulation name. + + 'BDSG § 38' -> '§ 38'; 'Art. 39 DSGVO' -> 'Art. 39'; 'NIST SP 800-53r5 SA-12' -> 'SA-12'. + Falls back to the bare article field. Returns '' if only a doc-level name is present. + """ + label = (chunk.article_label or "").strip() + reg = (chunk.regulation_short or "").strip() + if label: + part = label + if reg and reg in label: + part = label.replace(reg, "").strip(" ,;-") + if part and part != reg: + return part + return (chunk.article or "").strip() + + def _parse_concatenated_source(source: str) -> Optional[dict]: """Parse 'DSGVO Art. 35' → {name: 'DSGVO', article: 'Art. 35'}. 
diff --git a/control-pipeline/services/rag_client.py b/control-pipeline/services/rag_client.py index 1fef26e..afd87f6 100644 --- a/control-pipeline/services/rag_client.py +++ b/control-pipeline/services/rag_client.py @@ -33,6 +33,7 @@ class RAGSearchResult: paragraph: str source_url: str score: float + article_label: str = "" collection: str = "" page: Optional[int] = None @@ -90,6 +91,7 @@ class ComplianceRAGClient: regulation_short=r.get("regulation_short", ""), category=r.get("category", ""), article=r.get("article", ""), + article_label=r.get("article_label", ""), paragraph=r.get("paragraph", ""), source_url=r.get("source_url", ""), score=r.get("score", 0.0), @@ -171,6 +173,7 @@ class ComplianceRAGClient: regulation_short=r.get("regulation_short", ""), category=r.get("category", ""), article=r.get("article", ""), + article_label=r.get("article_label", ""), paragraph=r.get("paragraph", ""), source_url=r.get("source_url", ""), score=0.0,