# Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website, Klausur-Service, School-Service, Voice-Service, Geo-Service, BreakPilot Drive, Agent-Core
"""
|
|
Legal Corpus API - Endpoints for RAG page in admin-v2
|
|
|
|
Provides endpoints for:
|
|
- GET /api/v1/admin/legal-corpus/status - Collection status with chunk counts
|
|
- GET /api/v1/admin/legal-corpus/search - Semantic search
|
|
- POST /api/v1/admin/legal-corpus/ingest - Trigger ingestion
|
|
- GET /api/v1/admin/legal-corpus/ingestion-status - Ingestion status
|
|
- POST /api/v1/admin/legal-corpus/upload - Upload document
|
|
- POST /api/v1/admin/legal-corpus/add-link - Add link for ingestion
|
|
- POST /api/v1/admin/pipeline/start - Start compliance pipeline
|
|
"""
|
|
|
|
import os
|
|
import asyncio
|
|
import httpx
|
|
import uuid
|
|
import shutil
|
|
from datetime import datetime
|
|
from typing import Optional, List, Dict, Any
|
|
from fastapi import APIRouter, HTTPException, Query, BackgroundTasks, UploadFile, File, Form
|
|
from pydantic import BaseModel
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v1/admin/legal-corpus", tags=["legal-corpus"])

# Configuration — overridable via environment (local dev vs. containerized).
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
EMBEDDING_SERVICE_URL = os.getenv("EMBEDDING_SERVICE_URL", "http://embedding-service:8087")
COLLECTION_NAME = "bp_legal_corpus"

# All regulations for status endpoint.
# "code" is used as the `regulation_code` payload key in Qdrant filters below.
REGULATIONS = [
    {"code": "GDPR", "name": "DSGVO", "fullName": "Datenschutz-Grundverordnung", "type": "eu_regulation"},
    {"code": "EPRIVACY", "name": "ePrivacy-Richtlinie", "fullName": "Richtlinie 2002/58/EG", "type": "eu_directive"},
    {"code": "TDDDG", "name": "TDDDG", "fullName": "Telekommunikation-Digitale-Dienste-Datenschutz-Gesetz", "type": "de_law"},
    {"code": "SCC", "name": "Standardvertragsklauseln", "fullName": "2021/914/EU", "type": "eu_regulation"},
    {"code": "DPF", "name": "EU-US Data Privacy Framework", "fullName": "Angemessenheitsbeschluss", "type": "eu_regulation"},
    {"code": "AIACT", "name": "EU AI Act", "fullName": "Verordnung (EU) 2024/1689", "type": "eu_regulation"},
    {"code": "CRA", "name": "Cyber Resilience Act", "fullName": "Verordnung (EU) 2024/2847", "type": "eu_regulation"},
    {"code": "NIS2", "name": "NIS2-Richtlinie", "fullName": "Richtlinie (EU) 2022/2555", "type": "eu_directive"},
    {"code": "EUCSA", "name": "EU Cybersecurity Act", "fullName": "Verordnung (EU) 2019/881", "type": "eu_regulation"},
    {"code": "DATAACT", "name": "Data Act", "fullName": "Verordnung (EU) 2023/2854", "type": "eu_regulation"},
    {"code": "DGA", "name": "Data Governance Act", "fullName": "Verordnung (EU) 2022/868", "type": "eu_regulation"},
    {"code": "DSA", "name": "Digital Services Act", "fullName": "Verordnung (EU) 2022/2065", "type": "eu_regulation"},
    {"code": "EAA", "name": "European Accessibility Act", "fullName": "Richtlinie (EU) 2019/882", "type": "eu_directive"},
    {"code": "DSM", "name": "DSM-Urheberrechtsrichtlinie", "fullName": "Richtlinie (EU) 2019/790", "type": "eu_directive"},
    {"code": "PLD", "name": "Produkthaftungsrichtlinie", "fullName": "Richtlinie 85/374/EWG", "type": "eu_directive"},
    {"code": "GPSR", "name": "General Product Safety", "fullName": "Verordnung (EU) 2023/988", "type": "eu_regulation"},
    {"code": "BSI-TR-03161-1", "name": "BSI-TR Teil 1", "fullName": "BSI TR-03161 Teil 1 - Mobile Anwendungen", "type": "bsi_standard"},
    {"code": "BSI-TR-03161-2", "name": "BSI-TR Teil 2", "fullName": "BSI TR-03161 Teil 2 - Web-Anwendungen", "type": "bsi_standard"},
    {"code": "BSI-TR-03161-3", "name": "BSI-TR Teil 3", "fullName": "BSI TR-03161 Teil 3 - Hintergrundsysteme", "type": "bsi_standard"},
]

