Files
breakpilot-lehrer/klausur-service/backend/legal_corpus_api.py
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

791 lines
26 KiB
Python

"""
Legal Corpus API - Endpoints for RAG page in admin-v2
Provides endpoints for:
- GET /api/v1/admin/legal-corpus/status - Collection status with chunk counts
- GET /api/v1/admin/legal-corpus/search - Semantic search
- POST /api/v1/admin/legal-corpus/ingest - Trigger ingestion
- GET /api/v1/admin/legal-corpus/ingestion-status - Ingestion status
- POST /api/v1/admin/legal-corpus/upload - Upload document
- POST /api/v1/admin/legal-corpus/add-link - Add link for ingestion
- POST /api/v1/admin/pipeline/start - Start compliance pipeline
"""
import os
import asyncio
import httpx
import uuid
import shutil
from datetime import datetime
from typing import Optional, List, Dict, Any
from fastapi import APIRouter, HTTPException, Query, BackgroundTasks, UploadFile, File, Form
from pydantic import BaseModel
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/admin/legal-corpus", tags=["legal-corpus"])
# Configuration
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
EMBEDDING_SERVICE_URL = os.getenv("EMBEDDING_SERVICE_URL", "http://embedding-service:8087")
COLLECTION_NAME = "bp_legal_corpus"
# All regulations for status endpoint
# Static catalogue of the regulations the corpus tracks. Each entry:
#   code     - stable identifier used as Qdrant payload key "regulation_code"
#   name     - short display name, fullName - official title
#   type     - one of eu_regulation / eu_directive / de_law / bsi_standard
REGULATIONS = [
    # Privacy and data protection
    {"code": "GDPR", "name": "DSGVO", "fullName": "Datenschutz-Grundverordnung", "type": "eu_regulation"},
    {"code": "EPRIVACY", "name": "ePrivacy-Richtlinie", "fullName": "Richtlinie 2002/58/EG", "type": "eu_directive"},
    {"code": "TDDDG", "name": "TDDDG", "fullName": "Telekommunikation-Digitale-Dienste-Datenschutz-Gesetz", "type": "de_law"},
    {"code": "SCC", "name": "Standardvertragsklauseln", "fullName": "2021/914/EU", "type": "eu_regulation"},
    {"code": "DPF", "name": "EU-US Data Privacy Framework", "fullName": "Angemessenheitsbeschluss", "type": "eu_regulation"},
    # AI, cybersecurity and resilience
    {"code": "AIACT", "name": "EU AI Act", "fullName": "Verordnung (EU) 2024/1689", "type": "eu_regulation"},
    {"code": "CRA", "name": "Cyber Resilience Act", "fullName": "Verordnung (EU) 2024/2847", "type": "eu_regulation"},
    {"code": "NIS2", "name": "NIS2-Richtlinie", "fullName": "Richtlinie (EU) 2022/2555", "type": "eu_directive"},
    {"code": "EUCSA", "name": "EU Cybersecurity Act", "fullName": "Verordnung (EU) 2019/881", "type": "eu_regulation"},
    # Data economy and digital services
    {"code": "DATAACT", "name": "Data Act", "fullName": "Verordnung (EU) 2023/2854", "type": "eu_regulation"},
    {"code": "DGA", "name": "Data Governance Act", "fullName": "Verordnung (EU) 2022/868", "type": "eu_regulation"},
    {"code": "DSA", "name": "Digital Services Act", "fullName": "Verordnung (EU) 2022/2065", "type": "eu_regulation"},
    # Accessibility, copyright, product safety
    {"code": "EAA", "name": "European Accessibility Act", "fullName": "Richtlinie (EU) 2019/882", "type": "eu_directive"},
    {"code": "DSM", "name": "DSM-Urheberrechtsrichtlinie", "fullName": "Richtlinie (EU) 2019/790", "type": "eu_directive"},
    {"code": "PLD", "name": "Produkthaftungsrichtlinie", "fullName": "Richtlinie 85/374/EWG", "type": "eu_directive"},
    {"code": "GPSR", "name": "General Product Safety", "fullName": "Verordnung (EU) 2023/988", "type": "eu_regulation"},
    # BSI technical guidelines
    {"code": "BSI-TR-03161-1", "name": "BSI-TR Teil 1", "fullName": "BSI TR-03161 Teil 1 - Mobile Anwendungen", "type": "bsi_standard"},
    {"code": "BSI-TR-03161-2", "name": "BSI-TR Teil 2", "fullName": "BSI TR-03161 Teil 2 - Web-Anwendungen", "type": "bsi_standard"},
    {"code": "BSI-TR-03161-3", "name": "BSI-TR Teil 3", "fullName": "BSI TR-03161 Teil 3 - Hintergrundsysteme", "type": "bsi_standard"},
]
# Ingestion state (in-memory for now)
# Mutated by trigger_ingestion() and the run_ingestion() background task;
# read back verbatim by GET /ingestion-status. Lost on process restart.
ingestion_state = {
    "running": False,            # True while a background ingestion is active
    "completed": False,          # True once the last run finished its loop
    "current_regulation": None,  # regulation code currently being processed
    "processed": 0,              # number of regulations finished so far
    "total": len(REGULATIONS),   # size of the selection for the current run
    "error": None,               # fatal error message if the run aborted
}
class SearchRequest(BaseModel):
    # Request body for semantic search. NOTE(review): the /search endpoint
    # currently takes these as GET query parameters; this model appears
    # unused by the routes in this module.
    query: str  # natural-language search query
    regulations: Optional[List[str]] = None  # restrict to these regulation codes; None = all
    top_k: int = 5  # number of results to return
