A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.
This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).
Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
724 lines
28 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Full Compliance Pipeline for Legal Corpus.
|
||
|
||
This script runs the complete pipeline:
|
||
1. Re-ingest all legal documents with improved chunking
|
||
2. Extract requirements/checkpoints from chunks
|
||
3. Generate controls using AI
|
||
4. Define remediation measures
|
||
5. Update statistics
|
||
|
||
Run on Mac Mini:
|
||
nohup python full_compliance_pipeline.py > /tmp/compliance_pipeline.log 2>&1 &
|
||
|
||
Checkpoints are saved to /tmp/pipeline_checkpoints.json and can be viewed in admin-v2.
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import sys
|
||
import time
|
||
from datetime import datetime
|
||
from typing import Dict, List, Any, Optional
|
||
from dataclasses import dataclass, asdict
|
||
import re
|
||
import hashlib
|
||
|
||
# Logging goes both to stdout (container logs) and to a file that can
# be tailed while the pipeline runs in the background via nohup.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('/tmp/compliance_pipeline.log'),
    ],
)
logger = logging.getLogger(__name__)
|
||
|
||
# Import checkpoint manager.
# The checkpoint manager is optional: if the module is missing the
# pipeline still runs, it just skips progress/validation checkpoints
# (every use site guards on `self.checkpoint_mgr` being truthy).
try:
    from pipeline_checkpoints import CheckpointManager, EXPECTED_VALUES, ValidationStatus
except ImportError:
    logger.warning("Checkpoint manager not available, running without checkpoints")
    CheckpointManager = None
    EXPECTED_VALUES = {}
    ValidationStatus = None
|
||
|
||
# Default endpoints for the Docker network. Both QDRANT_URL and
# QDRANT_HOST are honoured; the docker-compose hostname "qdrant" is
# only used when neither is configured by the caller.
if not (os.getenv("QDRANT_URL") or os.getenv("QDRANT_HOST")):
    os.environ["QDRANT_HOST"] = "qdrant"
os.environ.setdefault("EMBEDDING_SERVICE_URL", "http://embedding-service:8087")
|
||
|
||
# Try to import from klausur-service.
# These are hard dependencies: without the ingestion helpers and the
# Qdrant client the pipeline cannot do anything useful, so exit with a
# non-zero status immediately instead of failing later mid-run.
try:
    from legal_corpus_ingestion import LegalCorpusIngestion, REGULATIONS, LEGAL_CORPUS_COLLECTION
    from qdrant_client import QdrantClient
    from qdrant_client.models import Filter, FieldCondition, MatchValue
except ImportError:
    logger.error("Could not import required modules. Make sure you're in the klausur-service container.")
    sys.exit(1)
|
||
|
||
|
||
@dataclass
class Checkpoint:
    """A requirement/checkpoint extracted from legal text."""

    id: str  # requirement id built from regulation code / article / chunk hash
    regulation_code: str  # short code from the chunk payload ("regulation_code")
    regulation_name: str  # human-readable name from the payload ("regulation_name")
    article: Optional[str]  # article label when the title looks like "Art. N", else None
    title: str  # short label (requirement id, article label, or truncated text)
    description: str  # extracted requirement text, capped at 500 chars
    original_text: str  # full extracted match, untruncated
    chunk_id: str  # 8-char md5 fingerprint of the source chunk's first 100 chars
    source_url: str  # origin URL from the chunk payload ("source_url")
|
||
|
||
|
||
@dataclass
class Control:
    """A control derived from checkpoints."""

    id: str  # e.g. "CRYPTO-001": upper-cased domain plus zero-padded counter
    domain: str  # keyword-derived domain: crypto/iam/priv/sdlc/aud/ai/ops/cra/gov
    title: str  # taken from the first covered checkpoint, capped at 100 chars
    description: str
    checkpoints: List[str]  # List of checkpoint IDs
    pass_criteria: str
    implementation_guidance: str
    is_automated: bool  # True when the checkpoint text hints at tooling/automation
    automation_tool: Optional[str]  # set only when is_automated
    priority: str  # "high" or "medium" (see generate_control_for_checkpoints)