# Ingestion state (in-memory for now).
# NOTE(review): module-level mutable state — lost on restart and not shared
# across worker processes; only valid for a single-process deployment.
ingestion_state = {
    "running": False,
    "completed": False,
    "current_regulation": None,
    "processed": 0,
    "total": len(REGULATIONS),
    "error": None,
}
|
|
class SearchRequest(BaseModel):
    """Request body for semantic search.

    NOTE(review): the /search endpoint below reads query parameters instead
    of this model — confirm whether this model is still used by a caller.
    """

    query: str  # free-text search query
    regulations: Optional[List[str]] = None  # optional regulation-code filter
    top_k: int = 5  # number of results to return
|
class IngestRequest(BaseModel):
    """Request body for POST /ingest."""

    force: bool = False  # re-ingest even if already indexed
    regulations: Optional[List[str]] = None  # subset of regulation codes; all when None
|
class AddLinkRequest(BaseModel):
    """Request body for POST /add-link."""

    url: str  # URL to fetch and ingest
    title: str  # display title for the document
    code: str  # Regulation code (e.g. "CUSTOM-1")
    document_type: str = "custom"  # custom, eu_regulation, eu_directive, de_law, bsi_standard
|
class StartPipelineRequest(BaseModel):
    """Request body for POST /api/v1/admin/pipeline/start."""

    force_reindex: bool = False  # passed to the pipeline as --force-reindex
    skip_ingestion: bool = False  # passed to the pipeline as --skip-ingestion
|
# Store for custom documents (in-memory for now, should be persisted).
# NOTE(review): background tasks mutate the dicts stored here in place, so
# records and statuses are lost on process restart.
custom_documents: List[Dict[str, Any]] = []
|
async def get_qdrant_client():
    """Get async HTTP client for Qdrant.

    NOTE(review): the caller owns the returned client and must close it
    (e.g. via ``async with``) — nothing here closes it. The endpoints in
    this module currently create their own clients instead of using this
    helper; confirm whether it is still needed.
    """
    return httpx.AsyncClient(timeout=30.0)