class IngestRequest(BaseModel):
    # Request body for POST /ingest.
    force: bool = False  # forwarded to LegalCorpusIngestion.ingest_single(force=...)
    regulations: Optional[List[str]] = None  # subset of regulation codes; None/empty = full catalogue
class AddLinkRequest(BaseModel):
    # Request body for POST /add-link.
    url: str  # URL to fetch; PDF, HTML, or plain text responses are handled
    title: str  # display title; stored as regulation_name on the chunks
    code: str  # Regulation code (e.g. "CUSTOM-1")
    document_type: str = "custom"  # custom, eu_regulation, eu_directive, de_law, bsi_standard
class StartPipelineRequest(BaseModel):
    # Request body for POST /api/v1/admin/pipeline/start.
    force_reindex: bool = False  # forwarded as the --force-reindex CLI flag
    skip_ingestion: bool = False  # forwarded as the --skip-ingestion CLI flag
# Store for custom documents (in-memory for now, should be persisted)
# Each entry is a dict record created by /upload or /add-link and mutated
# in place (status, chunk_count, error) by the background ingestion tasks.
custom_documents: List[Dict[str, Any]] = []
async def get_qdrant_client():
    """Create a fresh async HTTP client for talking to Qdrant.

    The caller owns the returned client and is responsible for closing it
    (ideally via ``async with``).
    """
    request_timeout = 30.0
    return httpx.AsyncClient(timeout=request_timeout)