|
||
|
||
|
||
@dataclass
class Measure:
    """A remediation measure for a control."""

    id: str  # "M-" + control id
    control_id: str  # id of the Control this measure remediates
    title: str
    description: str
    responsible: str  # owning team, derived from the control's domain
    deadline_days: int  # derived from the control's priority (30/60/90/180)
    status: str  # initially "pending"
|
||
|
||
|
||
class CompliancePipeline:
    """Handles the full compliance pipeline.

    Orchestrates four phases -- ingestion, checkpoint extraction, control
    generation and measure generation -- and persists the results as JSON
    via save_results().
    """

    def __init__(self):
        # Resolve the Qdrant endpoint. QDRANT_URL takes precedence;
        # otherwise fall back to QDRANT_HOST / QDRANT_PORT.
        # Fix: the original comment promised HOST/PORT support but the
        # else-branch hard-coded port 6333 and ignored QDRANT_PORT.
        qdrant_url = os.getenv("QDRANT_URL", "")
        if qdrant_url:
            from urllib.parse import urlparse
            parsed = urlparse(qdrant_url)
            qdrant_host = parsed.hostname or "qdrant"
            qdrant_port = parsed.port or 6333
        else:
            qdrant_host = os.getenv("QDRANT_HOST", "qdrant")
            try:
                qdrant_port = int(os.getenv("QDRANT_PORT", "6333"))
            except ValueError:
                # Malformed override -> keep the conventional default.
                qdrant_port = 6333
        self.qdrant = QdrantClient(host=qdrant_host, port=qdrant_port)

        # Artefacts accumulated across the phases.
        self.checkpoints: List[Checkpoint] = []
        self.controls: List[Control] = []
        self.measures: List[Measure] = []

        # Running statistics; written out by save_results().
        self.stats = {
            "chunks_processed": 0,
            "checkpoints_extracted": 0,
            "controls_created": 0,
            "measures_defined": 0,
            "by_regulation": {},
            "by_domain": {},
        }

        # Initialize checkpoint manager (optional -- may be unavailable,
        # see the import fallback at module level).
        self.checkpoint_mgr = CheckpointManager() if CheckpointManager else None