|
@router.get("/status")
|
|
async def get_legal_corpus_status():
|
|
"""
|
|
Get status of the legal corpus collection including chunk counts per regulation.
|
|
"""
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
try:
|
|
# Get collection info
|
|
collection_res = await client.get(f"{QDRANT_URL}/collections/{COLLECTION_NAME}")
|
|
if collection_res.status_code != 200:
|
|
return {
|
|
"collection": COLLECTION_NAME,
|
|
"totalPoints": 0,
|
|
"vectorSize": 1024,
|
|
"status": "not_found",
|
|
"regulations": {},
|
|
}
|
|
|
|
collection_data = collection_res.json()
|
|
result = collection_data.get("result", {})
|
|
|
|
# Get chunk counts per regulation
|
|
regulation_counts = {}
|
|
for reg in REGULATIONS:
|
|
count_res = await client.post(
|
|
f"{QDRANT_URL}/collections/{COLLECTION_NAME}/points/count",
|
|
json={
|
|
"filter": {
|
|
"must": [{"key": "regulation_code", "match": {"value": reg["code"]}}]
|
|
}
|
|
},
|
|
)
|
|
if count_res.status_code == 200:
|
|
count_data = count_res.json()
|
|
regulation_counts[reg["code"]] = count_data.get("result", {}).get("count", 0)
|
|
else:
|
|
regulation_counts[reg["code"]] = 0
|
|
|
|
return {
|
|
"collection": COLLECTION_NAME,
|
|
"totalPoints": result.get("points_count", 0),
|
|
"vectorSize": result.get("config", {}).get("params", {}).get("vectors", {}).get("size", 1024),
|
|
"status": result.get("status", "unknown"),
|
|
"regulations": regulation_counts,
|
|
}
|
|
|
|
except httpx.RequestError as e:
|
|
logger.error(f"Failed to get Qdrant status: {e}")
|
|
raise HTTPException(status_code=503, detail=f"Qdrant not available: {str(e)}")
|
|
|
|
|
|
@router.get("/search")
|
|
async def search_legal_corpus(
|
|
query: str = Query(..., description="Search query"),
|
|
top_k: int = Query(5, ge=1, le=20, description="Number of results"),
|
|
regulations: Optional[str] = Query(None, description="Comma-separated regulation codes to filter"),
|
|
):
|
|
"""
|
|
Semantic search in legal corpus using BGE-M3 embeddings.
|
|
"""
|
|
async with httpx.AsyncClient(timeout=60.0) as client:
|
|
try:
|
|
# Generate embedding for query
|
|
embed_res = await client.post(
|
|
f"{EMBEDDING_SERVICE_URL}/embed",
|
|
json={"texts": [query]},
|
|
)
|
|
if embed_res.status_code != 200:
|
|
raise HTTPException(status_code=500, detail="Embedding service error")
|
|
|
|
embed_data = embed_res.json()
|
|
query_vector = embed_data["embeddings"][0]
|
|
|
|
# Build Qdrant search request
|
|
search_request = {
|
|
"vector": query_vector,
|
|
"limit": top_k,
|
|
"with_payload": True,
|
|
}
|
|
|
|
# Add regulation filter if specified
|
|
if regulations:
|
|
reg_codes = [r.strip() for r in regulations.split(",")]
|
|
search_request["filter"] = {
|
|
"should": [
|
|
{"key": "regulation_code", "match": {"value": code}}
|
|
for code in reg_codes
|
|
]
|
|
}
|
|
|
|
# Search Qdrant
|
|
search_res = await client.post(
|
|
f"{QDRANT_URL}/collections/{COLLECTION_NAME}/points/search",
|
|
json=search_request,
|
|
)
|
|
|
|
if search_res.status_code != 200:
|
|
raise HTTPException(status_code=500, detail="Search failed")
|
|
|
|
search_data = search_res.json()
|
|
results = []
|
|
for point in search_data.get("result", []):
|
|
payload = point.get("payload", {})
|
|
results.append({
|
|
"text": payload.get("text", ""),
|
|
"regulation_code": payload.get("regulation_code", ""),
|
|
"regulation_name": payload.get("regulation_name", ""),
|
|
"article": payload.get("article"),
|
|
"paragraph": payload.get("paragraph"),
|
|
"source_url": payload.get("source_url", ""),
|
|
"score": point.get("score", 0),
|
|
})
|
|
|
|
return {"results": results, "query": query, "count": len(results)}
|
|
|
|
except httpx.RequestError as e:
|
|
logger.error(f"Search failed: {e}")
|
|
raise HTTPException(status_code=503, detail=f"Service not available: {str(e)}")
|
|
|
|
|
|
@router.post("/ingest")
|
|
async def trigger_ingestion(request: IngestRequest, background_tasks: BackgroundTasks):
|
|
"""
|
|
Trigger legal corpus ingestion in background.
|
|
"""
|
|
global ingestion_state
|
|
|
|
if ingestion_state["running"]:
|
|
raise HTTPException(status_code=409, detail="Ingestion already running")
|
|
|
|
# Reset state
|
|
ingestion_state = {
|
|
"running": True,
|
|
"completed": False,
|
|
"current_regulation": None,
|
|
"processed": 0,
|
|
"total": len(REGULATIONS),
|
|
"error": None,
|
|
}
|
|
|
|
# Start ingestion in background
|
|
background_tasks.add_task(run_ingestion, request.force, request.regulations)
|
|
|
|
return {
|
|
"status": "started",
|
|
"job_id": "manual-trigger",
|
|
"message": f"Ingestion started for {len(REGULATIONS)} regulations",
|
|
}
|
|
|
|
|
|
async def run_ingestion(force: bool, regulations: Optional[List[str]]):
    """Background task that ingests each selected regulation sequentially.

    Updates the module-level ``ingestion_state`` as it progresses. Individual
    regulation failures are logged and skipped (best effort); only a failure
    to set up the ingestion itself is recorded in ``ingestion_state["error"]``.

    Args:
        force: re-ingest even if a regulation is already indexed.
        regulations: optional subset of regulation codes; all when falsy.
    """
    global ingestion_state

    try:
        # Imported lazily so this API module can load even when the
        # ingestion dependencies are unavailable.
        from legal_corpus_ingestion import LegalCorpusIngestion

        ingestion = LegalCorpusIngestion()

        # Filter regulations if specified.
        regs_to_process = regulations or [r["code"] for r in REGULATIONS]
        # Fix: keep the reported total in sync with the actual work list
        # (it was initialized to len(REGULATIONS) regardless of the subset).
        ingestion_state["total"] = len(regs_to_process)

        for i, reg_code in enumerate(regs_to_process):
            ingestion_state["current_regulation"] = reg_code
            ingestion_state["processed"] = i

            try:
                await ingestion.ingest_single(reg_code, force=force)
            except Exception as e:
                # Best effort: log and continue with the next regulation.
                logger.error(f"Failed to ingest {reg_code}: {e}")

        ingestion_state["completed"] = True
        ingestion_state["processed"] = len(regs_to_process)

    except Exception as e:
        logger.error(f"Ingestion failed: {e}")
        ingestion_state["error"] = str(e)

    finally:
        ingestion_state["running"] = False