@router.get("/status")
async def get_legal_corpus_status():
    """
    Get status of the legal corpus collection including chunk counts per regulation.

    Returns the collection name, total point count, vector size, collection
    status, and a per-regulation-code chunk count. A missing collection is a
    normal pre-ingestion state and yields a zeroed "not_found" payload.

    Raises:
        HTTPException 503: Qdrant is unreachable.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            # Get collection info
            collection_res = await client.get(f"{QDRANT_URL}/collections/{COLLECTION_NAME}")
            if collection_res.status_code != 200:
                return {
                    "collection": COLLECTION_NAME,
                    "totalPoints": 0,
                    "vectorSize": 1024,
                    "status": "not_found",
                    "regulations": {},
                }
            result = collection_res.json().get("result", {})

            async def count_for(code: str) -> int:
                """Count points tagged with the given regulation_code; 0 on error."""
                count_res = await client.post(
                    f"{QDRANT_URL}/collections/{COLLECTION_NAME}/points/count",
                    json={
                        "filter": {
                            "must": [{"key": "regulation_code", "match": {"value": code}}]
                        }
                    },
                )
                if count_res.status_code != 200:
                    return 0
                return count_res.json().get("result", {}).get("count", 0)

            # Fire all per-regulation count requests concurrently instead of
            # ~19 sequential round-trips (same results, much lower latency).
            codes = [reg["code"] for reg in REGULATIONS]
            counts = await asyncio.gather(*(count_for(code) for code in codes))
            regulation_counts = dict(zip(codes, counts))
            return {
                "collection": COLLECTION_NAME,
                "totalPoints": result.get("points_count", 0),
                "vectorSize": result.get("config", {}).get("params", {}).get("vectors", {}).get("size", 1024),
                "status": result.get("status", "unknown"),
                "regulations": regulation_counts,
            }
        except httpx.RequestError as e:
            logger.error(f"Failed to get Qdrant status: {e}")
            raise HTTPException(status_code=503, detail=f"Qdrant not available: {str(e)}")
@router.get("/search")
async def search_legal_corpus(
    query: str = Query(..., description="Search query"),
    top_k: int = Query(5, ge=1, le=20, description="Number of results"),
    regulations: Optional[str] = Query(None, description="Comma-separated regulation codes to filter"),
):
    """
    Semantic search in legal corpus using BGE-M3 embeddings.

    Embeds the query via the embedding service, then runs a vector search
    in Qdrant, optionally restricted to the given regulation codes
    (OR-combined "should" filter).

    Raises:
        HTTPException 500: embedding or search call returned an error.
        HTTPException 503: a downstream service is unreachable.
    """
    async with httpx.AsyncClient(timeout=60.0) as client:
        try:
            # Generate embedding for query
            embed_res = await client.post(
                f"{EMBEDDING_SERVICE_URL}/embed",
                json={"texts": [query]},
            )
            if embed_res.status_code != 200:
                raise HTTPException(status_code=500, detail="Embedding service error")
            query_vector = embed_res.json()["embeddings"][0]
            # Build Qdrant search request
            search_request = {
                "vector": query_vector,
                "limit": top_k,
                "with_payload": True,
            }
            # Add regulation filter if specified. Empty fragments (e.g. from
            # "GDPR,," or a trailing comma) are dropped so they cannot create
            # match clauses for the empty string.
            if regulations:
                reg_codes = [r.strip() for r in regulations.split(",") if r.strip()]
                if reg_codes:
                    search_request["filter"] = {
                        "should": [
                            {"key": "regulation_code", "match": {"value": code}}
                            for code in reg_codes
                        ]
                    }
            # Search Qdrant
            search_res = await client.post(
                f"{QDRANT_URL}/collections/{COLLECTION_NAME}/points/search",
                json=search_request,
            )
            if search_res.status_code != 200:
                raise HTTPException(status_code=500, detail="Search failed")
            results = []
            for point in search_res.json().get("result", []):
                payload = point.get("payload", {})
                results.append({
                    "text": payload.get("text", ""),
                    "regulation_code": payload.get("regulation_code", ""),
                    "regulation_name": payload.get("regulation_name", ""),
                    "article": payload.get("article"),
                    "paragraph": payload.get("paragraph"),
                    "source_url": payload.get("source_url", ""),
                    "score": point.get("score", 0),
                })
            return {"results": results, "query": query, "count": len(results)}
        except httpx.RequestError as e:
            logger.error(f"Search failed: {e}")
            raise HTTPException(status_code=503, detail=f"Service not available: {str(e)}")
@router.post("/ingest")
async def trigger_ingestion(request: IngestRequest, background_tasks: BackgroundTasks):
    """
    Trigger legal corpus ingestion in background.

    Raises:
        HTTPException 409: an ingestion run is already in progress.
    """
    global ingestion_state
    if ingestion_state["running"]:
        raise HTTPException(status_code=409, detail="Ingestion already running")
    # Bug fix: "total" and the response message previously always used
    # len(REGULATIONS), even when only a subset of codes was requested,
    # which made the progress reported by /ingestion-status wrong.
    selected = request.regulations or [r["code"] for r in REGULATIONS]
    # Reset state
    ingestion_state = {
        "running": True,
        "completed": False,
        "current_regulation": None,
        "processed": 0,
        "total": len(selected),
        "error": None,
    }
    # Start ingestion in background
    background_tasks.add_task(run_ingestion, request.force, request.regulations)
    return {
        "status": "started",
        "job_id": "manual-trigger",
        "message": f"Ingestion started for {len(selected)} regulations",
    }
async def run_ingestion(force: bool, regulations: Optional[List[str]]):
    """Background worker that ingests the selected regulations one by one.

    Progress is published through the module-level ingestion_state dict.
    A failure on a single regulation is logged and skipped; a failure
    outside the per-regulation loop is recorded in ingestion_state["error"].
    """
    global ingestion_state
    try:
        from legal_corpus_ingestion import LegalCorpusIngestion

        ingestion = LegalCorpusIngestion()
        # Fall back to the full catalogue when no explicit selection given.
        if regulations:
            regs_to_process = regulations
        else:
            regs_to_process = [entry["code"] for entry in REGULATIONS]
        for index, reg_code in enumerate(regs_to_process):
            ingestion_state["current_regulation"] = reg_code
            ingestion_state["processed"] = index
            try:
                await ingestion.ingest_single(reg_code, force=force)
            except Exception as exc:
                # Keep going: one bad regulation must not abort the run.
                logger.error(f"Failed to ingest {reg_code}: {exc}")
        ingestion_state["completed"] = True
        ingestion_state["processed"] = len(regs_to_process)
    except Exception as exc:
        logger.error(f"Ingestion failed: {exc}")
        ingestion_state["error"] = str(exc)
    finally:
        ingestion_state["running"] = False
@router.get("/ingestion-status")
async def get_ingestion_status():
    """
    Get current ingestion status.

    Returns the module-level ingestion_state dict (running / completed /
    current_regulation / processed / total / error), which the background
    ingestion task updates live.
    """
    return ingestion_state
@router.get("/regulations")
async def get_regulations():
    """
    Get list of all supported regulations.

    Returns the static REGULATIONS catalogue (code, name, fullName, type).
    """
    return {"regulations": REGULATIONS}
@router.get("/custom-documents")
async def get_custom_documents():
    """
    Get list of custom documents added by user.

    Returns the in-memory records created via /upload and /add-link,
    including their live ingestion status and chunk counts.
    """
    return {"documents": custom_documents}
@router.post("/upload")
async def upload_document(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    title: str = Form(...),
    code: str = Form(...),
    document_type: str = Form("custom"),
):
    """
    Upload a document (PDF) for ingestion into the legal corpus.
    The document will be saved and queued for processing.

    Raises:
        HTTPException 400: missing filename or non-PDF upload.
        HTTPException 500: the file could not be written to disk.
    """
    global custom_documents
    # Validate file type. lower() makes the check case-insensitive (the
    # original tuple check only covered ".pdf"/".PDF" and crashed on a
    # missing filename).
    if not file.filename or not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF files are supported")
    # Create upload directory if needed
    upload_dir = "/tmp/legal_corpus_uploads"
    os.makedirs(upload_dir, exist_ok=True)
    # Save under a unique name. basename() strips any directory components
    # a client could smuggle into the filename (path-traversal hardening).
    doc_id = str(uuid.uuid4())[:8]
    safe_filename = f"{doc_id}_{os.path.basename(file.filename)}"
    file_path = os.path.join(upload_dir, safe_filename)
    try:
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
    except Exception as e:
        logger.error(f"Failed to save uploaded file: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to save file: {str(e)}")
    # Create document record
    doc_record = {
        "id": doc_id,
        "code": code,
        "title": title,
        "filename": file.filename,
        "file_path": file_path,
        "document_type": document_type,
        "uploaded_at": datetime.now().isoformat(),
        "status": "uploaded",
        "chunk_count": 0,
    }
    custom_documents.append(doc_record)
    # Queue for background ingestion
    background_tasks.add_task(ingest_uploaded_document, doc_record)
    return {
        "status": "uploaded",
        "document_id": doc_id,
        "message": f"Document '{title}' uploaded and queued for ingestion",
        "document": doc_record,
    }
async def ingest_uploaded_document(doc_record: Dict[str, Any]):
    """Background task to ingest an uploaded PDF into the legal corpus.

    Mutates doc_record in place: status transitions
    uploaded -> processing -> indexed | error; chunk_count is set on
    success and "error" carries the failure reason otherwise.
    """
    try:
        doc_record["status"] = "processing"
        from legal_corpus_ingestion import LegalCorpusIngestion
        ingestion = LegalCorpusIngestion()
        # Read PDF and extract text. join() replaces the original quadratic
        # string +=, and try/finally guarantees the document handle is
        # released even if text extraction raises.
        import fitz  # PyMuPDF
        doc = fitz.open(doc_record["file_path"])
        try:
            full_text = "".join(page.get_text() for page in doc)
        finally:
            doc.close()
        if not full_text.strip():
            doc_record["status"] = "error"
            doc_record["error"] = "No text could be extracted from PDF"
            return
        # Chunk the text
        chunks = ingestion.chunk_text(full_text, doc_record["code"])
        # Attach provenance metadata to every chunk before upserting.
        for chunk in chunks:
            chunk["regulation_code"] = doc_record["code"]
            chunk["regulation_name"] = doc_record["title"]
            chunk["document_type"] = doc_record["document_type"]
            chunk["source_url"] = f"upload://{doc_record['filename']}"
        # Generate embeddings and upsert to Qdrant
        if chunks:
            await ingestion.embed_and_upsert(chunks)
            doc_record["chunk_count"] = len(chunks)
            doc_record["status"] = "indexed"
            logger.info(f"Ingested {len(chunks)} chunks from uploaded document {doc_record['code']}")
        else:
            doc_record["status"] = "error"
            doc_record["error"] = "No chunks generated from document"
    except Exception as e:
        logger.error(f"Failed to ingest uploaded document: {e}")
        doc_record["status"] = "error"
        doc_record["error"] = str(e)
@router.post("/add-link")
async def add_link(request: AddLinkRequest, background_tasks: BackgroundTasks):
    """
    Add a URL/link for ingestion into the legal corpus.
    The content will be fetched, extracted, and indexed.
    """
    global custom_documents
    doc_id = str(uuid.uuid4())[:8]
    # Record the link immediately so the UI can list it while it processes.
    doc_record = dict(
        id=doc_id,
        code=request.code,
        title=request.title,
        url=request.url,
        document_type=request.document_type,
        uploaded_at=datetime.now().isoformat(),
        status="queued",
        chunk_count=0,
    )
    custom_documents.append(doc_record)
    # Fetching and indexing happen after the HTTP response is returned.
    background_tasks.add_task(ingest_link_document, doc_record)
    return {
        "status": "queued",
        "document_id": doc_id,
        "message": f"Link '{request.title}' queued for ingestion",
        "document": doc_record,
    }
async def ingest_link_document(doc_record: Dict[str, Any]):
    """Background task to fetch a URL and ingest its content.

    Handles PDF (via PyMuPDF), HTML (via BeautifulSoup), and falls back to
    treating the body as plain text. Mutates doc_record in place with
    status / chunk_count / error.
    """
    try:
        doc_record["status"] = "fetching"
        async with httpx.AsyncClient(timeout=60.0) as client:
            # Fetch the URL
            response = await client.get(doc_record["url"], follow_redirects=True)
            response.raise_for_status()
        content_type = response.headers.get("content-type", "")
        if "application/pdf" in content_type:
            # Persist to a temp file for PyMuPDF. The finally blocks fix a
            # leak in the original: the temp file was only unlinked when
            # parsing succeeded.
            import tempfile
            import fitz
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as f:
                f.write(response.content)
                pdf_path = f.name
            try:
                pdf_doc = fitz.open(pdf_path)
                try:
                    full_text = "".join(page.get_text() for page in pdf_doc)
                finally:
                    pdf_doc.close()
            finally:
                os.unlink(pdf_path)
        elif "text/html" in content_type:
            # Extract text from HTML
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(response.text, "html.parser")
            # Remove script and style elements
            for script in soup(["script", "style", "nav", "footer", "header"]):
                script.decompose()
            full_text = soup.get_text(separator="\n", strip=True)
        else:
            # Try to use as plain text
            full_text = response.text
        if not full_text.strip():
            doc_record["status"] = "error"
            doc_record["error"] = "No text could be extracted from URL"
            return
        doc_record["status"] = "processing"
        from legal_corpus_ingestion import LegalCorpusIngestion
        ingestion = LegalCorpusIngestion()
        # Chunk the text
        chunks = ingestion.chunk_text(full_text, doc_record["code"])
        # Attach provenance metadata to every chunk before upserting.
        for chunk in chunks:
            chunk["regulation_code"] = doc_record["code"]
            chunk["regulation_name"] = doc_record["title"]
            chunk["document_type"] = doc_record["document_type"]
            chunk["source_url"] = doc_record["url"]
        # Generate embeddings and upsert to Qdrant
        if chunks:
            await ingestion.embed_and_upsert(chunks)
            doc_record["chunk_count"] = len(chunks)
            doc_record["status"] = "indexed"
            logger.info(f"Ingested {len(chunks)} chunks from URL {doc_record['url']}")
        else:
            doc_record["status"] = "error"
            doc_record["error"] = "No chunks generated from content"
    except httpx.HTTPError as e:
        logger.error(f"Failed to fetch URL: {e}")
        doc_record["status"] = "error"
        doc_record["error"] = f"Failed to fetch URL: {str(e)}"
    except Exception as e:
        logger.error(f"Failed to ingest URL content: {e}")
        doc_record["status"] = "error"
        doc_record["error"] = str(e)
@router.delete("/custom-documents/{doc_id}")
async def delete_custom_document(doc_id: str):
    """
    Delete a custom document record by its id.

    Note: This does not remove the chunks from Qdrant yet.
    """
    global custom_documents
    # 404 if no record carries this id.
    if not any(entry["id"] == doc_id for entry in custom_documents):
        raise HTTPException(status_code=404, detail="Document not found")
    # Rebind the module-level list to a filtered copy.
    custom_documents = [entry for entry in custom_documents if entry["id"] != doc_id]
    # TODO: Also remove chunks from Qdrant by filtering on code
    return {"status": "deleted", "document_id": doc_id}
# ========== Pipeline Checkpoints ==========
# Create a separate router for pipeline-related endpoints
# (distinct URL prefix; the including app must mount both routers).
pipeline_router = APIRouter(prefix="/api/v1/admin/pipeline", tags=["pipeline"])
@pipeline_router.get("/checkpoints")
async def get_pipeline_checkpoints():
    """
    Get current pipeline checkpoint state.

    Loads the persisted state via CheckpointManager and enriches it with a
    ``validation_summary`` tallying validation outcomes across every
    checkpoint. When no run has happened yet, returns an empty but
    well-formed payload instead of an error.
    """
    from pipeline_checkpoints import CheckpointManager

    state = CheckpointManager.load_state()
    if state is None:
        return {
            "status": "no_data",
            "message": "No pipeline run data available yet.",
            "pipeline_id": None,
            "checkpoints": [],
            "summary": {}
        }
    # Tally validation outcomes over all checkpoints.
    summary = {"passed": 0, "warning": 0, "failed": 0, "total": 0}
    all_validations = (
        validation
        for checkpoint in state.get("checkpoints", [])
        for validation in checkpoint.get("validations", [])
    )
    for validation in all_validations:
        summary["total"] += 1
        outcome = validation.get("status", "not_run")
        # Unknown outcomes (e.g. "not_run") count toward total only.
        if outcome in summary:
            summary[outcome] += 1
    state["validation_summary"] = summary
    return state
@pipeline_router.get("/checkpoints/history")
async def get_pipeline_history():
    """
    Get list of previous pipeline runs (if stored).
    For now, returns only current run.
    """
    from pipeline_checkpoints import CheckpointManager

    state = CheckpointManager.load_state()
    if state is None:
        return {"runs": []}
    # Project just the summary fields of the single stored run.
    summary_fields = ("pipeline_id", "status", "started_at", "completed_at")
    run = {name: state.get(name) for name in summary_fields}
    return {"runs": [run]}
# Pipeline state for start/stop
# Tracks the subprocess launched by /start; updated by
# run_pipeline_background() and reported by GET /status.
pipeline_process_state = {
    "running": False,   # True from /start until the background task finishes
    "pid": None,        # OS pid of the pipeline subprocess while it runs
    "started_at": None, # ISO timestamp of the last accepted /start request
}
@pipeline_router.post("/start")
async def start_pipeline(request: StartPipelineRequest, background_tasks: BackgroundTasks):
    """
    Start the compliance pipeline in the background.

    Runs the full_compliance_pipeline.py script which ingests legal
    documents (unless skip_ingestion), extracts requirements and controls,
    generates compliance measures, and writes checkpoint data.

    Raises:
        HTTPException 409: a pipeline run is already in progress.
    """
    global pipeline_process_state
    from pipeline_checkpoints import CheckpointManager

    # Guard 1: persisted checkpoint state says a run is still active.
    checkpoint_state = CheckpointManager.load_state()
    if checkpoint_state and checkpoint_state.get("status") == "running":
        raise HTTPException(
            status_code=409,
            detail="Pipeline is already running"
        )
    # Guard 2: this process already launched a pipeline that hasn't finished.
    if pipeline_process_state["running"]:
        raise HTTPException(
            status_code=409,
            detail="Pipeline start already in progress"
        )

    started = datetime.now().isoformat()
    pipeline_process_state["running"] = True
    pipeline_process_state["started_at"] = started

    # Launch after the response is sent.
    background_tasks.add_task(
        run_pipeline_background,
        request.force_reindex,
        request.skip_ingestion
    )
    return {
        "status": "starting",
        "message": "Compliance pipeline is starting in background",
        "started_at": started,
    }
async def run_pipeline_background(force_reindex: bool, skip_ingestion: bool):
    """Background task that runs full_compliance_pipeline.py as a subprocess.

    Args:
        force_reindex: pass --force-reindex to the pipeline script.
        skip_ingestion: pass --skip-ingestion to the pipeline script.

    Uses asyncio's subprocess support and drains stdout with communicate().
    The original used subprocess.Popen(stdout=PIPE) and only read the pipe
    after exit: a chatty child fills the OS pipe buffer, blocks on write,
    and poll() then never returns — a deadlock. communicate() reads output
    concurrently with waiting, which cannot deadlock.
    """
    global pipeline_process_state
    try:
        import sys
        # Build command
        cmd = [sys.executable, "full_compliance_pipeline.py"]
        if force_reindex:
            cmd.append("--force-reindex")
        if skip_ingestion:
            cmd.append("--skip-ingestion")
        logger.info(f"Starting pipeline: {' '.join(cmd)}")
        process = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=os.path.dirname(os.path.abspath(__file__)),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        pipeline_process_state["pid"] = process.pid
        # Drain stdout and wait for exit without blocking the event loop.
        stdout_bytes, _ = await process.communicate()
        return_code = process.returncode
        if return_code != 0:
            output = stdout_bytes.decode(errors="replace") if stdout_bytes else ""
            logger.error(f"Pipeline failed with code {return_code}: {output}")
        else:
            logger.info("Pipeline completed successfully")
    except Exception as e:
        logger.error(f"Failed to run pipeline: {e}")
    finally:
        # Always clear process state so a new /start is possible.
        pipeline_process_state["running"] = False
        pipeline_process_state["pid"] = None
@pipeline_router.get("/status")
async def get_pipeline_status():
    """
    Get current pipeline running status.

    Combines process-level state (this service's subprocess tracking) with
    the persisted checkpoint state from CheckpointManager.
    """
    from pipeline_checkpoints import CheckpointManager

    state = CheckpointManager.load_state()
    if state:
        checkpoint_status = state.get("status")
        current_phase = state.get("current_phase")
    else:
        checkpoint_status = "no_data" if state is None else None
        current_phase = None
    return {
        "process_running": pipeline_process_state["running"],
        "process_pid": pipeline_process_state["pid"],
        "process_started_at": pipeline_process_state["started_at"],
        "checkpoint_status": checkpoint_status,
        "current_phase": current_phase,
    }
# ========== Traceability / Quality Endpoints ==========
@router.get("/traceability")
async def get_traceability(
    chunk_id: str = Query(..., description="Chunk ID or identifier"),
    regulation: str = Query(..., description="Regulation code"),
):
    """
    Get traceability information for a specific chunk.

    Intended to return the chunk details plus the requirements extracted
    from it and the controls derived from those requirements. Currently a
    placeholder: once the extraction pipeline exists this will resolve the
    chunk from Qdrant and join it against requirements/controls stores.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            # Placeholder payload; echoes the query and returns empty
            # requirement/control lists until the pipeline is implemented.
            payload = {
                "chunk_id": chunk_id,
                "regulation": regulation,
                "requirements": [],
                "controls": [],
                "message": "Traceability-Daten werden verfuegbar sein, sobald die Requirements-Extraktion und Control-Ableitung implementiert sind."
            }
            return payload
        except Exception as e:
            logger.error(f"Failed to get traceability: {e}")
            raise HTTPException(status_code=500, detail=f"Traceability lookup failed: {str(e)}")