|
||
|
||
    def extract_checkpoints_from_chunk(self, chunk_text: str, payload: Dict) -> List[Checkpoint]:
        """
        Extract checkpoints/requirements from a chunk of text.

        Uses pattern matching to find requirement-like statements.

        Args:
            chunk_text: Raw text of one legal-corpus chunk.
            payload: Qdrant payload of the chunk; reads "regulation_code",
                "regulation_name" and "source_url" with safe defaults.

        Returns:
            List of Checkpoint objects; matches whose description is
            shorter than 20 characters are discarded as noise.
        """
        checkpoints = []
        regulation_code = payload.get("regulation_code", "UNKNOWN")
        regulation_name = payload.get("regulation_name", "Unknown")
        source_url = payload.get("source_url", "")
        # Deterministic 8-char fingerprint of the chunk; md5 is used only
        # as a cheap hash here, not for anything security-relevant.
        chunk_id = hashlib.md5(chunk_text[:100].encode()).hexdigest()[:8]

        # Patterns for different requirement types.
        # Each entry is (regex, tag); the tag selects the branch below
        # that knows how to interpret that pattern's capture groups.
        patterns = [
            # BSI-TR patterns
            (r'([OT]\.[A-Za-z_]+\d*)[:\s]+(.+?)(?=\n[OT]\.|$)', 'bsi_requirement'),
            # Article patterns (GDPR, AI Act, etc.)
            (r'(?:Artikel|Art\.?)\s+(\d+)(?:\s+Abs(?:atz)?\.?\s*(\d+))?\s*[-–:]\s*(.+?)(?=\n|$)', 'article'),
            # Numbered requirements
            (r'\((\d+)\)\s+(.+?)(?=\n\(\d+\)|$)', 'numbered'),
            # "Der Verantwortliche muss" patterns
            (r'(?:Der Verantwortliche|Die Aufsichtsbehörde|Der Auftragsverarbeiter)\s+(muss|hat|soll)\s+(.+?)(?=\.\s|$)', 'obligation'),
            # "Es ist erforderlich" patterns
            (r'(?:Es ist erforderlich|Es muss gewährleistet|Es sind geeignete)\s+(.+?)(?=\.\s|$)', 'requirement'),
        ]

        for pattern, pattern_type in patterns:
            matches = re.finditer(pattern, chunk_text, re.MULTILINE | re.DOTALL)
            for match in matches:
                if pattern_type == 'bsi_requirement':
                    # Group 1 is the BSI requirement id (e.g. "O.xyz"),
                    # which doubles as the title.
                    req_id = match.group(1)
                    description = match.group(2).strip()
                    title = req_id
                elif pattern_type == 'article':
                    article_num = match.group(1)
                    paragraph = match.group(2) or ""
                    title_text = match.group(3).strip()
                    req_id = f"{regulation_code}-Art{article_num}"
                    if paragraph:
                        req_id += f"-{paragraph}"
                    title = f"Art. {article_num}" + (f" Abs. {paragraph}" if paragraph else "")
                    description = title_text
                elif pattern_type == 'numbered':
                    num = match.group(1)
                    description = match.group(2).strip()
                    req_id = f"{regulation_code}-{num}"
                    title = f"Anforderung {num}"
                else:
                    # Generic requirement ('obligation' / 'requirement'):
                    # use the whole match as the description and make the
                    # id unique via the chunk fingerprint plus a counter.
                    description = match.group(0).strip()
                    req_id = f"{regulation_code}-{chunk_id}-{len(checkpoints)}"
                    # Conditional binds to the whole expression: the
                    # truncated form is used only when len > 50.
                    title = description[:50] + "..." if len(description) > 50 else description

                # Skip very short matches
                if len(description) < 20:
                    continue

                checkpoint = Checkpoint(
                    id=req_id,
                    regulation_code=regulation_code,
                    regulation_name=regulation_name,
                    # Only article-style titles carry "Art"; everything
                    # else gets article=None.
                    article=title if 'Art' in title else None,
                    title=title,
                    description=description[:500],
                    original_text=description,
                    chunk_id=chunk_id,
                    source_url=source_url
                )
                checkpoints.append(checkpoint)

        return checkpoints