|
@router.get("/ingestion-status")
|
|
async def get_ingestion_status():
|
|
"""
|
|
Get current ingestion status.
|
|
"""
|
|
return ingestion_state
|
|
|
|
|
|
@router.get("/regulations")
|
|
async def get_regulations():
|
|
"""
|
|
Get list of all supported regulations.
|
|
"""
|
|
return {"regulations": REGULATIONS}
|
|
|
|
|
|
@router.get("/custom-documents")
|
|
async def get_custom_documents():
|
|
"""
|
|
Get list of custom documents added by user.
|
|
"""
|
|
return {"documents": custom_documents}
|
|
|
|
|
|
@router.post("/upload")
|
|
async def upload_document(
|
|
background_tasks: BackgroundTasks,
|
|
file: UploadFile = File(...),
|
|
title: str = Form(...),
|
|
code: str = Form(...),
|
|
document_type: str = Form("custom"),
|
|
):
|
|
"""
|
|
Upload a document (PDF) for ingestion into the legal corpus.
|
|
|
|
The document will be saved and queued for processing.
|
|
"""
|
|
global custom_documents
|
|
|
|
# Validate file type
|
|
if not file.filename.endswith(('.pdf', '.PDF')):
|
|
raise HTTPException(status_code=400, detail="Only PDF files are supported")
|
|
|
|
# Create upload directory if needed
|
|
upload_dir = "/tmp/legal_corpus_uploads"
|
|
os.makedirs(upload_dir, exist_ok=True)
|
|
|
|
# Save file with unique name
|
|
doc_id = str(uuid.uuid4())[:8]
|
|
safe_filename = f"{doc_id}_{file.filename}"
|
|
file_path = os.path.join(upload_dir, safe_filename)
|
|
|
|
try:
|
|
with open(file_path, "wb") as buffer:
|
|
shutil.copyfileobj(file.file, buffer)
|
|
except Exception as e:
|
|
logger.error(f"Failed to save uploaded file: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Failed to save file: {str(e)}")
|
|
|
|
# Create document record
|
|
doc_record = {
|
|
"id": doc_id,
|
|
"code": code,
|
|
"title": title,
|
|
"filename": file.filename,
|
|
"file_path": file_path,
|
|
"document_type": document_type,
|
|
"uploaded_at": datetime.now().isoformat(),
|
|
"status": "uploaded",
|
|
"chunk_count": 0,
|
|
}
|
|
|
|
custom_documents.append(doc_record)
|
|
|
|
# Queue for background ingestion
|
|
background_tasks.add_task(ingest_uploaded_document, doc_record)
|
|
|
|
return {
|
|
"status": "uploaded",
|
|
"document_id": doc_id,
|
|
"message": f"Document '{title}' uploaded and queued for ingestion",
|
|
"document": doc_record,
|
|
}
|
|
|
|
|
|
async def ingest_uploaded_document(doc_record: Dict[str, Any]):
    """Background task to ingest an uploaded PDF document.

    Extracts text with PyMuPDF, chunks it, attaches provenance metadata,
    embeds the chunks and upserts them into Qdrant. Progress and errors
    are written back onto ``doc_record`` (shared with custom_documents).
    """
    global custom_documents

    try:
        doc_record["status"] = "processing"

        from legal_corpus_ingestion import LegalCorpusIngestion

        ingestion = LegalCorpusIngestion()

        # Read PDF and extract text.
        import fitz  # PyMuPDF

        doc = fitz.open(doc_record["file_path"])
        try:
            # join() avoids the quadratic cost of += on large documents,
            # and the finally guarantees close() even if extraction fails.
            full_text = "".join(page.get_text() for page in doc)
        finally:
            doc.close()

        if not full_text.strip():
            doc_record["status"] = "error"
            doc_record["error"] = "No text could be extracted from PDF"
            return

        # Chunk the text.
        chunks = ingestion.chunk_text(full_text, doc_record["code"])

        # Attach provenance metadata to every chunk.
        for chunk in chunks:
            chunk["regulation_code"] = doc_record["code"]
            chunk["regulation_name"] = doc_record["title"]
            chunk["document_type"] = doc_record["document_type"]
            chunk["source_url"] = f"upload://{doc_record['filename']}"

        # Generate embeddings and upsert to Qdrant.
        if chunks:
            await ingestion.embed_and_upsert(chunks)
            doc_record["chunk_count"] = len(chunks)
            doc_record["status"] = "indexed"
            logger.info(f"Ingested {len(chunks)} chunks from uploaded document {doc_record['code']}")
        else:
            doc_record["status"] = "error"
            doc_record["error"] = "No chunks generated from document"

    except Exception as e:
        logger.error(f"Failed to ingest uploaded document: {e}")
        doc_record["status"] = "error"
        doc_record["error"] = str(e)
