feat(pipeline): structural metadata end-to-end (Blocks D2-D4)

D2: RAG service stores section/section_title/paragraph/paragraph_num/page
from the embedding service's chunks_with_metadata response in Qdrant payloads.
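
An illustrative payload shape per chunk (field names are from this
commit; the values and the pre-existing fields are made up):

    payload = {
        "document_id": "doc-123",              # pre-existing field
        "chunk_index": 3,                      # pre-existing field
        "chunk_text": "Der Unternehmer hat ...",
        "section": "§ 312k",                   # new in D2
        "section_title": "Kuendigungsbutton",  # new in D2
        "paragraph": "Abs. 2",                 # new in D2
        "paragraph_num": 2,                    # new in D2
        "page": 847,                           # new in D2; None for plain text
    }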

D3: Control generator now prefers section > article > section_title from
Qdrant payloads and adds page to source_citation and generation_metadata.
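
The changed selection expression, for reference (same payload keys):

    article = (
        payload.get("section", "")           # D2 structural field, preferred
        or payload.get("article", "")        # legacy field
        or payload.get("section_title", "")  # last resort
    )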

D4: Validated with real BGB §§ 312-312k text. Found and fixed a critical
bug: the Phase 3 overlap step destroyed the [§ ...] section prefix, so
only the first chunk per document carried section metadata and every
subsequent chunk lost its section info.
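
A minimal sketch of the failure mode and the shape of the fix (chunker
internals simplified; add_overlap is a hypothetical name, not the real
function):

    # Before: the previous chunk's tail was prepended verbatim, burying
    # the "[§ ...]" marker that the parser expects at chunk start.
    def add_overlap(prev_chunk: str, chunk: str, overlap: int) -> str:
        return prev_chunk[-overlap:] + chunk

    # After (sketch): re-apply the section prefix once overlap is added,
    # so every chunk still starts with its "[§ ...]" marker.
    def add_overlap_fixed(prev_chunk: str, chunk: str,
                          overlap: int, prefix: str) -> str:
        body = prev_chunk[-overlap:] + chunk
        return body if body.startswith(prefix) else f"{prefix} {body}"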

Also fixes pre-existing lint issues (unused imports, ambiguous variable
names, duplicate dict key, bare except).

456 tests passing (58 embedding + 387 pipeline + 11 rag-service).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Benjamin Admin
Date:   2026-05-01 20:34:00 +02:00
parent da21339e76
commit 93099b2770
15 changed files with 1086 additions and 25 deletions
scripts/test_d4_integration.py (new file, +268)
@@ -0,0 +1,268 @@
#!/usr/bin/env python3
"""
D4 Integration Test: Upload BGB excerpt → verify Qdrant payloads.
Usage:
# Dry-run (local chunking only, no services needed)
python3 scripts/test_d4_integration.py --dry-run
# Against Mac Mini
python3 scripts/test_d4_integration.py \
--rag-url https://macmini:8097 \
--qdrant-url http://macmini:6333
# Against production
python3 scripts/test_d4_integration.py \
--rag-url https://rag-prod:8097 \
--qdrant-url http://qdrant-prod:6333
"""
import argparse
import json
import os
import sys
import time

import httpx

FIXTURE_PATH = os.path.join(
    os.path.dirname(__file__), "..", "..", "embedding-service",
    "tests", "fixtures", "bgb_312_excerpt.txt",
)
COLLECTION = "bp_compliance_gesetze"
REG_CODE = "BGB_D4_TEST"

# Expected sections in the BGB excerpt
EXPECTED_SECTIONS = {"§ 312", "§ 312a", "§ 312g", "§ 312k"}


def load_fixture() -> str:
    with open(FIXTURE_PATH, encoding="utf-8") as f:
        return f.read()
def upload_document(rag_url: str, text: str) -> dict:
    """Upload BGB excerpt to RAG service."""
    metadata = json.dumps({
        "regulation_code": REG_CODE,
        "regulation_name_de": "BGB (D4 Test)",
        "source_type": "law",
    })
    # verify=False: the test hosts are assumed to use self-signed TLS certs
    with httpx.Client(timeout=60.0, verify=False) as client:
        resp = client.post(
            f"{rag_url}/api/v1/documents/upload",
            files={"file": ("bgb_312_test.txt", text.encode(), "text/plain")},
            data={
                "collection": COLLECTION,
                "data_type": "law",
                "bundesland": "bund",
                "use_case": "compliance",
                "year": "2026",
                "chunk_strategy": "recursive",
                "chunk_size": "1500",
                "chunk_overlap": "100",
                "metadata_json": metadata,
            },
        )
        resp.raise_for_status()
        return resp.json()
def scroll_chunks(qdrant_url: str, document_id: str) -> list[dict]:
    """Scroll Qdrant for chunks matching this document_id."""
    all_points = []
    offset = None
    with httpx.Client(timeout=30.0) as client:
        while True:
            body: dict = {
                "limit": 100,
                "with_payload": True,
                "with_vector": False,
                "filter": {
                    "must": [{
                        "key": "document_id",
                        "match": {"value": document_id},
                    }]
                },
            }
            if offset:
                body["offset"] = offset
            resp = client.post(
                f"{qdrant_url}/collections/{COLLECTION}/points/scroll",
                json=body,
            )
            resp.raise_for_status()
            data = resp.json()["result"]
            all_points.extend(data["points"])
            offset = data.get("next_page_offset")
            if not offset:
                break
    return all_points
def delete_test_data(qdrant_url: str, document_id: str):
    """Clean up test chunks from Qdrant."""
    with httpx.Client(timeout=30.0) as client:
        resp = client.post(
            f"{qdrant_url}/collections/{COLLECTION}/points/delete",
            json={
                "filter": {
                    "must": [{
                        "key": "document_id",
                        "match": {"value": document_id},
                    }]
                }
            },
        )
        resp.raise_for_status()
def verify_chunks(points: list[dict]) -> dict:
    """Analyze chunks and return a verification report."""
    report = {
        "total_chunks": len(points),
        "sections_found": set(),
        "chunks_with_section": 0,
        "chunks_with_paragraph": 0,
        "chunks_with_page": 0,
        "section_details": [],
        "issues": [],
    }
    for pt in points:
        payload = pt.get("payload", {})
        section = payload.get("section", "")
        section_title = payload.get("section_title", "")
        paragraph = payload.get("paragraph", "")
        paragraph_num = payload.get("paragraph_num")
        page = payload.get("page")
        # Default to -1, not "?": a str default would break the
        # mixed-type sort in print_report if chunk_index is missing.
        chunk_idx = payload.get("chunk_index", -1)
        if section:
            report["sections_found"].add(section)
            report["chunks_with_section"] += 1
        if paragraph:
            report["chunks_with_paragraph"] += 1
        if page is not None:
            report["chunks_with_page"] += 1
        report["section_details"].append({
            "chunk_index": chunk_idx,
            "section": section,
            "section_title": section_title[:40],
            "paragraph": paragraph,
            "paragraph_num": paragraph_num,
            "page": page,
            "text_preview": payload.get("chunk_text", "")[:60],
        })
    # Checks
    missing = EXPECTED_SECTIONS - report["sections_found"]
    if missing:
        report["issues"].append(f"Missing sections: {missing}")
    if "§ 312k" not in report["sections_found"]:
        report["issues"].append("CRITICAL: § 312k not found!")
    section_ratio = report["chunks_with_section"] / max(report["total_chunks"], 1)
    if section_ratio < 0.9:
        report["issues"].append(
            f"Only {section_ratio:.0%} chunks have section metadata (expected >= 90%)"
        )
    return report
def print_report(report: dict):
    """Print verification report."""
    print("\n" + "=" * 60)
    print("D4 VALIDATION REPORT")
    print("=" * 60)
    print(f"Total chunks:   {report['total_chunks']}")
    print(f"With section:   {report['chunks_with_section']}")
    print(f"With paragraph: {report['chunks_with_paragraph']}")
    print(f"With page:      {report['chunks_with_page']}")
    print(f"Sections found: {sorted(report['sections_found'])}")
    print("\nChunk details:")
    for d in sorted(report["section_details"], key=lambda x: x["chunk_index"]):
        print(
            f"  [{d['chunk_index']:2}] "
            f"section={d['section']!r:12s} "
            f"title={d['section_title']!r:30s} "
            f"para={d['paragraph']!r:8s}"
        )
    if report["issues"]:
        print(f"\nISSUES ({len(report['issues'])}):")
        for issue in report["issues"]:
            print(f"  - {issue}")
        print("\nRESULT: FAIL")
    else:
        print("\nRESULT: PASS — all sections detected, metadata quality OK")
def main():
    parser = argparse.ArgumentParser(description="D4 Integration Test")
    parser.add_argument("--rag-url", default="https://macmini:8097")
    parser.add_argument("--qdrant-url", default="http://macmini:6333")
    parser.add_argument("--dry-run", action="store_true",
                        help="Only test local chunking, no upload")
    parser.add_argument("--keep", action="store_true",
                        help="Don't delete test data after verification")
    args = parser.parse_args()

    text = load_fixture()
    print(f"Loaded BGB excerpt: {len(text)} chars")

    if args.dry_run:
        # Import chunking directly
        sys.path.insert(0, os.path.join(
            os.path.dirname(__file__), "..", "..", "embedding-service"))
        from main import chunk_text_legal_structured
        chunks = chunk_text_legal_structured(text, 1500, 100)
        # Build fake points for verification
        points = [{"payload": {
            "chunk_index": c["index"],
            "chunk_text": c["text"],
            "section": c["section"],
            "section_title": c["section_title"],
            "paragraph": c["paragraph"],
            "paragraph_num": c["paragraph_num"],
            "page": c["page"],
        }} for c in chunks]
        report = verify_chunks(points)
        print_report(report)
        sys.exit(1 if report["issues"] else 0)

    # Full integration test
    print(f"Uploading to {args.rag_url} → collection={COLLECTION}...")
    result = upload_document(args.rag_url, text)
    doc_id = result["document_id"]
    print(f"  document_id: {doc_id}")
    print(f"  chunks_count: {result['chunks_count']}")
    print(f"  vectors_indexed: {result['vectors_indexed']}")

    print("Waiting 2s for indexing...")
    time.sleep(2)

    print(f"Scrolling Qdrant at {args.qdrant_url}...")
    points = scroll_chunks(args.qdrant_url, doc_id)
    print(f"  Found {len(points)} points")

    report = verify_chunks(points)
    print_report(report)

    if not args.keep:
        print(f"\nCleaning up test data (document_id={doc_id})...")
        delete_test_data(args.qdrant_url, doc_id)
        print("  Deleted.")
    sys.exit(1 if report["issues"] else 0)


if __name__ == "__main__":
    main()
control_generator.py (+13 -7)
@@ -25,8 +25,7 @@ import re
import uuid
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
-from typing import Dict, List, Optional, Set
+from typing import Dict, List, Optional
import httpx
from pydantic import BaseModel
@@ -34,7 +33,7 @@ from sqlalchemy import text
from sqlalchemy.orm import Session
from .rag_client import ComplianceRAGClient, RAGSearchResult, get_rag_client
-from .similarity_detector import check_similarity, SimilarityReport
+from .similarity_detector import check_similarity
logger = logging.getLogger(__name__)
@@ -1019,11 +1018,12 @@ class ControlGeneratorPipeline:
regulation_name=reg_name,
regulation_short=reg_short,
category=payload.get("category", "") or payload.get("data_type", ""),
article=payload.get("article", "") or payload.get("section_title", "") or payload.get("section", ""),
article=payload.get("section", "") or payload.get("article", "") or payload.get("section_title", ""),
paragraph=payload.get("paragraph", ""),
source_url=payload.get("source_url", "") or payload.get("source", "") or payload.get("url", ""),
score=0.0,
collection=collection,
+page=payload.get("page"),
)
all_results.append(chunk)
collection_new += 1
@@ -1127,6 +1127,7 @@ Quelle: {chunk.regulation_name} ({chunk.regulation_code}), {chunk.article}"""
"source": canonical_source,
"article": effective_article,
"paragraph": effective_paragraph,
"page": chunk.page,
"license": license_info.get("license", ""),
"source_type": license_info.get("source_type", "law"),
"url": chunk.source_url or "",
@@ -1141,6 +1142,7 @@ Quelle: {chunk.regulation_name} ({chunk.regulation_code}), {chunk.article}"""
"source_regulation": chunk.regulation_code,
"source_article": effective_article,
"source_paragraph": effective_paragraph,
"source_page": chunk.page,
}
return control
@@ -1194,6 +1196,7 @@ Quelle: {chunk.regulation_name}, {chunk.article}"""
"source": canonical_source,
"article": effective_article,
"paragraph": effective_paragraph,
"page": chunk.page,
"license": license_info.get("license", ""),
"license_notice": attribution,
"source_type": license_info.get("source_type", "standard"),
@@ -1209,6 +1212,7 @@ Quelle: {chunk.regulation_name}, {chunk.article}"""
"source_regulation": chunk.regulation_code,
"source_article": effective_article,
"source_paragraph": effective_paragraph,
"source_page": chunk.page,
}
return control
@@ -1368,6 +1372,7 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Elementen. Fuer Chunks ohne A
"source": canonical_source,
"article": effective_article,
"paragraph": effective_paragraph,
"page": chunk.page,
"license": lic.get("license", ""),
"license_notice": lic.get("attribution", ""),
"source_type": lic.get("source_type", "law"),
@@ -1384,6 +1389,7 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Elementen. Fuer Chunks ohne A
"source_regulation": chunk.regulation_code,
"source_article": effective_article,
"source_paragraph": effective_paragraph,
"source_page": chunk.page,
"batch_size": len(chunks),
"document_grouped": same_doc,
}
@@ -1479,14 +1485,14 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Elementen. Fuer Aspekte ohne
) -> list[Optional[GeneratedControl]]:
"""Process a batch of (chunk, license_info) through stages 3-5."""
# Split by license rule: Rule 1+2 → structure, Rule 3 → reform
-structure_items = [(c, l) for c, l in batch_items if l["rule"] in (1, 2)]
-reform_items = [(c, l) for c, l in batch_items if l["rule"] == 3]
+structure_items = [(c, lic) for c, lic in batch_items if lic["rule"] in (1, 2)]
+reform_items = [(c, lic) for c, lic in batch_items if lic["rule"] == 3]
all_controls: dict[int, Optional[GeneratedControl]] = {}
if structure_items:
s_chunks = [c for c, _ in structure_items]
-s_lics = [l for _, l in structure_items]
+s_lics = [lic for _, lic in structure_items]
try:
s_controls = await self._structure_batch(s_chunks, s_lics)
except Exception as e:
decomposition_pass.py
@@ -24,7 +24,6 @@ import json
import logging
import os
import re
-import uuid
from dataclasses import dataclass, field
from typing import Optional
@@ -56,7 +55,7 @@ ANTHROPIC_API_URL = "https://api.anthropic.com/v1"
# Patterns are defined in normative_patterns.py and imported here
# with local aliases for backward compatibility.
-from .normative_patterns import (
+from .normative_patterns import (  # noqa: E402
PFLICHT_RE as _PFLICHT_RE,
EMPFEHLUNG_RE as _EMPFEHLUNG_RE,
KANN_RE as _KANN_RE,
@@ -3472,7 +3471,7 @@ class DecompositionPass:
"category": atomic.category,
"parent_uuid": parent_uuid,
"gen_meta": json.dumps({
"decomposition_source": candidate_id,
"decomposition_source_id": candidate_id,
"decomposition_method": "pass0b",
"engine_version": "v2",
"action_object_class": getattr(atomic, "domain", ""),
@@ -4104,6 +4103,8 @@ def _format_citation(citation) -> str:
parts.append(c["article"])
if c.get("paragraph"):
parts.append(c["paragraph"])
if c.get("page") is not None:
parts.append(f"S. {c['page']}")
return " ".join(parts) if parts else citation
except (json.JSONDecodeError, TypeError):
return citation
services/rag_client.py (+1)
@@ -34,6 +34,7 @@ class RAGSearchResult:
source_url: str
score: float
collection: str = ""
+page: Optional[int] = None
class ComplianceRAGClient:
D3 metadata tests (new file, +166)
@@ -0,0 +1,166 @@
"""Tests for D3: Structural metadata flow (section priority, page in citation)."""
import json
from typing import Optional
from services.rag_client import RAGSearchResult
def _make_chunk(
article: str = "",
paragraph: str = "",
page: Optional[int] = None,
) -> RAGSearchResult:
return RAGSearchResult(
text="Test chunk text",
regulation_code="DSGVO",
regulation_name="Datenschutz-Grundverordnung",
regulation_short="DSGVO",
category="data_protection",
article=article,
paragraph=paragraph,
source_url="https://example.com",
score=0.95,
collection="bp_compliance_de",
page=page,
)
class TestRAGSearchResultPage:
    """RAGSearchResult now carries a page field."""

    def test_page_default_none(self):
        chunk = _make_chunk()
        assert chunk.page is None

    def test_page_set(self):
        chunk = _make_chunk(page=42)
        assert chunk.page == 42

    def test_page_zero(self):
        chunk = _make_chunk(page=0)
        assert chunk.page == 0
class TestQdrantPayloadPriority:
    """section (D2) should take priority over article (legacy)."""

    def test_section_preferred_over_article(self):
        payload = {"section": "§ 312k", "article": "Art. 312", "section_title": "Kuendigungsbutton"}
        article = payload.get("section", "") or payload.get("article", "") or payload.get("section_title", "")
        assert article == "§ 312k"

    def test_article_fallback_when_no_section(self):
        payload = {"section": "", "article": "Art. 35", "section_title": ""}
        article = payload.get("section", "") or payload.get("article", "") or payload.get("section_title", "")
        assert article == "Art. 35"

    def test_section_title_last_resort(self):
        payload = {"section": "", "article": "", "section_title": "Informationspflichten"}
        article = payload.get("section", "") or payload.get("article", "") or payload.get("section_title", "")
        assert article == "Informationspflichten"

    def test_all_empty(self):
        payload = {"section": "", "article": "", "section_title": ""}
        article = payload.get("section", "") or payload.get("article", "") or payload.get("section_title", "")
        assert article == ""

    def test_page_from_payload(self):
        payload = {"page": 847}
        assert payload.get("page") == 847

    def test_page_none_from_payload(self):
        payload = {}
        assert payload.get("page") is None
class TestSourceCitationPage:
    """source_citation dict should include page when available."""

    def _build_citation(self, chunk: RAGSearchResult) -> dict:
        """Mirrors the citation-building logic from control_generator.py."""
        return {
            "source": chunk.regulation_name,
            "article": chunk.article,
            "paragraph": chunk.paragraph,
            "page": chunk.page,
            "license": "free_use",
            "source_type": "law",
            "url": chunk.source_url or "",
        }

    def test_citation_with_page(self):
        chunk = _make_chunk(article="§ 312k", paragraph="Abs. 1", page=847)
        citation = self._build_citation(chunk)
        assert citation["page"] == 847

    def test_citation_without_page(self):
        chunk = _make_chunk(article="§ 312k", paragraph="Abs. 1")
        citation = self._build_citation(chunk)
        assert citation["page"] is None

    def test_citation_serializable(self):
        chunk = _make_chunk(article="Art. 35", page=12)
        citation = self._build_citation(chunk)
        serialized = json.dumps(citation)
        restored = json.loads(serialized)
        assert restored["page"] == 12
class TestFormatCitation:
    """_format_citation should include page number."""

    def _format_citation(self, citation) -> str:
        """Mirrors _format_citation from decomposition_pass.py."""
        if not citation:
            return ""
        if isinstance(citation, str):
            try:
                c = json.loads(citation)
                if isinstance(c, dict):
                    parts = []
                    if c.get("source"):
                        parts.append(c["source"])
                    if c.get("article"):
                        parts.append(c["article"])
                    if c.get("paragraph"):
                        parts.append(c["paragraph"])
                    if c.get("page") is not None:
                        parts.append(f"S. {c['page']}")
                    return " ".join(parts) if parts else citation
            except (json.JSONDecodeError, TypeError):
                return citation
        return str(citation)

    def test_format_with_page(self):
        citation = json.dumps({
            "source": "DSGVO",
            "article": "Art. 35",
            "paragraph": "Abs. 1",
            "page": 42,
        })
        result = self._format_citation(citation)
        assert result == "DSGVO Art. 35 Abs. 1 S. 42"

    def test_format_without_page(self):
        citation = json.dumps({
            "source": "BGB",
            "article": "§ 312k",
            "paragraph": "",
        })
        result = self._format_citation(citation)
        assert result == "BGB § 312k"

    def test_format_page_zero(self):
        citation = json.dumps({
            "source": "BGB",
            "article": "§ 1",
            "paragraph": "",
            "page": 0,
        })
        result = self._format_citation(citation)
        assert result == "BGB § 1 S. 0"

    def test_format_empty_citation(self):
        assert self._format_citation("") == ""
        assert self._format_citation(None) == ""