|
||
|
||
def generate_control_for_checkpoints(self, checkpoints: List[Checkpoint]) -> Optional[Control]:
|
||
"""
|
||
Generate a control that covers the given checkpoints.
|
||
|
||
This is a simplified version - in production this would use the AI assistant.
|
||
"""
|
||
if not checkpoints:
|
||
return None
|
||
|
||
# Group by regulation
|
||
regulation = checkpoints[0].regulation_code
|
||
|
||
# Determine domain based on content
|
||
all_text = " ".join([cp.description for cp in checkpoints]).lower()
|
||
|
||
domain = "gov" # Default
|
||
if any(kw in all_text for kw in ["verschlüssel", "krypto", "encrypt", "hash"]):
|
||
domain = "crypto"
|
||
elif any(kw in all_text for kw in ["zugang", "access", "authentif", "login", "benutzer"]):
|
||
domain = "iam"
|
||
elif any(kw in all_text for kw in ["datenschutz", "personenbezogen", "privacy", "einwilligung"]):
|
||
domain = "priv"
|
||
elif any(kw in all_text for kw in ["entwicklung", "test", "code", "software"]):
|
||
domain = "sdlc"
|
||
elif any(kw in all_text for kw in ["überwach", "monitor", "log", "audit"]):
|
||
domain = "aud"
|
||
elif any(kw in all_text for kw in ["ki", "künstlich", "ai", "machine learning", "model"]):
|
||
domain = "ai"
|
||
elif any(kw in all_text for kw in ["betrieb", "operation", "verfügbar", "backup"]):
|
||
domain = "ops"
|
||
elif any(kw in all_text for kw in ["cyber", "resilience", "sbom", "vulnerab"]):
|
||
domain = "cra"
|
||
|
||
# Generate control ID
|
||
domain_counts = self.stats.get("by_domain", {})
|
||
domain_count = domain_counts.get(domain, 0) + 1
|
||
control_id = f"{domain.upper()}-{domain_count:03d}"
|
||
|
||
# Create title from first checkpoint
|
||
title = checkpoints[0].title
|
||
if len(title) > 100:
|
||
title = title[:97] + "..."
|
||
|
||
# Create description
|
||
description = f"Control für {regulation}: " + checkpoints[0].description[:200]
|
||
|
||
# Pass criteria
|
||
pass_criteria = f"Alle {len(checkpoints)} zugehörigen Anforderungen sind erfüllt und dokumentiert."
|
||
|
||
# Implementation guidance
|
||
guidance = f"Implementiere Maßnahmen zur Erfüllung der Anforderungen aus {regulation}. "
|
||
guidance += f"Dokumentiere die Umsetzung und führe regelmäßige Reviews durch."
|
||
|
||
# Determine if automated
|
||
is_automated = any(kw in all_text for kw in ["automat", "tool", "scan", "test"])
|
||
|
||
control = Control(
|
||
id=control_id,
|
||
domain=domain,
|
||
title=title,
|
||
description=description,
|
||
checkpoints=[cp.id for cp in checkpoints],
|
||
pass_criteria=pass_criteria,
|
||
implementation_guidance=guidance,
|
||
is_automated=is_automated,
|
||
automation_tool="CI/CD Pipeline" if is_automated else None,
|
||
priority="high" if "muss" in all_text or "erforderlich" in all_text else "medium"
|
||
)
|
||
|
||
return control
|
||
|
||
def generate_measure_for_control(self, control: Control) -> Measure:
|
||
"""Generate a remediation measure for a control."""
|
||
measure_id = f"M-{control.id}"
|
||
|
||
# Determine deadline based on priority
|
||
deadline_days = {
|
||
"critical": 30,
|
||
"high": 60,
|
||
"medium": 90,
|
||
"low": 180
|
||
}.get(control.priority, 90)
|
||
|
||
# Determine responsible team
|
||
responsible = {
|
||
"priv": "Datenschutzbeauftragter",
|
||
"iam": "IT-Security Team",
|
||
"sdlc": "Entwicklungsteam",
|
||
"crypto": "IT-Security Team",
|
||
"ops": "Operations Team",
|
||
"aud": "Compliance Team",
|
||
"ai": "AI/ML Team",
|
||
"cra": "IT-Security Team",
|
||
"gov": "Management"
|
||
}.get(control.domain, "Compliance Team")
|
||
|
||
measure = Measure(
|
||
id=measure_id,
|
||
control_id=control.id,
|
||
title=f"Umsetzung: {control.title[:50]}",
|
||
description=f"Implementierung und Dokumentation von {control.id}: {control.description[:100]}",
|
||
responsible=responsible,
|
||
deadline_days=deadline_days,
|
||
status="pending"
|
||
)
|
||
|
||
return measure
|
||
|
||
    async def run_ingestion_phase(self, force_reindex: bool = False) -> int:
        """Phase 1: Ingest documents (incremental - only missing ones).

        Counts existing chunks per regulation in Qdrant and ingests only
        regulations that have none (or all of them when *force_reindex*).

        Args:
            force_reindex: Re-ingest every regulation even if chunks exist.

        Returns:
            Total number of chunks in the collection after this phase.

        Raises:
            Exception: re-raised after the phase checkpoint is marked failed.
        """
        logger.info("\n" + "=" * 60)
        logger.info("PHASE 1: DOCUMENT INGESTION (INCREMENTAL)")
        logger.info("=" * 60)

        if self.checkpoint_mgr:
            self.checkpoint_mgr.start_checkpoint("ingestion", "Document Ingestion")

        ingestion = LegalCorpusIngestion()

        try:
            # Check existing chunks per regulation
            existing_chunks = {}
            try:
                for regulation in REGULATIONS:
                    count_result = self.qdrant.count(
                        collection_name=LEGAL_CORPUS_COLLECTION,
                        count_filter=Filter(
                            must=[FieldCondition(key="regulation_code", match=MatchValue(value=regulation.code))]
                        )
                    )
                    existing_chunks[regulation.code] = count_result.count
                    logger.info(f" {regulation.code}: {count_result.count} existing chunks")
            except Exception as e:
                logger.warning(f"Could not check existing chunks: {e}")
                # Collection might not exist, that's OK

            # Determine which regulations need ingestion
            regulations_to_ingest = []
            for regulation in REGULATIONS:
                existing = existing_chunks.get(regulation.code, 0)
                if force_reindex or existing == 0:
                    regulations_to_ingest.append(regulation)
                    logger.info(f" -> Will ingest: {regulation.code} (existing: {existing}, force: {force_reindex})")
                else:
                    logger.info(f" -> Skipping: {regulation.code} (already has {existing} chunks)")
                    self.stats["by_regulation"][regulation.code] = existing

            if not regulations_to_ingest:
                # Everything already indexed: record totals and return early.
                logger.info("All regulations already indexed. Skipping ingestion phase.")
                total_chunks = sum(existing_chunks.values())
                self.stats["chunks_processed"] = total_chunks
                if self.checkpoint_mgr:
                    self.checkpoint_mgr.add_metric("total_chunks", total_chunks)
                    self.checkpoint_mgr.add_metric("skipped", True)
                    self.checkpoint_mgr.complete_checkpoint(success=True)
                return total_chunks

            # Ingest only missing regulations
            total_chunks = sum(existing_chunks.values())
            for i, regulation in enumerate(regulations_to_ingest, 1):
                logger.info(f"[{i}/{len(regulations_to_ingest)}] Ingesting {regulation.code}...")
                try:
                    count = await ingestion.ingest_regulation(regulation)
                    total_chunks += count
                    self.stats["by_regulation"][regulation.code] = count
                    logger.info(f" -> {count} chunks")

                    # Add metric for this regulation
                    if self.checkpoint_mgr:
                        self.checkpoint_mgr.add_metric(f"chunks_{regulation.code}", count)

                except Exception as e:
                    # A single failing regulation does not abort the phase;
                    # it is recorded with 0 chunks and the loop continues.
                    logger.error(f" -> FAILED: {e}")
                    self.stats["by_regulation"][regulation.code] = 0

            self.stats["chunks_processed"] = total_chunks
            logger.info(f"\nTotal chunks in collection: {total_chunks}")

            # Validate ingestion results
            if self.checkpoint_mgr:
                self.checkpoint_mgr.add_metric("total_chunks", total_chunks)
                self.checkpoint_mgr.add_metric("regulations_count", len(REGULATIONS))

                # Validate total chunks
                expected = EXPECTED_VALUES.get("ingestion", {})
                self.checkpoint_mgr.validate(
                    "total_chunks",
                    expected=expected.get("total_chunks", 8000),
                    actual=total_chunks,
                    min_value=expected.get("min_chunks", 7000)
                )

                # Validate key regulations
                reg_expected = expected.get("regulations", {})
                for reg_code, reg_exp in reg_expected.items():
                    actual = self.stats["by_regulation"].get(reg_code, 0)
                    self.checkpoint_mgr.validate(
                        f"chunks_{reg_code}",
                        expected=reg_exp.get("expected", 0),
                        actual=actual,
                        min_value=reg_exp.get("min", 0)
                    )

                self.checkpoint_mgr.complete_checkpoint(success=True)

            return total_chunks

        except Exception as e:
            if self.checkpoint_mgr:
                self.checkpoint_mgr.fail_checkpoint(str(e))
            raise

        finally:
            # Always release the ingestion client, on success and failure.
            await ingestion.close()