|
@router.post("/add-link")
|
|
async def add_link(request: AddLinkRequest, background_tasks: BackgroundTasks):
|
|
"""
|
|
Add a URL/link for ingestion into the legal corpus.
|
|
|
|
The content will be fetched, extracted, and indexed.
|
|
"""
|
|
global custom_documents
|
|
|
|
# Create document record
|
|
doc_id = str(uuid.uuid4())[:8]
|
|
doc_record = {
|
|
"id": doc_id,
|
|
"code": request.code,
|
|
"title": request.title,
|
|
"url": request.url,
|
|
"document_type": request.document_type,
|
|
"uploaded_at": datetime.now().isoformat(),
|
|
"status": "queued",
|
|
"chunk_count": 0,
|
|
}
|
|
|
|
custom_documents.append(doc_record)
|
|
|
|
# Queue for background ingestion
|
|
background_tasks.add_task(ingest_link_document, doc_record)
|
|
|
|
return {
|
|
"status": "queued",
|
|
"document_id": doc_id,
|
|
"message": f"Link '{request.title}' queued for ingestion",
|
|
"document": doc_record,
|
|
}
|
|
|
|
|
|
async def ingest_link_document(doc_record: Dict[str, Any]):
    """Background task to ingest content from a URL.

    Fetches the URL and extracts text depending on the content type
    (PDF via PyMuPDF, HTML via BeautifulSoup, otherwise plain text),
    then chunks, embeds and upserts into Qdrant. Status and errors are
    written back onto ``doc_record``.
    """
    global custom_documents

    try:
        doc_record["status"] = "fetching"

        async with httpx.AsyncClient(timeout=60.0) as client:
            # Fetch the URL.
            response = await client.get(doc_record["url"], follow_redirects=True)
            response.raise_for_status()

            content_type = response.headers.get("content-type", "")

            if "application/pdf" in content_type:
                # Spool the PDF to a temp file for PyMuPDF. Fix: the temp
                # file is now removed in a finally block — previously an
                # extraction error leaked it.
                import tempfile
                import fitz

                with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as f:
                    f.write(response.content)
                    pdf_path = f.name

                try:
                    pdf_doc = fitz.open(pdf_path)
                    try:
                        # join() avoids quadratic += string building.
                        full_text = "".join(page.get_text() for page in pdf_doc)
                    finally:
                        pdf_doc.close()
                finally:
                    os.unlink(pdf_path)

            elif "text/html" in content_type:
                # Extract readable text from HTML.
                from bs4 import BeautifulSoup

                soup = BeautifulSoup(response.text, "html.parser")

                # Remove script and style elements (non-content chrome).
                for script in soup(["script", "style", "nav", "footer", "header"]):
                    script.decompose()

                full_text = soup.get_text(separator="\n", strip=True)

            else:
                # Try to use as plain text.
                full_text = response.text

        if not full_text.strip():
            doc_record["status"] = "error"
            doc_record["error"] = "No text could be extracted from URL"
            return

        doc_record["status"] = "processing"

        from legal_corpus_ingestion import LegalCorpusIngestion

        ingestion = LegalCorpusIngestion()

        # Chunk the text.
        chunks = ingestion.chunk_text(full_text, doc_record["code"])

        # Attach provenance metadata to every chunk.
        for chunk in chunks:
            chunk["regulation_code"] = doc_record["code"]
            chunk["regulation_name"] = doc_record["title"]
            chunk["document_type"] = doc_record["document_type"]
            chunk["source_url"] = doc_record["url"]

        # Generate embeddings and upsert to Qdrant.
        if chunks:
            await ingestion.embed_and_upsert(chunks)
            doc_record["chunk_count"] = len(chunks)
            doc_record["status"] = "indexed"
            logger.info(f"Ingested {len(chunks)} chunks from URL {doc_record['url']}")
        else:
            doc_record["status"] = "error"
            doc_record["error"] = "No chunks generated from content"

    except httpx.HTTPError as e:
        logger.error(f"Failed to fetch URL: {e}")
        doc_record["status"] = "error"
        doc_record["error"] = f"Failed to fetch URL: {str(e)}"
    except Exception as e:
        logger.error(f"Failed to ingest URL content: {e}")
        doc_record["status"] = "error"
        doc_record["error"] = str(e)
|
@router.delete("/custom-documents/{doc_id}")
|
|
async def delete_custom_document(doc_id: str):
|
|
"""
|
|
Delete a custom document from the list.
|
|
Note: This does not remove the chunks from Qdrant yet.
|
|
"""
|
|
global custom_documents
|
|
|
|
doc = next((d for d in custom_documents if d["id"] == doc_id), None)
|
|
if not doc:
|
|
raise HTTPException(status_code=404, detail="Document not found")
|
|
|
|
custom_documents = [d for d in custom_documents if d["id"] != doc_id]
|
|
|
|
# TODO: Also remove chunks from Qdrant by filtering on code
|
|
|
|
return {"status": "deleted", "document_id": doc_id}
|
|
|
|
|
|
# ========== Pipeline Checkpoints ==========

# Create a separate router for pipeline-related endpoints.
# NOTE(review): this router must be included by the application alongside
# `router` for the /api/v1/admin/pipeline endpoints to be reachable.
pipeline_router = APIRouter(prefix="/api/v1/admin/pipeline", tags=["pipeline"])
|
@pipeline_router.get("/checkpoints")
|
|
async def get_pipeline_checkpoints():
|
|
"""
|
|
Get current pipeline checkpoint state.
|
|
|
|
Returns the current state of the compliance pipeline including:
|
|
- Pipeline ID and overall status
|
|
- Start and completion times
|
|
- All checkpoints with their validations and metrics
|
|
- Summary data
|
|
"""
|
|
from pipeline_checkpoints import CheckpointManager
|
|
|
|
state = CheckpointManager.load_state()
|
|
|
|
if state is None:
|
|
return {
|
|
"status": "no_data",
|
|
"message": "No pipeline run data available yet.",
|
|
"pipeline_id": None,
|
|
"checkpoints": [],
|
|
"summary": {}
|
|
}
|
|
|
|
# Enrich with validation summary
|
|
validation_summary = {
|
|
"passed": 0,
|
|
"warning": 0,
|
|
"failed": 0,
|
|
"total": 0
|
|
}
|
|
|
|
for checkpoint in state.get("checkpoints", []):
|
|
for validation in checkpoint.get("validations", []):
|
|
validation_summary["total"] += 1
|
|
status = validation.get("status", "not_run")
|
|
if status in validation_summary:
|
|
validation_summary[status] += 1
|
|
|
|
state["validation_summary"] = validation_summary
|
|
|
|
return state
|
|
|
|
|
|
@pipeline_router.get("/checkpoints/history")
|
|
async def get_pipeline_history():
|
|
"""
|
|
Get list of previous pipeline runs (if stored).
|
|
For now, returns only current run.
|
|
"""
|
|
from pipeline_checkpoints import CheckpointManager
|
|
|
|
state = CheckpointManager.load_state()
|
|
|
|
if state is None:
|
|
return {"runs": []}
|
|
|
|
return {
|
|
"runs": [{
|
|
"pipeline_id": state.get("pipeline_id"),
|
|
"status": state.get("status"),
|
|
"started_at": state.get("started_at"),
|
|
"completed_at": state.get("completed_at"),
|
|
}]
|
|
}
|
|
|
|
|
|
# Pipeline state for start/stop.
# NOTE(review): in-memory and single-process only — resets on restart and
# is not shared across workers.
pipeline_process_state = {
    "running": False,
    "pid": None,
    "started_at": None,
}
|
@pipeline_router.post("/start")
|
|
async def start_pipeline(request: StartPipelineRequest, background_tasks: BackgroundTasks):
|
|
"""
|
|
Start the compliance pipeline in the background.
|
|
|
|
This runs the full_compliance_pipeline.py script which:
|
|
1. Ingests all legal documents (unless skip_ingestion=True)
|
|
2. Extracts requirements and controls
|
|
3. Generates compliance measures
|
|
4. Creates checkpoint data for monitoring
|
|
"""
|
|
global pipeline_process_state
|
|
|
|
# Check if already running
|
|
from pipeline_checkpoints import CheckpointManager
|
|
state = CheckpointManager.load_state()
|
|
|
|
if state and state.get("status") == "running":
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail="Pipeline is already running"
|
|
)
|
|
|
|
if pipeline_process_state["running"]:
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail="Pipeline start already in progress"
|
|
)
|
|
|
|
pipeline_process_state["running"] = True
|
|
pipeline_process_state["started_at"] = datetime.now().isoformat()
|
|
|
|
# Start pipeline in background
|
|
background_tasks.add_task(
|
|
run_pipeline_background,
|
|
request.force_reindex,
|
|
request.skip_ingestion
|
|
)
|
|
|
|
return {
|
|
"status": "starting",
|
|
"message": "Compliance pipeline is starting in background",
|
|
"started_at": pipeline_process_state["started_at"],
|
|
}
|
|
|
|
|
|
async def run_pipeline_background(force_reindex: bool, skip_ingestion: bool):
    """Background task to run the compliance pipeline.

    Spawns full_compliance_pipeline.py as a subprocess, drains its combined
    stdout/stderr as it is produced, and logs the outcome. Always clears
    ``pipeline_process_state`` when the subprocess exits.

    Args:
        force_reindex: pass --force-reindex to the pipeline script.
        skip_ingestion: pass --skip-ingestion to the pipeline script.
    """
    global pipeline_process_state

    try:
        import sys

        # Build command.
        cmd = [sys.executable, "full_compliance_pipeline.py"]
        if force_reindex:
            cmd.append("--force-reindex")
        if skip_ingestion:
            cmd.append("--skip-ingestion")

        logger.info(f"Starting pipeline: {' '.join(cmd)}")

        # Fix: the previous Popen+poll loop piped stdout but never read it,
        # so a chatty pipeline filled the OS pipe buffer and deadlocked.
        # Use asyncio's subprocess support and drain output as it arrives.
        process = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=os.path.dirname(os.path.abspath(__file__)),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )

        pipeline_process_state["pid"] = process.pid

        output_lines = []
        if process.stdout is not None:
            async for raw_line in process.stdout:
                output_lines.append(raw_line.decode(errors="replace"))

        return_code = await process.wait()

        if return_code != 0:
            logger.error(f"Pipeline failed with code {return_code}: {''.join(output_lines)}")
        else:
            logger.info("Pipeline completed successfully")

    except Exception as e:
        logger.error(f"Failed to run pipeline: {e}")

    finally:
        pipeline_process_state["running"] = False
        pipeline_process_state["pid"] = None