|
||
|
||
    async def run_extraction_phase(self) -> int:
        """Phase 2: Extract checkpoints from chunks.

        Pages through the whole legal-corpus collection via Qdrant's
        scroll API (100 points per page, payload only, no vectors) and
        runs extract_checkpoints_from_chunk on each chunk's text.

        Returns:
            Total number of checkpoints extracted (also stored in
            self.checkpoints / self.stats).

        Raises:
            Exception: re-raised after the phase checkpoint is marked failed.
        """
        logger.info("\n" + "=" * 60)
        logger.info("PHASE 2: CHECKPOINT EXTRACTION")
        logger.info("=" * 60)

        if self.checkpoint_mgr:
            self.checkpoint_mgr.start_checkpoint("extraction", "Checkpoint Extraction")

        try:
            # Scroll through all chunks
            offset = None
            total_checkpoints = 0

            while True:
                result = self.qdrant.scroll(
                    collection_name=LEGAL_CORPUS_COLLECTION,
                    limit=100,
                    offset=offset,
                    with_payload=True,
                    with_vectors=False
                )

                # scroll() returns (points, next_page_offset).
                points, next_offset = result

                if not points:
                    break

                for point in points:
                    payload = point.payload
                    text = payload.get("text", "")

                    checkpoints = self.extract_checkpoints_from_chunk(text, payload)
                    self.checkpoints.extend(checkpoints)
                    total_checkpoints += len(checkpoints)

                logger.info(f"Processed {len(points)} chunks, extracted {total_checkpoints} checkpoints so far...")

                # A None next_offset marks the last page.
                if next_offset is None:
                    break
                offset = next_offset

            self.stats["checkpoints_extracted"] = len(self.checkpoints)
            logger.info(f"\nTotal checkpoints extracted: {len(self.checkpoints)}")

            # Log per regulation
            by_reg = {}
            for cp in self.checkpoints:
                by_reg[cp.regulation_code] = by_reg.get(cp.regulation_code, 0) + 1
            for reg, count in sorted(by_reg.items()):
                logger.info(f" {reg}: {count} checkpoints")

            # Validate extraction results
            if self.checkpoint_mgr:
                self.checkpoint_mgr.add_metric("total_checkpoints", len(self.checkpoints))
                self.checkpoint_mgr.add_metric("checkpoints_by_regulation", by_reg)

                expected = EXPECTED_VALUES.get("extraction", {})
                self.checkpoint_mgr.validate(
                    "total_checkpoints",
                    expected=expected.get("total_checkpoints", 3500),
                    actual=len(self.checkpoints),
                    min_value=expected.get("min_checkpoints", 3000)
                )

                self.checkpoint_mgr.complete_checkpoint(success=True)

            return len(self.checkpoints)

        except Exception as e:
            if self.checkpoint_mgr:
                self.checkpoint_mgr.fail_checkpoint(str(e))
            raise