|
@pipeline_router.get("/status")
|
|
async def get_pipeline_status():
|
|
"""
|
|
Get current pipeline running status.
|
|
"""
|
|
from pipeline_checkpoints import CheckpointManager
|
|
|
|
state = CheckpointManager.load_state()
|
|
checkpoint_status = state.get("status") if state else "no_data"
|
|
|
|
return {
|
|
"process_running": pipeline_process_state["running"],
|
|
"process_pid": pipeline_process_state["pid"],
|
|
"process_started_at": pipeline_process_state["started_at"],
|
|
"checkpoint_status": checkpoint_status,
|
|
"current_phase": state.get("current_phase") if state else None,
|
|
}
|
|
|
|
|
|
# ========== Traceability / Quality Endpoints ==========
|
|
|
|
@router.get("/traceability")
|
|
async def get_traceability(
|
|
chunk_id: str = Query(..., description="Chunk ID or identifier"),
|
|
regulation: str = Query(..., description="Regulation code"),
|
|
):
|
|
"""
|
|
Get traceability information for a specific chunk.
|
|
|
|
Returns:
|
|
- The chunk details
|
|
- Requirements extracted from this chunk
|
|
- Controls derived from those requirements
|
|
|
|
Note: This is a placeholder that will be enhanced once the
|
|
requirements extraction pipeline is fully implemented.
|
|
"""
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
try:
|
|
# Try to find the chunk by scrolling through points with the regulation filter
|
|
# In a production system, we would have proper IDs and indexing
|
|
|
|
# For now, return placeholder structure
|
|
# The actual implementation will query:
|
|
# 1. The chunk from Qdrant
|
|
# 2. Requirements from a requirements collection/table
|
|
# 3. Controls from a controls collection/table
|
|
|
|
return {
|
|
"chunk_id": chunk_id,
|
|
"regulation": regulation,
|
|
"requirements": [],
|
|
"controls": [],
|
|
"message": "Traceability-Daten werden verfuegbar sein, sobald die Requirements-Extraktion und Control-Ableitung implementiert sind."
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get traceability: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Traceability lookup failed: {str(e)}")
|