|
||
|
||
async def run_control_generation_phase(self) -> int:
|
||
"""Phase 3: Generate controls from checkpoints."""
|
||
logger.info("\n" + "=" * 60)
|
||
logger.info("PHASE 3: CONTROL GENERATION")
|
||
logger.info("=" * 60)
|
||
|
||
if self.checkpoint_mgr:
|
||
self.checkpoint_mgr.start_checkpoint("controls", "Control Generation")
|
||
|
||
try:
|
||
# Group checkpoints by regulation
|
||
by_regulation: Dict[str, List[Checkpoint]] = {}
|
||
for cp in self.checkpoints:
|
||
reg = cp.regulation_code
|
||
if reg not in by_regulation:
|
||
by_regulation[reg] = []
|
||
by_regulation[reg].append(cp)
|
||
|
||
# Generate controls per regulation (group every 3-5 checkpoints)
|
||
for regulation, checkpoints in by_regulation.items():
|
||
logger.info(f"Generating controls for {regulation} ({len(checkpoints)} checkpoints)...")
|
||
|
||
# Group checkpoints into batches of 3-5
|
||
batch_size = 4
|
||
for i in range(0, len(checkpoints), batch_size):
|
||
batch = checkpoints[i:i + batch_size]
|
||
control = self.generate_control_for_checkpoints(batch)
|
||
|
||
if control:
|
||
self.controls.append(control)
|
||
self.stats["by_domain"][control.domain] = self.stats["by_domain"].get(control.domain, 0) + 1
|
||
|
||
self.stats["controls_created"] = len(self.controls)
|
||
logger.info(f"\nTotal controls created: {len(self.controls)}")
|
||
|
||
# Log per domain
|
||
for domain, count in sorted(self.stats["by_domain"].items()):
|
||
logger.info(f" {domain}: {count} controls")
|
||
|
||
# Validate control generation
|
||
if self.checkpoint_mgr:
|
||
self.checkpoint_mgr.add_metric("total_controls", len(self.controls))
|
||
self.checkpoint_mgr.add_metric("controls_by_domain", dict(self.stats["by_domain"]))
|
||
|
||
expected = EXPECTED_VALUES.get("controls", {})
|
||
self.checkpoint_mgr.validate(
|
||
"total_controls",
|
||
expected=expected.get("total_controls", 900),
|
||
actual=len(self.controls),
|
||
min_value=expected.get("min_controls", 800)
|
||
)
|
||
|
||
self.checkpoint_mgr.complete_checkpoint(success=True)
|
||
|
||
return len(self.controls)
|
||
|
||
except Exception as e:
|
||
if self.checkpoint_mgr:
|
||
self.checkpoint_mgr.fail_checkpoint(str(e))
|
||
raise
|
||
|
||
async def run_measure_generation_phase(self) -> int:
|
||
"""Phase 4: Generate measures for controls."""
|
||
logger.info("\n" + "=" * 60)
|
||
logger.info("PHASE 4: MEASURE GENERATION")
|
||
logger.info("=" * 60)
|
||
|
||
if self.checkpoint_mgr:
|
||
self.checkpoint_mgr.start_checkpoint("measures", "Measure Generation")
|
||
|
||
try:
|
||
for control in self.controls:
|
||
measure = self.generate_measure_for_control(control)
|
||
self.measures.append(measure)
|
||
|
||
self.stats["measures_defined"] = len(self.measures)
|
||
logger.info(f"\nTotal measures defined: {len(self.measures)}")
|
||
|
||
# Validate measure generation
|
||
if self.checkpoint_mgr:
|
||
self.checkpoint_mgr.add_metric("total_measures", len(self.measures))
|
||
|
||
expected = EXPECTED_VALUES.get("measures", {})
|
||
self.checkpoint_mgr.validate(
|
||
"total_measures",
|
||
expected=expected.get("total_measures", 900),
|
||
actual=len(self.measures),
|
||
min_value=expected.get("min_measures", 800)
|
||
)
|
||
|
||
self.checkpoint_mgr.complete_checkpoint(success=True)
|
||
|
||
return len(self.measures)
|
||
|
||
except Exception as e:
|
||
if self.checkpoint_mgr:
|
||
self.checkpoint_mgr.fail_checkpoint(str(e))
|
||
raise
|
||
|
||
def save_results(self, output_dir: str = "/tmp/compliance_output"):
|
||
"""Save results to JSON files."""
|
||
logger.info("\n" + "=" * 60)
|
||
logger.info("SAVING RESULTS")
|
||
logger.info("=" * 60)
|
||
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
# Save checkpoints
|
||
checkpoints_file = os.path.join(output_dir, "checkpoints.json")
|
||
with open(checkpoints_file, "w") as f:
|
||
json.dump([asdict(cp) for cp in self.checkpoints], f, indent=2, ensure_ascii=False)
|
||
logger.info(f"Saved {len(self.checkpoints)} checkpoints to {checkpoints_file}")
|
||
|
||
# Save controls
|
||
controls_file = os.path.join(output_dir, "controls.json")
|
||
with open(controls_file, "w") as f:
|
||
json.dump([asdict(c) for c in self.controls], f, indent=2, ensure_ascii=False)
|
||
logger.info(f"Saved {len(self.controls)} controls to {controls_file}")
|
||
|
||
# Save measures
|
||
measures_file = os.path.join(output_dir, "measures.json")
|
||
with open(measures_file, "w") as f:
|
||
json.dump([asdict(m) for m in self.measures], f, indent=2, ensure_ascii=False)
|
||
logger.info(f"Saved {len(self.measures)} measures to {measures_file}")
|
||
|
||
# Save statistics
|
||
stats_file = os.path.join(output_dir, "statistics.json")
|
||
self.stats["generated_at"] = datetime.now().isoformat()
|
||
with open(stats_file, "w") as f:
|
||
json.dump(self.stats, f, indent=2, ensure_ascii=False)
|
||
logger.info(f"Saved statistics to {stats_file}")
|
||
|
||
    async def run_full_pipeline(self, force_reindex: bool = False, skip_ingestion: bool = False):
        """Run the complete pipeline.

        Executes the four phases in order (ingestion, extraction, control
        generation, measure generation), saves results to disk and records
        a final pipeline checkpoint. On any failure the checkpoint state
        is marked "failed" and the exception is re-raised.

        Args:
            force_reindex: If True, re-ingest all documents even if they exist
            skip_ingestion: If True, skip ingestion phase entirely (use existing chunks)
        """
        start_time = time.time()

        logger.info("=" * 60)
        logger.info("FULL COMPLIANCE PIPELINE (INCREMENTAL)")
        logger.info(f"Started at: {datetime.now().isoformat()}")
        logger.info(f"Force reindex: {force_reindex}")
        logger.info(f"Skip ingestion: {skip_ingestion}")
        if self.checkpoint_mgr:
            logger.info(f"Pipeline ID: {self.checkpoint_mgr.pipeline_id}")
        logger.info("=" * 60)

        try:
            # Phase 1: Ingestion (skip if requested or run incrementally)
            if skip_ingestion:
                logger.info("Skipping ingestion phase as requested...")
                # Still get the chunk count so the statistics are accurate.
                try:
                    collection_info = self.qdrant.get_collection(LEGAL_CORPUS_COLLECTION)
                    self.stats["chunks_processed"] = collection_info.points_count
                except Exception:
                    # Collection unavailable -> report 0 rather than fail.
                    self.stats["chunks_processed"] = 0
            else:
                await self.run_ingestion_phase(force_reindex=force_reindex)

            # Phase 2: Extraction
            await self.run_extraction_phase()

            # Phase 3: Control Generation
            await self.run_control_generation_phase()

            # Phase 4: Measure Generation
            await self.run_measure_generation_phase()

            # Save results
            self.save_results()

            # Final summary
            elapsed = time.time() - start_time
            logger.info("\n" + "=" * 60)
            logger.info("PIPELINE COMPLETE")
            logger.info("=" * 60)
            logger.info(f"Duration: {elapsed:.1f} seconds")
            logger.info(f"Chunks processed: {self.stats['chunks_processed']}")
            logger.info(f"Checkpoints extracted: {self.stats['checkpoints_extracted']}")
            logger.info(f"Controls created: {self.stats['controls_created']}")
            logger.info(f"Measures defined: {self.stats['measures_defined']}")
            logger.info(f"\nResults saved to: /tmp/compliance_output/")
            logger.info("Checkpoint status: /tmp/pipeline_checkpoints.json")
            logger.info("=" * 60)

            # Complete pipeline checkpoint
            if self.checkpoint_mgr:
                self.checkpoint_mgr.complete_pipeline({
                    "duration_seconds": elapsed,
                    "chunks_processed": self.stats['chunks_processed'],
                    "checkpoints_extracted": self.stats['checkpoints_extracted'],
                    "controls_created": self.stats['controls_created'],
                    "measures_defined": self.stats['measures_defined'],
                    "by_regulation": self.stats['by_regulation'],
                    "by_domain": self.stats['by_domain'],
                })

        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            if self.checkpoint_mgr:
                # Persist the failed state so admin-v2 can display it.
                # NOTE(review): reaches into checkpoint manager internals
                # (state/_save) -- presumably no public fail-pipeline API;
                # confirm against pipeline_checkpoints.
                self.checkpoint_mgr.state.status = "failed"
                self.checkpoint_mgr._save()
            raise
|
||
|
||
|
||
async def main():
    """CLI entry point: parse flags and run the full pipeline."""
    import argparse

    parser = argparse.ArgumentParser(description="Run the compliance pipeline")
    parser.add_argument("--force-reindex", action="store_true",
                        help="Force re-ingestion of all documents")
    parser.add_argument("--skip-ingestion", action="store_true",
                        help="Skip ingestion phase, use existing chunks")
    options = parser.parse_args()

    await CompliancePipeline().run_full_pipeline(
        force_reindex=options.force_reindex,
        skip_ingestion=options.skip_ingestion,
    )


if __name__ == "__main__":
    asyncio.run(main())
|