From de19ef06843a68a6065c4c349a4ecd52d3c7644b Mon Sep 17 00:00:00 2001
From: Benjamin Admin
Date: Fri, 13 Mar 2026 09:03:37 +0100
Subject: [PATCH] =?UTF-8?q?feat(control-generator):=207-stage=20pipeline?=
=?UTF-8?q?=20for=20RAG=E2=86=92LLM=E2=86=92Controls=20generation?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Implements the Control Generator Pipeline that systematically generates
canonical security controls from 150k+ RAG chunks across all compliance
collections (BSI, NIST, OWASP, ENISA, EU laws, German laws).
Three license rules are enforced throughout:
- Rule 1 (free_use): Laws/Public Domain — original text preserved
- Rule 2 (citation_required): CC-BY/CC-BY-SA — text with citation
- Rule 3 (restricted): BSI/ISO — full reformulation, no source traces
New files:
- Migration 046: job tracking, chunk tracking, blocked sources tables
- control_generator.py: 7-stage pipeline (scan→classify→structure/reform→harmonize→anchor→store→mark)
- anchor_finder.py: RAG + DuckDuckGo open-source reference search
- control_generator_routes.py: REST API (generate, review, stats, blocked-sources)
- test_control_generator.py: license mapping, rule enforcement, anchor filtering tests
Modified:
- __init__.py: register control_generator_router
- route.ts: proxy generator/review/stats endpoints
- page.tsx: Generator modal, stats panel, state filter, review queue, license badges
Co-Authored-By: Claude Opus 4.6
---
.../app/api/sdk/v1/canonical/route.ts | 49 +
.../app/sdk/control-library/page.tsx | 344 ++++++-
backend-compliance/compliance/api/__init__.py | 3 +
.../api/control_generator_routes.py | 433 ++++++++
.../compliance/services/anchor_finder.py | 188 ++++
.../compliance/services/control_generator.py | 951 ++++++++++++++++++
.../migrations/046_control_generator.sql | 103 ++
.../tests/test_control_generator.py | 342 +++++++
8 files changed, 2404 insertions(+), 9 deletions(-)
create mode 100644 backend-compliance/compliance/api/control_generator_routes.py
create mode 100644 backend-compliance/compliance/services/anchor_finder.py
create mode 100644 backend-compliance/compliance/services/control_generator.py
create mode 100644 backend-compliance/migrations/046_control_generator.sql
create mode 100644 backend-compliance/tests/test_control_generator.py
diff --git a/admin-compliance/app/api/sdk/v1/canonical/route.ts b/admin-compliance/app/api/sdk/v1/canonical/route.ts
index bf3540e..2a79300 100644
--- a/admin-compliance/app/api/sdk/v1/canonical/route.ts
+++ b/admin-compliance/app/api/sdk/v1/canonical/route.ts
@@ -52,6 +52,45 @@ export async function GET(request: NextRequest) {
backendPath = '/api/compliance/v1/canonical/licenses'
break
+ // Generator endpoints
+ case 'generate-jobs':
+ backendPath = '/api/compliance/v1/canonical/generate/jobs'
+ break
+
+ case 'generate-status': {
+ const jobId = searchParams.get('jobId')
+ if (!jobId) {
+ return NextResponse.json({ error: 'Missing jobId' }, { status: 400 })
+ }
+ backendPath = `/api/compliance/v1/canonical/generate/status/${encodeURIComponent(jobId)}`
+ break
+ }
+
+ case 'review-queue': {
+ const state = searchParams.get('release_state') || 'needs_review'
+ backendPath = `/api/compliance/v1/canonical/generate/review-queue?release_state=${encodeURIComponent(state)}`
+ break
+ }
+
+ case 'processed-stats':
+ backendPath = '/api/compliance/v1/canonical/generate/processed-stats'
+ break
+
+ case 'blocked-sources':
+ backendPath = '/api/compliance/v1/canonical/blocked-sources'
+ break
+
+ case 'controls-customer': {
+ const custSeverity = searchParams.get('severity')
+ const custDomain = searchParams.get('domain')
+ const custParams = new URLSearchParams()
+ if (custSeverity) custParams.set('severity', custSeverity)
+ if (custDomain) custParams.set('domain', custDomain)
+ const custQs = custParams.toString()
+ backendPath = `/api/compliance/v1/canonical/controls-customer${custQs ? `?${custQs}` : ''}`
+ break
+ }
+
default:
return NextResponse.json({ error: `Unknown endpoint: ${endpoint}` }, { status: 400 })
}
@@ -95,6 +134,16 @@ export async function POST(request: NextRequest) {
if (endpoint === 'create-control') {
backendPath = '/api/compliance/v1/canonical/controls'
+ } else if (endpoint === 'generate') {
+ backendPath = '/api/compliance/v1/canonical/generate'
+ } else if (endpoint === 'review') {
+ const controlId = searchParams.get('id')
+ if (!controlId) {
+ return NextResponse.json({ error: 'Missing control id' }, { status: 400 })
+ }
+ backendPath = `/api/compliance/v1/canonical/generate/review/${encodeURIComponent(controlId)}`
+ } else if (endpoint === 'blocked-sources-cleanup') {
+ backendPath = '/api/compliance/v1/canonical/blocked-sources/cleanup'
} else if (endpoint === 'similarity-check') {
const controlId = searchParams.get('id')
if (!controlId) {
diff --git a/admin-compliance/app/sdk/control-library/page.tsx b/admin-compliance/app/sdk/control-library/page.tsx
index e65df88..bd0617c 100644
--- a/admin-compliance/app/sdk/control-library/page.tsx
+++ b/admin-compliance/app/sdk/control-library/page.tsx
@@ -5,6 +5,7 @@ import {
Shield, Search, ChevronRight, ArrowLeft, ExternalLink,
Filter, AlertTriangle, CheckCircle2, Info, Lock,
FileText, BookOpen, Scale, Plus, Pencil, Trash2, Save, X,
+ Zap, BarChart3, Eye, RefreshCw, Clock,
} from 'lucide-react'
// =============================================================================
@@ -44,6 +45,11 @@ interface CanonicalControl {
open_anchors: OpenAnchor[]
release_state: string
tags: string[]
+ license_rule?: number | null
+ source_original_text?: string | null
+ source_citation?: Record | null
+ customer_visible?: boolean
+ generation_metadata?: Record | null
created_at: string
updated_at: string
}
@@ -116,14 +122,34 @@ function StateBadge({ state }: { state: string }) {
review: 'bg-blue-100 text-blue-700',
approved: 'bg-green-100 text-green-700',
deprecated: 'bg-red-100 text-red-600',
+ needs_review: 'bg-yellow-100 text-yellow-800',
+ too_close: 'bg-red-100 text-red-700',
+ duplicate: 'bg-orange-100 text-orange-700',
+ }
+ const labels: Record = {
+ needs_review: 'Review noetig',
+ too_close: 'Zu aehnlich',
+ duplicate: 'Duplikat',
}
return (
- {state}
+ {labels[state] || state}
)
}
+function LicenseRuleBadge({ rule }: { rule: number | null | undefined }) {
+ if (!rule) return null
+ const config: Record = {
+ 1: { bg: 'bg-green-100 text-green-700', label: 'Free Use' },
+ 2: { bg: 'bg-blue-100 text-blue-700', label: 'Zitation' },
+ 3: { bg: 'bg-amber-100 text-amber-700', label: 'Reformuliert' },
+ }
+ const c = config[rule]
+ if (!c) return null
+ return {c.label}
+}
+
function getDomain(controlId: string): string {
return controlId.split('-')[0] || ''
}
@@ -419,6 +445,17 @@ export default function ControlLibraryPage() {
const [mode, setMode] = useState<'list' | 'detail' | 'create' | 'edit'>('list')
const [saving, setSaving] = useState(false)
+ // Generator state
+ const [showGenerator, setShowGenerator] = useState(false)
+ const [generating, setGenerating] = useState(false)
+ const [genResult, setGenResult] = useState | null>(null)
+ const [genDomain, setGenDomain] = useState('')
+ const [genMaxControls, setGenMaxControls] = useState(10)
+ const [genDryRun, setGenDryRun] = useState(true)
+ const [stateFilter, setStateFilter] = useState('')
+ const [processedStats, setProcessedStats] = useState>>([])
+ const [showStats, setShowStats] = useState(false)
+
// Load data
const loadData = useCallback(async () => {
try {
@@ -450,6 +487,7 @@ export default function ControlLibraryPage() {
return controls.filter(c => {
if (severityFilter && c.severity !== severityFilter) return false
if (domainFilter && getDomain(c.control_id) !== domainFilter) return false
+ if (stateFilter && c.release_state !== stateFilter) return false
if (searchQuery) {
const q = searchQuery.toLowerCase()
return (
@@ -461,7 +499,7 @@ export default function ControlLibraryPage() {
}
return true
})
- }, [controls, severityFilter, domainFilter, searchQuery])
+ }, [controls, severityFilter, domainFilter, stateFilter, searchQuery])
// CRUD handlers
const handleCreate = async (data: typeof EMPTY_CONTROL) => {
@@ -526,6 +564,63 @@ export default function ControlLibraryPage() {
}
}
+ // Generator handlers
+ const handleGenerate = async () => {
+ setGenerating(true)
+ setGenResult(null)
+ try {
+ const res = await fetch(`${BACKEND_URL}?endpoint=generate`, {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({
+ domain: genDomain || null,
+ max_controls: genMaxControls,
+ dry_run: genDryRun,
+ skip_web_search: false,
+ }),
+ })
+ if (!res.ok) {
+ const err = await res.json()
+ setGenResult({ status: 'error', message: err.error || err.details || 'Fehler' })
+ return
+ }
+ const data = await res.json()
+ setGenResult(data)
+ if (!genDryRun) {
+ await loadData()
+ }
+ } catch {
+ setGenResult({ status: 'error', message: 'Netzwerkfehler' })
+ } finally {
+ setGenerating(false)
+ }
+ }
+
+ const loadProcessedStats = async () => {
+ try {
+ const res = await fetch(`${BACKEND_URL}?endpoint=processed-stats`)
+ if (res.ok) {
+ const data = await res.json()
+ setProcessedStats(data.stats || [])
+ }
+ } catch { /* ignore */ }
+ }
+
+ const handleReview = async (controlId: string, action: string) => {
+ try {
+ const res = await fetch(`${BACKEND_URL}?endpoint=review&id=${controlId}`, {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ action }),
+ })
+ if (res.ok) {
+ await loadData()
+ setSelectedControl(null)
+ setMode('list')
+ }
+ } catch { /* ignore */ }
+ }
+
if (loading) {
return (
@@ -748,6 +843,98 @@ export default function ControlLibraryPage() {
)}
+
+ {/* License & Citation Info */}
+ {ctrl.license_rule && (
+
+
+
+
Lizenzinformationen
+
+
+ {ctrl.source_citation && (
+
+
Quelle: {ctrl.source_citation.source}
+ {ctrl.source_citation.license &&
Lizenz: {ctrl.source_citation.license}
}
+ {ctrl.source_citation.license_notice &&
Hinweis: {ctrl.source_citation.license_notice}
}
+ {ctrl.source_citation.url && (
+
+ Originalquelle
+
+ )}
+
+ )}
+ {ctrl.source_original_text && (
+
+ Originaltext anzeigen
+ {ctrl.source_original_text}
+
+ )}
+ {ctrl.license_rule === 3 && (
+
+
+ Eigenstaendig formuliert — keine Originalquelle gespeichert
+
+ )}
+
+ )}
+
+ {/* Generation Metadata (internal) */}
+ {ctrl.generation_metadata && (
+
+
+
+
Generierungsdetails (intern)
+
+
+
Pfad: {String(ctrl.generation_metadata.processing_path || '-')}
+ {ctrl.generation_metadata.similarity_status && (
+
Similarity: {String(ctrl.generation_metadata.similarity_status)}
+ )}
+ {Array.isArray(ctrl.generation_metadata.similar_controls) && (
+
+
Aehnliche Controls:
+ {(ctrl.generation_metadata.similar_controls as Array
>).map((s, i) => (
+ {String(s.control_id)} — {String(s.title)} ({String(s.similarity)})
+ ))}
+
+ )}
+
+
+ )}
+
+ {/* Review Actions */}
+ {['needs_review', 'too_close', 'duplicate'].includes(ctrl.release_state) && (
+
+
+
+
Review erforderlich
+
+
+
+
+
+
+
+ )}
)
@@ -772,13 +959,29 @@ export default function ControlLibraryPage() {
-
@@ -847,6 +1172,7 @@ export default function ControlLibraryPage() {
{ctrl.control_id}
+
{ctrl.risk_score !== null && (
Score: {ctrl.risk_score}
)}
diff --git a/backend-compliance/compliance/api/__init__.py b/backend-compliance/compliance/api/__init__.py
index 7f6713a..98fb9b0 100644
--- a/backend-compliance/compliance/api/__init__.py
+++ b/backend-compliance/compliance/api/__init__.py
@@ -34,6 +34,7 @@ from .generation_routes import router as generation_router
from .project_routes import router as project_router
from .wiki_routes import router as wiki_router
from .canonical_control_routes import router as canonical_control_router
+from .control_generator_routes import router as control_generator_router
# Include sub-routers
router.include_router(audit_router)
@@ -69,6 +70,7 @@ router.include_router(generation_router)
router.include_router(project_router)
router.include_router(wiki_router)
router.include_router(canonical_control_router)
+router.include_router(control_generator_router)
__all__ = [
"router",
@@ -104,4 +106,5 @@ __all__ = [
"project_router",
"wiki_router",
"canonical_control_router",
+ "control_generator_router",
]
diff --git a/backend-compliance/compliance/api/control_generator_routes.py b/backend-compliance/compliance/api/control_generator_routes.py
new file mode 100644
index 0000000..2a9c0d1
--- /dev/null
+++ b/backend-compliance/compliance/api/control_generator_routes.py
@@ -0,0 +1,433 @@
+"""
+FastAPI routes for the Control Generator Pipeline.
+
+Endpoints:
+ POST /v1/canonical/generate — Start generation run
+ GET /v1/canonical/generate/status/{job_id} — Job status
+ GET /v1/canonical/generate/jobs — All jobs
+ GET /v1/canonical/generate/review-queue — Controls needing review
+ POST /v1/canonical/generate/review/{control_id} — Complete review
+ GET /v1/canonical/generate/processed-stats — Processing stats per collection
+ GET /v1/canonical/blocked-sources — Blocked sources list
+ POST /v1/canonical/blocked-sources/cleanup — Start cleanup workflow
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+from typing import Optional
+
+from fastapi import APIRouter, HTTPException, Query
+from pydantic import BaseModel
+from sqlalchemy import text
+
+from database import SessionLocal
+from compliance.services.control_generator import (
+ ControlGeneratorPipeline,
+ GeneratorConfig,
+ ALL_COLLECTIONS,
+)
+from compliance.services.rag_client import get_rag_client
+
+logger = logging.getLogger(__name__)
+router = APIRouter(prefix="/v1/canonical", tags=["control-generator"])
+
+
+# =============================================================================
+# REQUEST / RESPONSE MODELS
+# =============================================================================
+
+class GenerateRequest(BaseModel):
+ domain: str | None = None
+ collections: list[str] | None = None
+ max_controls: int = 50
+ batch_size: int = 5
+ skip_web_search: bool = False
+ dry_run: bool = False
+
+
+class GenerateResponse(BaseModel):
+ job_id: str
+ status: str
+ message: str
+ total_chunks_scanned: int = 0
+ controls_generated: int = 0
+ controls_verified: int = 0
+ controls_needs_review: int = 0
+ controls_too_close: int = 0
+ controls_duplicates_found: int = 0
+ errors: list = []
+ controls: list = []
+
+
+class ReviewRequest(BaseModel):
+ action: str # "approve", "reject", "needs_rework"
+ release_state: str | None = None # Override release_state
+ notes: str | None = None
+
+
+class ProcessedStats(BaseModel):
+ collection: str
+ total_chunks_estimated: int
+ processed_chunks: int
+ pending_chunks: int
+ direct_adopted: int
+ llm_reformed: int
+ skipped: int
+
+
+class BlockedSourceResponse(BaseModel):
+ id: str
+ regulation_code: str
+ document_title: str
+ reason: str
+ deletion_status: str
+ qdrant_collection: str | None = None
+ marked_at: str
+
+
+# =============================================================================
+# ENDPOINTS
+# =============================================================================
+
+@router.post("/generate", response_model=GenerateResponse)
+async def start_generation(req: GenerateRequest):
+ """Start a control generation run."""
+ config = GeneratorConfig(
+ collections=req.collections,
+ domain=req.domain,
+ batch_size=req.batch_size,
+ max_controls=req.max_controls,
+ skip_web_search=req.skip_web_search,
+ dry_run=req.dry_run,
+ )
+
+ db = SessionLocal()
+ try:
+ pipeline = ControlGeneratorPipeline(db=db, rag_client=get_rag_client())
+ result = await pipeline.run(config)
+
+ return GenerateResponse(
+ job_id=result.job_id,
+ status=result.status,
+ message=f"Generated {result.controls_generated} controls from {result.total_chunks_scanned} chunks",
+ total_chunks_scanned=result.total_chunks_scanned,
+ controls_generated=result.controls_generated,
+ controls_verified=result.controls_verified,
+ controls_needs_review=result.controls_needs_review,
+ controls_too_close=result.controls_too_close,
+ controls_duplicates_found=result.controls_duplicates_found,
+ errors=result.errors,
+ controls=result.controls if req.dry_run else [],
+ )
+ except Exception as e:
+ logger.error("Generation failed: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
+ finally:
+ db.close()
+
+
+@router.get("/generate/status/{job_id}")
+async def get_job_status(job_id: str):
+ """Get status of a generation job."""
+ db = SessionLocal()
+ try:
+ result = db.execute(
+ text("SELECT * FROM canonical_generation_jobs WHERE id = :id::uuid"),
+ {"id": job_id},
+ )
+ row = result.fetchone()
+ if not row:
+ raise HTTPException(status_code=404, detail="Job not found")
+
+ cols = result.keys()
+ job = dict(zip(cols, row))
+ # Serialize datetime fields
+ for key in ("started_at", "completed_at", "created_at"):
+ if job.get(key):
+ job[key] = str(job[key])
+ job["id"] = str(job["id"])
+ return job
+ finally:
+ db.close()
+
+
+@router.get("/generate/jobs")
+async def list_jobs(
+ limit: int = Query(20, ge=1, le=100),
+ offset: int = Query(0, ge=0),
+):
+ """List all generation jobs."""
+ db = SessionLocal()
+ try:
+ result = db.execute(
+ text("""
+ SELECT id, status, total_chunks_scanned, controls_generated,
+ controls_verified, controls_needs_review, controls_too_close,
+ controls_duplicates_found, created_at, completed_at
+ FROM canonical_generation_jobs
+ ORDER BY created_at DESC
+ LIMIT :limit OFFSET :offset
+ """),
+ {"limit": limit, "offset": offset},
+ )
+ jobs = []
+ cols = result.keys()
+ for row in result:
+ job = dict(zip(cols, row))
+ job["id"] = str(job["id"])
+ for key in ("created_at", "completed_at"):
+ if job.get(key):
+ job[key] = str(job[key])
+ jobs.append(job)
+ return {"jobs": jobs, "total": len(jobs)}
+ finally:
+ db.close()
+
+
+@router.get("/generate/review-queue")
+async def get_review_queue(
+ release_state: str = Query("needs_review", regex="^(needs_review|too_close|duplicate)$"),
+ limit: int = Query(50, ge=1, le=200),
+):
+ """Get controls that need manual review."""
+ db = SessionLocal()
+ try:
+ result = db.execute(
+ text("""
+ SELECT c.id, c.control_id, c.title, c.objective, c.severity,
+ c.release_state, c.license_rule, c.customer_visible,
+ c.generation_metadata, c.open_anchors, c.tags,
+ c.created_at
+ FROM canonical_controls c
+ WHERE c.release_state = :state
+ ORDER BY c.created_at DESC
+ LIMIT :limit
+ """),
+ {"state": release_state, "limit": limit},
+ )
+ controls = []
+ cols = result.keys()
+ for row in result:
+ ctrl = dict(zip(cols, row))
+ ctrl["id"] = str(ctrl["id"])
+ ctrl["created_at"] = str(ctrl["created_at"])
+ # Parse JSON fields
+ for jf in ("generation_metadata", "open_anchors", "tags"):
+ if isinstance(ctrl.get(jf), str):
+ try:
+ ctrl[jf] = json.loads(ctrl[jf])
+ except (json.JSONDecodeError, TypeError):
+ pass
+ controls.append(ctrl)
+ return {"controls": controls, "total": len(controls)}
+ finally:
+ db.close()
+
+
+@router.post("/generate/review/{control_id}")
+async def review_control(control_id: str, req: ReviewRequest):
+ """Complete review of a generated control."""
+ db = SessionLocal()
+ try:
+ # Validate control exists and is in reviewable state
+ result = db.execute(
+ text("SELECT id, release_state FROM canonical_controls WHERE control_id = :cid"),
+ {"cid": control_id},
+ )
+ row = result.fetchone()
+ if not row:
+ raise HTTPException(status_code=404, detail="Control not found")
+
+ current_state = row[1]
+ if current_state not in ("needs_review", "too_close", "duplicate"):
+ raise HTTPException(status_code=400, detail=f"Control is in state '{current_state}', not reviewable")
+
+ # Determine new state
+ if req.action == "approve":
+ new_state = req.release_state or "draft"
+ elif req.action == "reject":
+ new_state = "deprecated"
+ elif req.action == "needs_rework":
+ new_state = "needs_review"
+ else:
+ raise HTTPException(status_code=400, detail=f"Unknown action: {req.action}")
+
+ if new_state not in ("draft", "review", "approved", "deprecated", "needs_review", "too_close", "duplicate"):
+ raise HTTPException(status_code=400, detail=f"Invalid release_state: {new_state}")
+
+ db.execute(
+ text("""
+ UPDATE canonical_controls
+ SET release_state = :state, updated_at = NOW()
+ WHERE control_id = :cid
+ """),
+ {"state": new_state, "cid": control_id},
+ )
+ db.commit()
+
+ return {"control_id": control_id, "release_state": new_state, "action": req.action}
+ finally:
+ db.close()
+
+
+@router.get("/generate/processed-stats")
+async def get_processed_stats():
+ """Get processing statistics per collection."""
+ db = SessionLocal()
+ try:
+ result = db.execute(
+ text("""
+ SELECT
+ collection,
+ COUNT(*) as processed_chunks,
+ COUNT(*) FILTER (WHERE processing_path = 'structured') as direct_adopted,
+ COUNT(*) FILTER (WHERE processing_path = 'llm_reform') as llm_reformed,
+ COUNT(*) FILTER (WHERE processing_path = 'skipped') as skipped
+ FROM canonical_processed_chunks
+ GROUP BY collection
+ ORDER BY collection
+ """)
+ )
+ stats = []
+ cols = result.keys()
+ for row in result:
+ stat = dict(zip(cols, row))
+ stat["total_chunks_estimated"] = 0 # Would need Qdrant API to get total
+ stat["pending_chunks"] = 0
+ stats.append(stat)
+ return {"stats": stats}
+ finally:
+ db.close()
+
+
+# =============================================================================
+# BLOCKED SOURCES
+# =============================================================================
+
+@router.get("/blocked-sources")
+async def list_blocked_sources():
+ """List all blocked (Rule 3) sources."""
+ db = SessionLocal()
+ try:
+ result = db.execute(
+ text("""
+ SELECT id, regulation_code, document_title, reason,
+ deletion_status, qdrant_collection, marked_at
+ FROM canonical_blocked_sources
+ ORDER BY marked_at DESC
+ """)
+ )
+ sources = []
+ cols = result.keys()
+ for row in result:
+ src = dict(zip(cols, row))
+ src["id"] = str(src["id"])
+ src["marked_at"] = str(src["marked_at"])
+ sources.append(src)
+ return {"sources": sources}
+ finally:
+ db.close()
+
+
+@router.post("/blocked-sources/cleanup")
+async def start_cleanup():
+ """Start cleanup workflow for blocked sources.
+
+ This marks all pending blocked sources for deletion.
+ Actual RAG chunk deletion and file removal is a separate manual step.
+ """
+ db = SessionLocal()
+ try:
+ result = db.execute(
+ text("""
+ UPDATE canonical_blocked_sources
+ SET deletion_status = 'marked_for_deletion'
+ WHERE deletion_status = 'pending'
+ RETURNING regulation_code
+ """)
+ )
+ marked = [row[0] for row in result]
+ db.commit()
+
+ return {
+ "status": "marked_for_deletion",
+ "marked_count": len(marked),
+ "regulation_codes": marked,
+ "message": "Sources marked for deletion. Run manual cleanup to remove RAG chunks and files.",
+ }
+ finally:
+ db.close()
+
+
+# =============================================================================
+# CUSTOMER VIEW FILTER
+# =============================================================================
+
+@router.get("/controls-customer")
+async def get_controls_customer_view(
+ severity: str | None = Query(None),
+ domain: str | None = Query(None),
+):
+ """Get controls filtered for customer visibility.
+
+ Rule 3 controls have source_citation and source_original_text hidden.
+ generation_metadata is NEVER shown to customers.
+ """
+ db = SessionLocal()
+ try:
+ query = """
+ SELECT c.id, c.control_id, c.title, c.objective, c.rationale,
+ c.scope, c.requirements, c.test_procedure, c.evidence,
+ c.severity, c.risk_score, c.implementation_effort,
+ c.open_anchors, c.release_state, c.tags,
+ c.license_rule, c.customer_visible,
+ c.source_original_text, c.source_citation,
+ c.created_at, c.updated_at
+ FROM canonical_controls c
+ WHERE c.release_state IN ('draft', 'approved')
+ """
+ params: dict = {}
+
+ if severity:
+ query += " AND c.severity = :severity"
+ params["severity"] = severity
+ if domain:
+ query += " AND c.control_id LIKE :domain"
+ params["domain"] = f"{domain.upper()}-%"
+
+ query += " ORDER BY c.control_id"
+
+ result = db.execute(text(query), params)
+ controls = []
+ cols = result.keys()
+ for row in result:
+ ctrl = dict(zip(cols, row))
+ ctrl["id"] = str(ctrl["id"])
+ for key in ("created_at", "updated_at"):
+ if ctrl.get(key):
+ ctrl[key] = str(ctrl[key])
+
+ # Parse JSON fields
+ for jf in ("scope", "requirements", "test_procedure", "evidence",
+ "open_anchors", "tags", "source_citation"):
+ if isinstance(ctrl.get(jf), str):
+ try:
+ ctrl[jf] = json.loads(ctrl[jf])
+ except (json.JSONDecodeError, TypeError):
+ pass
+
+ # Customer visibility rules:
+ # - NEVER show generation_metadata
+ # - Rule 3: NEVER show source_citation or source_original_text
+ ctrl.pop("generation_metadata", None)
+ if not ctrl.get("customer_visible", True):
+ ctrl["source_citation"] = None
+ ctrl["source_original_text"] = None
+
+ controls.append(ctrl)
+
+ return {"controls": controls, "total": len(controls)}
+ finally:
+ db.close()
diff --git a/backend-compliance/compliance/services/anchor_finder.py b/backend-compliance/compliance/services/anchor_finder.py
new file mode 100644
index 0000000..f895e4b
--- /dev/null
+++ b/backend-compliance/compliance/services/anchor_finder.py
@@ -0,0 +1,188 @@
+"""
+Anchor Finder — finds open-source references (OWASP, NIST, ENISA) for controls.
+
+Two-stage search:
+ Stage A: RAG-internal search for open-source chunks matching the control topic
+ Stage B: Web search via DuckDuckGo Instant Answer API (no API key needed)
+
+Only open-source references (Rule 1+2) are accepted as anchors.
+"""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass
+
+import httpx
+
+from .rag_client import ComplianceRAGClient, get_rag_client
+from .control_generator import (
+ GeneratedControl,
+ REGULATION_LICENSE_MAP,
+ _RULE2_PREFIXES,
+ _RULE3_PREFIXES,
+ _classify_regulation,
+)
+
+logger = logging.getLogger(__name__)
+
+# Regulation codes that are safe to reference as open anchors (Rule 1+2)
+_OPEN_SOURCE_RULES = {1, 2}
+
+
+@dataclass
+class OpenAnchor:
+ framework: str
+ ref: str
+ url: str
+
+
+class AnchorFinder:
+ """Finds open-source references to anchor generated controls."""
+
+ def __init__(self, rag_client: ComplianceRAGClient | None = None):
+ self.rag = rag_client or get_rag_client()
+
+ async def find_anchors(
+ self,
+ control: GeneratedControl,
+ skip_web: bool = False,
+ min_anchors: int = 2,
+ ) -> list[OpenAnchor]:
+ """Find open-source anchors for a control."""
+ # Stage A: RAG-internal search
+ anchors = await self._search_rag_for_open_anchors(control)
+
+ # Stage B: Web search if not enough anchors
+ if len(anchors) < min_anchors and not skip_web:
+ web_anchors = await self._search_web(control)
+ # Deduplicate by framework+ref
+ existing_keys = {(a.framework, a.ref) for a in anchors}
+ for wa in web_anchors:
+ if (wa.framework, wa.ref) not in existing_keys:
+ anchors.append(wa)
+
+ return anchors
+
+ async def _search_rag_for_open_anchors(self, control: GeneratedControl) -> list[OpenAnchor]:
+ """Search RAG for chunks from open sources matching the control topic."""
+ # Build search query from control title + first 3 tags
+ tags_str = " ".join(control.tags[:3]) if control.tags else ""
+ query = f"{control.title} {tags_str}".strip()
+
+ results = await self.rag.search(
+ query=query,
+ collection="bp_compliance_ce",
+ top_k=15,
+ )
+
+ anchors: list[OpenAnchor] = []
+ seen: set[str] = set()
+
+ for r in results:
+ if not r.regulation_code:
+ continue
+
+ # Only accept open-source references
+ license_info = _classify_regulation(r.regulation_code)
+ if license_info.get("rule") not in _OPEN_SOURCE_RULES:
+ continue
+
+ # Build reference key for dedup
+ ref = r.article or r.category or ""
+ key = f"{r.regulation_code}:{ref}"
+ if key in seen:
+ continue
+ seen.add(key)
+
+ framework_name = license_info.get("name", r.regulation_name or r.regulation_short or r.regulation_code)
+ url = r.source_url or self._build_reference_url(r.regulation_code, ref)
+
+ anchors.append(OpenAnchor(
+ framework=framework_name,
+ ref=ref,
+ url=url,
+ ))
+
+ if len(anchors) >= 5:
+ break
+
+ return anchors
+
+ async def _search_web(self, control: GeneratedControl) -> list[OpenAnchor]:
+ """Search DuckDuckGo Instant Answer API for open references."""
+ keywords = f"{control.title} security control OWASP NIST"
+ try:
+ async with httpx.AsyncClient(timeout=10.0) as client:
+ resp = await client.get(
+ "https://api.duckduckgo.com/",
+ params={
+ "q": keywords,
+ "format": "json",
+ "no_html": "1",
+ "skip_disambig": "1",
+ },
+ )
+ if resp.status_code != 200:
+ return []
+
+ data = resp.json()
+ anchors: list[OpenAnchor] = []
+
+ # Parse RelatedTopics
+ for topic in data.get("RelatedTopics", [])[:10]:
+ url = topic.get("FirstURL", "")
+ text = topic.get("Text", "")
+
+ if not url:
+ continue
+
+ # Only accept known open-source domains
+ framework = self._identify_framework_from_url(url)
+ if framework:
+ anchors.append(OpenAnchor(
+ framework=framework,
+ ref=text[:100] if text else url,
+ url=url,
+ ))
+
+ if len(anchors) >= 3:
+ break
+
+ return anchors
+
+ except Exception as e:
+ logger.warning("Web anchor search failed: %s", e)
+ return []
+
+ @staticmethod
+ def _identify_framework_from_url(url: str) -> str | None:
+ """Identify if a URL belongs to a known open-source framework."""
+ url_lower = url.lower()
+ if "owasp.org" in url_lower:
+ return "OWASP"
+ if "nist.gov" in url_lower or "csrc.nist.gov" in url_lower:
+ return "NIST"
+ if "enisa.europa.eu" in url_lower:
+ return "ENISA"
+ if "cisa.gov" in url_lower:
+ return "CISA"
+ if "eur-lex.europa.eu" in url_lower:
+ return "EU Law"
+ return None
+
+ @staticmethod
+ def _build_reference_url(regulation_code: str, ref: str) -> str:
+ """Build a reference URL for known frameworks."""
+ code = regulation_code.lower()
+ if code.startswith("owasp"):
+ return "https://owasp.org/www-project-application-security-verification-standard/"
+ if code.startswith("nist"):
+ return "https://csrc.nist.gov/publications"
+ if code.startswith("enisa"):
+ return "https://www.enisa.europa.eu/publications"
+ if code.startswith("eu_"):
+ return "https://eur-lex.europa.eu/"
+ if code == "cisa_secure_by_design":
+ return "https://www.cisa.gov/securebydesign"
+ return ""
diff --git a/backend-compliance/compliance/services/control_generator.py b/backend-compliance/compliance/services/control_generator.py
new file mode 100644
index 0000000..725bb59
--- /dev/null
+++ b/backend-compliance/compliance/services/control_generator.py
@@ -0,0 +1,951 @@
+"""
+Control Generator Pipeline — RAG → License → Structure/Reform → Harmonize → Anchor → Store.
+
+7-stage pipeline that generates canonical security controls from RAG chunks:
+ 1. RAG SCAN — Load unprocessed chunks (or new document versions)
+ 2. LICENSE CLASSIFY — Determine which of 3 license rules applies
+ 3a. STRUCTURE — Rule 1+2: Structure original text into control format
+ 3b. LLM REFORM — Rule 3: Fully reformulate (no original text, no source names)
+ 4. HARMONIZE — Check against existing controls for duplicates
+ 5. ANCHOR SEARCH — Find open-source references (OWASP, NIST, ENISA)
+ 6. STORE — Persist to DB with correct visibility flags
+ 7. MARK PROCESSED — Mark RAG chunks as processed (with version tracking)
+
+Three License Rules:
+ Rule 1 (free_use): Laws, Public Domain — original text allowed
+ Rule 2 (citation_required): CC-BY, CC-BY-SA — original text with citation
+ Rule 3 (restricted): BSI, ISO — full reformulation, no source names
+"""
+
+from __future__ import annotations
+
+import hashlib
+import json
+import logging
+import os
+import re
+import uuid
+from dataclasses import dataclass, field, asdict
+from datetime import datetime, timezone
+from typing import Optional
+
+import httpx
+from pydantic import BaseModel
+from sqlalchemy import text
+from sqlalchemy.orm import Session
+
+from .rag_client import ComplianceRAGClient, RAGSearchResult, get_rag_client
+from .similarity_detector import check_similarity, SimilarityReport
+
+logger = logging.getLogger(__name__)
+
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Base URL of the Go SDK that proxies LLM chat requests.
SDK_URL = os.getenv("SDK_URL", "http://ai-compliance-sdk:8090")
LLM_CHAT_URL = f"{SDK_URL}/sdk/v1/llm/chat"
# Embedding service used for harmonization (duplicate detection, stage 4).
EMBEDDING_URL = os.getenv("EMBEDDING_URL", "http://embedding-service:8087")
# Model name forwarded to the SDK; override per deployment via env.
LLM_MODEL = os.getenv("CONTROL_GEN_LLM_MODEL", "qwen3:30b-a3b")
# Seconds to wait for one LLM completion before giving up (returns "").
LLM_TIMEOUT = float(os.getenv("CONTROL_GEN_LLM_TIMEOUT", "120"))

HARMONIZATION_THRESHOLD = 0.85  # Cosine similarity above this = duplicate

# Default RAG collections scanned when GeneratorConfig.collections is None.
ALL_COLLECTIONS = [
    "bp_compliance_ce",
    "bp_compliance_recht",
    "bp_compliance_gesetze",
    "bp_compliance_datenschutz",
    "bp_dsfa_corpus",
    "bp_legal_templates",
]
+
+# ---------------------------------------------------------------------------
+# License Mapping (3-Rule System)
+# ---------------------------------------------------------------------------
+
+REGULATION_LICENSE_MAP: dict[str, dict] = {
+ # RULE 1: FREE USE — Laws, Public Domain
+ "eu_2016_679": {"license": "EU_LAW", "rule": 1, "name": "DSGVO"},
+ "eu_2024_1689": {"license": "EU_LAW", "rule": 1, "name": "AI Act"},
+ "eu_2022_2555": {"license": "EU_LAW", "rule": 1, "name": "NIS2"},
+ "eu_2024_2847": {"license": "EU_LAW", "rule": 1, "name": "CRA"},
+ "eu_2023_1230": {"license": "EU_LAW", "rule": 1, "name": "Maschinenverordnung"},
+ "nist_sp_800_53": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST SP 800-53"},
+ "nist_sp_800_63b": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST SP 800-63B"},
+ "nist_csf_2_0": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST CSF 2.0"},
+ "nist_sp_800_218": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST SSDF"},
+ "cisa_secure_by_design": {"license": "US_GOV_PUBLIC", "rule": 1, "name": "CISA Secure by Design"},
+ "bdsg": {"license": "DE_LAW", "rule": 1, "name": "BDSG"},
+ "ttdsg": {"license": "DE_LAW", "rule": 1, "name": "TTDSG"},
+ "tkg": {"license": "DE_LAW", "rule": 1, "name": "TKG"},
+
+ # RULE 2: CITATION REQUIRED — CC-BY, CC-BY-SA
+ "owasp_asvs": {"license": "CC-BY-SA-4.0", "rule": 2, "name": "OWASP ASVS",
+ "attribution": "OWASP Foundation, CC BY-SA 4.0"},
+ "owasp_masvs": {"license": "CC-BY-SA-4.0", "rule": 2, "name": "OWASP MASVS",
+ "attribution": "OWASP Foundation, CC BY-SA 4.0"},
+ "owasp_top10": {"license": "CC-BY-SA-4.0", "rule": 2, "name": "OWASP Top 10",
+ "attribution": "OWASP Foundation, CC BY-SA 4.0"},
+ "oecd_ai_principles": {"license": "OECD_PUBLIC", "rule": 2, "name": "OECD AI Principles",
+ "attribution": "OECD"},
+
+ # RULE 3: RESTRICTED — Full reformulation required
+ # Names stored as INTERNAL_ONLY — never exposed to customers
+}
+
+# Prefix-based matching for wildcard entries
+_RULE3_PREFIXES = ["bsi_", "iso_", "etsi_"]
+_RULE2_PREFIXES = ["enisa_"]
+
+
+def _classify_regulation(regulation_code: str) -> dict:
+ """Determine license rule for a regulation_code."""
+ code = regulation_code.lower().strip()
+
+ # Exact match first
+ if code in REGULATION_LICENSE_MAP:
+ return REGULATION_LICENSE_MAP[code]
+
+ # Prefix match for Rule 2
+ for prefix in _RULE2_PREFIXES:
+ if code.startswith(prefix):
+ return {"license": "CC-BY-4.0", "rule": 2, "name": "ENISA",
+ "attribution": "ENISA, CC BY 4.0"}
+
+ # Prefix match for Rule 3
+ for prefix in _RULE3_PREFIXES:
+ if code.startswith(prefix):
+ return {"license": f"{prefix.rstrip('_').upper()}_RESTRICTED", "rule": 3,
+ "name": "INTERNAL_ONLY"}
+
+ # Unknown → treat as restricted (safe default)
+ logger.warning("Unknown regulation_code %r — defaulting to Rule 3 (restricted)", code)
+ return {"license": "UNKNOWN", "rule": 3, "name": "INTERNAL_ONLY"}
+
+
+# ---------------------------------------------------------------------------
+# Domain detection from content
+# ---------------------------------------------------------------------------
+
+DOMAIN_KEYWORDS = {
+ "AUTH": ["authentication", "login", "password", "credential", "mfa", "2fa",
+ "session", "token", "oauth", "identity", "authentifizierung", "anmeldung"],
+ "CRYPT": ["encryption", "cryptography", "tls", "ssl", "certificate", "hashing",
+ "aes", "rsa", "verschlüsselung", "kryptographie", "zertifikat"],
+ "NET": ["network", "firewall", "dns", "vpn", "proxy", "segmentation",
+ "netzwerk", "routing", "port", "intrusion"],
+ "DATA": ["data protection", "privacy", "personal data", "datenschutz",
+ "personenbezogen", "dsgvo", "gdpr", "löschung", "verarbeitung"],
+ "LOG": ["logging", "monitoring", "audit", "siem", "alert", "anomaly",
+ "protokollierung", "überwachung"],
+ "ACC": ["access control", "authorization", "rbac", "permission", "privilege",
+ "zugriffskontrolle", "berechtigung", "autorisierung"],
+ "SEC": ["vulnerability", "patch", "update", "hardening", "configuration",
+ "schwachstelle", "härtung", "konfiguration"],
+ "INC": ["incident", "response", "breach", "recovery", "backup",
+ "vorfall", "wiederherstellung", "notfall"],
+ "AI": ["artificial intelligence", "machine learning", "model", "bias",
+ "ki", "künstliche intelligenz", "algorithmus", "training"],
+ "COMP": ["compliance", "audit", "regulation", "standard", "certification",
+ "konformität", "prüfung", "zertifizierung"],
+}
+
+
+def _detect_domain(text: str) -> str:
+ """Detect the most likely domain from text content."""
+ text_lower = text.lower()
+ scores: dict[str, int] = {}
+ for domain, keywords in DOMAIN_KEYWORDS.items():
+ scores[domain] = sum(1 for kw in keywords if kw in text_lower)
+ if not scores or max(scores.values()) == 0:
+ return "SEC" # Default
+ return max(scores, key=scores.get)
+
+
+# ---------------------------------------------------------------------------
+# Data Models
+# ---------------------------------------------------------------------------
+
class GeneratorConfig(BaseModel):
    """Caller-supplied settings for one generator run."""

    # RAG collections to scan; None means ALL_COLLECTIONS.
    collections: list[str] | None = None
    # Optional key into DOMAIN_KEYWORDS (e.g. "AUTH"); adds an extra search
    # query and fixes the control-ID prefix for this run.
    domain: str | None = None
    # NOTE(review): batch_size is not read anywhere in this module — confirm
    # whether the API layer consumes it or it is dead config.
    batch_size: int = 5
    # Hard cap on controls produced in a single run.
    max_controls: int = 50
    # Skip chunks already recorded in canonical_processed_chunks.
    skip_processed: bool = True
    # Skip the web (DuckDuckGo) anchor search; RAG-only anchors.
    skip_web_search: bool = False
    # Generate and count, but do not persist controls or mark chunks.
    dry_run: bool = False
+
+
@dataclass
class GeneratedControl:
    """One generated control, carrying content plus license provenance."""

    # Sequential human-readable ID (e.g. "AUTH-011"); assigned late (stage 5+).
    control_id: str = ""
    title: str = ""
    objective: str = ""
    rationale: str = ""
    scope: dict = field(default_factory=dict)
    requirements: list = field(default_factory=list)
    test_procedure: list = field(default_factory=list)
    evidence: list = field(default_factory=list)
    severity: str = "medium"
    risk_score: float = 5.0
    implementation_effort: str = "m"
    # Open-source references (OWASP/NIST/ENISA) found in stage 5.
    open_anchors: list = field(default_factory=list)
    # One of: draft / needs_review / too_close / duplicate.
    release_state: str = "draft"
    tags: list = field(default_factory=list)
    # 3-rule fields
    # License rule applied: 1=free_use, 2=citation_required, 3=restricted.
    license_rule: int | None = None
    # Original source text — stored for rules 1/2 only, NEVER for rule 3.
    source_original_text: str | None = None
    # Citation payload (source, license, url) — None for rule 3.
    source_citation: dict | None = None
    # False for rule-3 controls: only our own formulation may be exposed.
    customer_visible: bool = True
    # Pipeline bookkeeping (processing_path, similarity scores, job_id, ...).
    generation_metadata: dict = field(default_factory=dict)
+
+
@dataclass
class GeneratorResult:
    """Aggregate outcome of one run (mirrors canonical_generation_jobs)."""

    job_id: str = ""
    status: str = "completed"
    total_chunks_scanned: int = 0
    # Count of controls produced, including dry runs and failed stores.
    controls_generated: int = 0
    controls_verified: int = 0
    controls_needs_review: int = 0
    controls_too_close: int = 0
    controls_duplicates_found: int = 0
    # Error strings; only the last 50 are persisted to the job row.
    errors: list = field(default_factory=list)
    # asdict() snapshots of every generated control.
    controls: list = field(default_factory=list)
+
+
+# ---------------------------------------------------------------------------
+# LLM Client (via Go SDK)
+# ---------------------------------------------------------------------------
+
async def _llm_chat(prompt: str, system_prompt: str | None = None) -> str:
    """POST a chat request to the Go SDK LLM endpoint; '' on any failure.

    Tolerates both response shapes the SDK emits:
    {"message": {"content": ...}} and {"response": ...}.
    """
    chat_messages: list[dict] = []
    if system_prompt:
        chat_messages.append({"role": "system", "content": system_prompt})
    chat_messages.append({"role": "user", "content": prompt})

    request_body = {
        "model": LLM_MODEL,
        "messages": chat_messages,
        "stream": False,
    }

    try:
        async with httpx.AsyncClient(timeout=LLM_TIMEOUT) as client:
            resp = await client.post(LLM_CHAT_URL, json=request_body)
            if resp.status_code != 200:
                logger.error("LLM chat failed %d: %s", resp.status_code, resp.text[:300])
                return ""
            data = resp.json()
            # Go SDK returns {message: {content: "..."}} or {response: "..."}
            msg = data.get("message", {})
            if isinstance(msg, dict):
                return msg.get("content", "")
            return data.get("response", str(msg))
    except Exception as e:
        logger.error("LLM chat request failed: %s", e)
        return ""
+
+
async def _get_embedding(text: str) -> list[float]:
    """Fetch the embedding vector for `text`; [] on any error.

    Harmonization degrades gracefully without embeddings, so every failure
    (network, non-2xx, malformed body) collapses to an empty vector.
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                f"{EMBEDDING_URL}/embed",
                json={"texts": [text]},
            )
            resp.raise_for_status()
            vectors = resp.json().get("embeddings", [])
            return vectors[0] if vectors else []
    except Exception:
        return []
+
+
+def _cosine_sim(a: list[float], b: list[float]) -> float:
+ """Compute cosine similarity between two vectors."""
+ if not a or not b or len(a) != len(b):
+ return 0.0
+ dot = sum(x * y for x, y in zip(a, b))
+ norm_a = sum(x * x for x in a) ** 0.5
+ norm_b = sum(x * x for x in b) ** 0.5
+ if norm_a == 0 or norm_b == 0:
+ return 0.0
+ return dot / (norm_a * norm_b)
+
+
+# ---------------------------------------------------------------------------
+# JSON Parsing Helper
+# ---------------------------------------------------------------------------
+
+def _parse_llm_json(raw: str) -> dict:
+ """Extract JSON from LLM response (handles markdown fences)."""
+ # Try extracting from ```json ... ``` blocks
+ match = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", raw, re.DOTALL)
+ text = match.group(1) if match else raw
+
+ # Try parsing directly
+ try:
+ return json.loads(text)
+ except json.JSONDecodeError:
+ pass
+
+ # Try finding first { ... } block
+ brace_match = re.search(r"\{.*\}", text, re.DOTALL)
+ if brace_match:
+ try:
+ return json.loads(brace_match.group(0))
+ except json.JSONDecodeError:
+ pass
+
+ logger.warning("Failed to parse LLM JSON response")
+ return {}
+
+
+# ---------------------------------------------------------------------------
+# Pipeline
+# ---------------------------------------------------------------------------
+
# System prompt for Rule 3 (restricted sources): forces full reformulation in
# the model's own words — no copied sentences, no source names, no proprietary
# identifiers. (Prompts are German by design; they are runtime strings.)
REFORM_SYSTEM_PROMPT = """Du bist ein Security-Compliance-Experte. Deine Aufgabe ist es, eigenständige
Security Controls zu formulieren. Du formulierst IMMER in eigenen Worten.
KOPIERE KEINE Sätze aus dem Quelltext. Verwende eigene Begriffe und Struktur.
NENNE NICHT die Quelle. Keine proprietären Bezeichner.
Antworte NUR mit validem JSON."""

# System prompt for Rules 1+2 (free use / citation required): structuring may
# keep the original wording; only clarity is asked for.
STRUCTURE_SYSTEM_PROMPT = """Du bist ein Security-Compliance-Experte. Strukturiere den gegebenen Text
als praxisorientiertes Security Control. Erstelle eine verständliche, umsetzbare Formulierung.
Antworte NUR mit validem JSON."""
+
+
class ControlGeneratorPipeline:
    """Orchestrates the 7-stage control generation pipeline."""

    def __init__(self, db: Session, rag_client: ComplianceRAGClient | None = None):
        # DB session used for all reads/writes; commits/rollbacks per statement.
        self.db = db
        # RAG client; falls back to the module-level singleton.
        self.rag = rag_client or get_rag_client()
        # Lazy per-run cache of existing controls for harmonization (stage 4).
        self._existing_controls: list[dict] | None = None
        # Per-run embedding cache keyed by control_id.
        self._existing_embeddings: dict[str, list[float]] = {}
+
+ # ── Stage 1: RAG Scan ──────────────────────────────────────────────
+
+ async def _scan_rag(self, config: GeneratorConfig) -> list[RAGSearchResult]:
+ """Load unprocessed chunks from RAG collections."""
+ collections = config.collections or ALL_COLLECTIONS
+ all_results: list[RAGSearchResult] = []
+
+ queries = [
+ "security requirement control measure",
+ "Sicherheitsanforderung Maßnahme Prüfaspekt",
+ "compliance requirement audit criterion",
+ "data protection privacy obligation",
+ "access control authentication authorization",
+ ]
+
+ if config.domain:
+ domain_kw = DOMAIN_KEYWORDS.get(config.domain, [])
+ if domain_kw:
+ queries.append(" ".join(domain_kw[:5]))
+
+ for collection in collections:
+ for query in queries:
+ results = await self.rag.search(
+ query=query,
+ collection=collection,
+ top_k=20,
+ )
+ all_results.extend(results)
+
+ # Deduplicate by text hash
+ seen_hashes: set[str] = set()
+ unique: list[RAGSearchResult] = []
+ for r in all_results:
+ h = hashlib.sha256(r.text.encode()).hexdigest()
+ if h not in seen_hashes:
+ seen_hashes.add(h)
+ unique.append(r)
+
+ # Filter out already-processed chunks
+ if config.skip_processed and unique:
+ hashes = [hashlib.sha256(r.text.encode()).hexdigest() for r in unique]
+ processed = self._get_processed_hashes(hashes)
+ unique = [r for r, h in zip(unique, hashes) if h not in processed]
+
+ logger.info("RAG scan: %d unique chunks (%d after filtering processed)",
+ len(seen_hashes), len(unique))
+ return unique[:config.max_controls * 3] # Over-fetch to account for duplicates
+
+ def _get_processed_hashes(self, hashes: list[str]) -> set[str]:
+ """Check which chunk hashes are already processed."""
+ if not hashes:
+ return set()
+ try:
+ result = self.db.execute(
+ text("SELECT chunk_hash FROM canonical_processed_chunks WHERE chunk_hash = ANY(:hashes)"),
+ {"hashes": hashes},
+ )
+ return {row[0] for row in result}
+ except Exception as e:
+ logger.warning("Error checking processed chunks: %s", e)
+ return set()
+
+ # ── Stage 2: License Classification ────────────────────────────────
+
    def _classify_license(self, chunk: RAGSearchResult) -> dict:
        """Stage 2: resolve the chunk's regulation_code to a license rule (1/2/3)."""
        return _classify_regulation(chunk.regulation_code)
+
+ # ── Stage 3a: Structure (Rule 1 — Free Use) ───────────────────────
+
    async def _structure_free_use(self, chunk: RAGSearchResult, license_info: dict) -> GeneratedControl:
        """Stage 3a (Rule 1): structure freely usable text into control format.

        The original text MAY be reused; it is additionally preserved verbatim
        in source_original_text, and a citation block is attached. Falls back
        to a minimal needs_review control when the LLM reply is unparseable.
        """
        source_name = license_info.get("name", chunk.regulation_name)
        # German prompt by design; field list must match _build_control_from_json.
        prompt = f"""Strukturiere den folgenden Gesetzestext als Security/Compliance Control.
Du DARFST den Originaltext verwenden (Quelle: {source_name}, {license_info.get('license', '')}).

WICHTIG: Erstelle eine verständliche, praxisorientierte Formulierung.
Der Originaltext wird separat gespeichert — deine Formulierung soll klar und umsetzbar sein.

Gib JSON zurück mit diesen Feldern:
- title: Kurzer prägnanter Titel (max 100 Zeichen)
- objective: Was soll erreicht werden? (1-3 Sätze)
- rationale: Warum ist das wichtig? (1-2 Sätze)
- requirements: Liste von konkreten Anforderungen (Strings)
- test_procedure: Liste von Prüfschritten (Strings)
- evidence: Liste von Nachweisdokumenten (Strings)
- severity: low/medium/high/critical
- tags: Liste von Tags

Text: {chunk.text[:2000]}
Quelle: {chunk.regulation_name} ({chunk.regulation_code}), {chunk.article}"""

        raw = await _llm_chat(prompt, STRUCTURE_SYSTEM_PROMPT)
        data = _parse_llm_json(raw)
        if not data:
            # Unparseable LLM output → minimal control flagged needs_review.
            return self._fallback_control(chunk)

        domain = _detect_domain(chunk.text)
        control = self._build_control_from_json(data, domain)
        control.license_rule = 1
        # Rule 1: original text and citation are preserved and customer-visible.
        control.source_original_text = chunk.text
        control.source_citation = {
            "source": f"{chunk.regulation_name} {chunk.article or ''}".strip(),
            "license": license_info.get("license", ""),
            "url": chunk.source_url or "",
        }
        control.customer_visible = True
        control.generation_metadata = {
            "processing_path": "structured",
            "license_rule": 1,
            "source_regulation": chunk.regulation_code,
            "source_article": chunk.article,
        }
        return control
+
+ # ── Stage 3b: Structure with Citation (Rule 2) ────────────────────
+
    async def _structure_with_citation(self, chunk: RAGSearchResult, license_info: dict) -> GeneratedControl:
        """Stage 3b (Rule 2): structure CC-BY/CC-BY-SA text with attribution.

        Like Rule 1, the original text may be reused, but the citation block
        additionally carries the license_notice (attribution string).
        """
        source_name = license_info.get("name", chunk.regulation_name)
        attribution = license_info.get("attribution", "")
        # German prompt by design; field list must match _build_control_from_json.
        prompt = f"""Strukturiere den folgenden Text als Security Control.
Quelle: {source_name} ({license_info.get('license', '')}) — Zitation erforderlich.

Du darfst den Text übernehmen oder verständlicher umformulieren.
Die Quelle wird automatisch zitiert — fokussiere dich auf Klarheit.

Gib JSON zurück mit diesen Feldern:
- title: Kurzer prägnanter Titel (max 100 Zeichen)
- objective: Was soll erreicht werden? (1-3 Sätze)
- rationale: Warum ist das wichtig? (1-2 Sätze)
- requirements: Liste von konkreten Anforderungen (Strings)
- test_procedure: Liste von Prüfschritten (Strings)
- evidence: Liste von Nachweisdokumenten (Strings)
- severity: low/medium/high/critical
- tags: Liste von Tags

Text: {chunk.text[:2000]}
Quelle: {chunk.regulation_name}, {chunk.article}"""

        raw = await _llm_chat(prompt, STRUCTURE_SYSTEM_PROMPT)
        data = _parse_llm_json(raw)
        if not data:
            # Unparseable LLM output → minimal control flagged needs_review.
            return self._fallback_control(chunk)

        domain = _detect_domain(chunk.text)
        control = self._build_control_from_json(data, domain)
        control.license_rule = 2
        # Rule 2: original text kept, citation carries the attribution notice.
        control.source_original_text = chunk.text
        control.source_citation = {
            "source": f"{chunk.regulation_name} {chunk.article or ''}".strip(),
            "license": license_info.get("license", ""),
            "license_notice": attribution,
            "url": chunk.source_url or "",
        }
        control.customer_visible = True
        control.generation_metadata = {
            "processing_path": "structured",
            "license_rule": 2,
            "source_regulation": chunk.regulation_code,
            "source_article": chunk.article,
        }
        return control
+
+ # ── Stage 3c: LLM Reformulation (Rule 3 — Restricted) ─────────────
+
    async def _llm_reformulate(self, chunk: RAGSearchResult, config: GeneratorConfig) -> GeneratedControl:
        """Stage 3c (Rule 3): fully reformulate — NO original text, NO source names.

        Restricted sources (BSI/ISO/ETSI) must never leak into the stored
        control: no source_original_text, no citation, customer_visible=False,
        and generation_metadata deliberately omits regulation/article.
        """
        domain = config.domain or _detect_domain(chunk.text)
        # German prompt by design; forbids copying and proprietary identifiers.
        prompt = f"""Analysiere den folgenden Prüfaspekt und formuliere ein EIGENSTÄNDIGES Security Control.
KOPIERE KEINE Sätze. Verwende eigene Begriffe und Struktur.
NENNE NICHT die Quelle. Keine proprietären Bezeichner (kein O.Auth_*, TR-03161, BSI-TR etc.).

Aspekt (nur zur Analyse, NICHT kopieren, NICHT referenzieren):
---
{chunk.text[:1500]}
---

Domain: {domain}

Gib JSON zurück mit diesen Feldern:
- title: Kurzer eigenständiger Titel (max 100 Zeichen)
- objective: Eigenständige Formulierung des Ziels (1-3 Sätze)
- rationale: Eigenständige Begründung (1-2 Sätze)
- requirements: Liste von konkreten Anforderungen (Strings, eigene Worte)
- test_procedure: Liste von Prüfschritten (Strings)
- evidence: Liste von Nachweisdokumenten (Strings)
- severity: low/medium/high/critical
- tags: Liste von Tags (eigene Begriffe)"""

        raw = await _llm_chat(prompt, REFORM_SYSTEM_PROMPT)
        data = _parse_llm_json(raw)
        if not data:
            # NOTE(review): the fallback control embeds chunk.regulation_code in
            # its title — for a restricted source that may leak the source name;
            # confirm whether fallback is acceptable on the Rule-3 path.
            return self._fallback_control(chunk)

        control = self._build_control_from_json(data, domain)
        control.license_rule = 3
        control.source_original_text = None  # NEVER store original
        control.source_citation = None  # NEVER cite source
        control.customer_visible = False  # Only our formulation
        # generation_metadata: NO source names, NO original texts
        control.generation_metadata = {
            "processing_path": "llm_reform",
            "license_rule": 3,
        }
        return control
+
+ # ── Stage 4: Harmonization ─────────────────────────────────────────
+
+ async def _check_harmonization(self, new_control: GeneratedControl) -> list | None:
+ """Check if a new control duplicates existing ones via embedding similarity."""
+ existing = self._load_existing_controls()
+ if not existing:
+ return None
+
+ new_text = f"{new_control.title} {new_control.objective}"
+ new_emb = await _get_embedding(new_text)
+ if not new_emb:
+ return None
+
+ similar = []
+ for ex in existing:
+ ex_key = ex.get("control_id", "")
+ ex_text = f"{ex.get('title', '')} {ex.get('objective', '')}"
+
+ # Get or compute embedding for existing control
+ if ex_key not in self._existing_embeddings:
+ emb = await _get_embedding(ex_text)
+ self._existing_embeddings[ex_key] = emb
+ ex_emb = self._existing_embeddings.get(ex_key, [])
+
+ if not ex_emb:
+ continue
+
+ cosine = _cosine_sim(new_emb, ex_emb)
+ if cosine > HARMONIZATION_THRESHOLD:
+ similar.append({
+ "control_id": ex.get("control_id", ""),
+ "title": ex.get("title", ""),
+ "similarity": round(cosine, 3),
+ })
+
+ return similar if similar else None
+
+ def _load_existing_controls(self) -> list[dict]:
+ """Load existing controls from DB (cached per pipeline run)."""
+ if self._existing_controls is not None:
+ return self._existing_controls
+
+ try:
+ result = self.db.execute(
+ text("SELECT control_id, title, objective FROM canonical_controls WHERE release_state != 'deprecated'")
+ )
+ self._existing_controls = [
+ {"control_id": r[0], "title": r[1], "objective": r[2]}
+ for r in result
+ ]
+ except Exception as e:
+ logger.warning("Error loading existing controls: %s", e)
+ self._existing_controls = []
+
+ return self._existing_controls
+
+ # ── Helpers ────────────────────────────────────────────────────────
+
+ def _build_control_from_json(self, data: dict, domain: str) -> GeneratedControl:
+ """Build a GeneratedControl from parsed LLM JSON."""
+ severity = data.get("severity", "medium")
+ if severity not in ("low", "medium", "high", "critical"):
+ severity = "medium"
+
+ tags = data.get("tags", [])
+ if isinstance(tags, str):
+ tags = [t.strip() for t in tags.split(",")]
+
+ return GeneratedControl(
+ title=str(data.get("title", "Untitled Control"))[:255],
+ objective=str(data.get("objective", "")),
+ rationale=str(data.get("rationale", "")),
+ scope=data.get("scope", {}),
+ requirements=data.get("requirements", []) if isinstance(data.get("requirements"), list) else [],
+ test_procedure=data.get("test_procedure", []) if isinstance(data.get("test_procedure"), list) else [],
+ evidence=data.get("evidence", []) if isinstance(data.get("evidence"), list) else [],
+ severity=severity,
+ risk_score=min(10.0, max(0.0, float(data.get("risk_score", 5.0)))),
+ implementation_effort=data.get("implementation_effort", "m") if data.get("implementation_effort") in ("s", "m", "l", "xl") else "m",
+ tags=tags[:20],
+ )
+
+ def _fallback_control(self, chunk: RAGSearchResult) -> GeneratedControl:
+ """Create a minimal control when LLM parsing fails."""
+ domain = _detect_domain(chunk.text)
+ return GeneratedControl(
+ title=f"Control from {chunk.regulation_code} {chunk.article or ''}".strip()[:255],
+ objective=chunk.text[:500] if chunk.text else "Needs manual review",
+ rationale="Auto-generated — LLM parsing failed, manual review required.",
+ severity="medium",
+ release_state="needs_review",
+ tags=[domain.lower()],
+ )
+
+ def _generate_control_id(self, domain: str, db: Session) -> str:
+ """Generate next sequential control ID like AUTH-011."""
+ prefix = domain.upper()[:4]
+ try:
+ result = db.execute(
+ text("SELECT control_id FROM canonical_controls WHERE control_id LIKE :prefix ORDER BY control_id DESC LIMIT 1"),
+ {"prefix": f"{prefix}-%"},
+ )
+ row = result.fetchone()
+ if row:
+ last_num = int(row[0].split("-")[-1])
+ return f"{prefix}-{last_num + 1:03d}"
+ except Exception:
+ pass
+ return f"{prefix}-001"
+
+ # ── Pipeline Orchestration ─────────────────────────────────────────
+
    def _create_job(self, config: GeneratorConfig) -> str:
        """Create a generation job record; returns its UUID as a string.

        Best-effort: if the INSERT fails, a random UUID is returned so the
        pipeline can still run. NOTE(review): in that case the later
        _update_job() UPDATE matches no row, so the run's stats are lost —
        confirm this is acceptable.
        """
        try:
            result = self.db.execute(
                text("""
                    INSERT INTO canonical_generation_jobs (status, config)
                    VALUES ('running', :config)
                    RETURNING id
                """),
                # Full GeneratorConfig is persisted for reproducibility.
                {"config": json.dumps(config.model_dump())},
            )
            self.db.commit()
            row = result.fetchone()
            return str(row[0]) if row else str(uuid.uuid4())
        except Exception as e:
            logger.error("Failed to create job: %s", e)
            return str(uuid.uuid4())
+
    def _update_job(self, job_id: str, result: GeneratorResult):
        """Write final run statistics back to the job row (best-effort).

        Called exactly once at the end of run(), on both success and failure.
        Only the last 50 error strings are persisted to bound row size.
        """
        try:
            self.db.execute(
                text("""
                    UPDATE canonical_generation_jobs
                    SET status = :status,
                        total_chunks_scanned = :scanned,
                        controls_generated = :generated,
                        controls_verified = :verified,
                        controls_needs_review = :needs_review,
                        controls_too_close = :too_close,
                        controls_duplicates_found = :duplicates,
                        errors = :errors,
                        completed_at = NOW()
                    WHERE id = :job_id::uuid
                """),
                {
                    "job_id": job_id,
                    "status": result.status,
                    "scanned": result.total_chunks_scanned,
                    "generated": result.controls_generated,
                    "verified": result.controls_verified,
                    "needs_review": result.controls_needs_review,
                    "too_close": result.controls_too_close,
                    "duplicates": result.controls_duplicates_found,
                    # Cap persisted errors to the most recent 50.
                    "errors": json.dumps(result.errors[-50:]),
                },
            )
            self.db.commit()
        except Exception as e:
            logger.error("Failed to update job: %s", e)
+
    def _store_control(self, control: GeneratedControl, job_id: str) -> str | None:
        """Stage 6: persist a generated control. Returns its UUID or None.

        Requires the 'bp_security_v1' framework row to exist. Because the
        INSERT uses ON CONFLICT DO NOTHING, a (framework_id, control_id)
        collision returns no row from RETURNING and this method yields None —
        the control is silently dropped and the chunk stays unmarked.
        Rolls back and returns None on any error.
        """
        try:
            # Resolve the framework's surrogate UUID once per call.
            fw_result = self.db.execute(
                text("SELECT id FROM canonical_control_frameworks WHERE framework_id = 'bp_security_v1' LIMIT 1")
            )
            fw_row = fw_result.fetchone()
            if not fw_row:
                logger.error("Framework bp_security_v1 not found")
                return None
            framework_uuid = fw_row[0]

            # Generate control_id if not set (e.g. fallback controls skip stage 5).
            if not control.control_id:
                domain = _detect_domain(control.objective) if control.objective else "SEC"
                control.control_id = self._generate_control_id(domain, self.db)

            result = self.db.execute(
                text("""
                    INSERT INTO canonical_controls (
                        framework_id, control_id, title, objective, rationale,
                        scope, requirements, test_procedure, evidence,
                        severity, risk_score, implementation_effort,
                        open_anchors, release_state, tags,
                        license_rule, source_original_text, source_citation,
                        customer_visible, generation_metadata
                    ) VALUES (
                        :framework_id, :control_id, :title, :objective, :rationale,
                        :scope, :requirements, :test_procedure, :evidence,
                        :severity, :risk_score, :implementation_effort,
                        :open_anchors, :release_state, :tags,
                        :license_rule, :source_original_text, :source_citation,
                        :customer_visible, :generation_metadata
                    )
                    ON CONFLICT (framework_id, control_id) DO NOTHING
                    RETURNING id
                """),
                {
                    "framework_id": framework_uuid,
                    "control_id": control.control_id,
                    "title": control.title,
                    "objective": control.objective,
                    "rationale": control.rationale,
                    # JSON-typed columns receive serialized payloads.
                    "scope": json.dumps(control.scope),
                    "requirements": json.dumps(control.requirements),
                    "test_procedure": json.dumps(control.test_procedure),
                    "evidence": json.dumps(control.evidence),
                    "severity": control.severity,
                    "risk_score": control.risk_score,
                    "implementation_effort": control.implementation_effort,
                    "open_anchors": json.dumps(control.open_anchors),
                    "release_state": control.release_state,
                    "tags": json.dumps(control.tags),
                    # 3-rule provenance: NULLs for rule-3 controls by design.
                    "license_rule": control.license_rule,
                    "source_original_text": control.source_original_text,
                    "source_citation": json.dumps(control.source_citation) if control.source_citation else None,
                    "customer_visible": control.customer_visible,
                    "generation_metadata": json.dumps(control.generation_metadata) if control.generation_metadata else None,
                },
            )
            self.db.commit()
            row = result.fetchone()
            # None when ON CONFLICT suppressed the insert (duplicate control_id).
            return str(row[0]) if row else None
        except Exception as e:
            logger.error("Failed to store control %s: %s", control.control_id, e)
            self.db.rollback()
            return None
+
    def _mark_chunk_processed(
        self,
        chunk: RAGSearchResult,
        license_info: dict,
        processing_path: str,
        control_ids: list[str],
        job_id: str,
    ):
        """Mark a RAG chunk as processed (Stage 7).

        Keyed by (chunk_hash, collection, document_version); re-inserting the
        same triple is a no-op via ON CONFLICT. Best-effort: failures are
        logged but never abort the run (the chunk may then be reprocessed).
        """
        chunk_hash = hashlib.sha256(chunk.text.encode()).hexdigest()
        try:
            self.db.execute(
                text("""
                    INSERT INTO canonical_processed_chunks (
                        chunk_hash, collection, regulation_code,
                        document_version, source_license, license_rule,
                        processing_path, generated_control_ids, job_id
                    ) VALUES (
                        :hash, :collection, :regulation_code,
                        :doc_version, :license, :rule,
                        :path, :control_ids, :job_id::uuid
                    )
                    ON CONFLICT (chunk_hash, collection, document_version) DO NOTHING
                """),
                {
                    "hash": chunk_hash,
                    # TODO(review): collection and document_version are hardcoded
                    # because RAGSearchResult does not carry them — version-based
                    # reprocessing cannot work until the real values flow through.
                    "collection": "bp_compliance_ce",  # Default, we don't track collection per result
                    "regulation_code": chunk.regulation_code,
                    "doc_version": "1.0",
                    "license": license_info.get("license", ""),
                    "rule": license_info.get("rule", 3),
                    "path": processing_path,
                    "control_ids": json.dumps(control_ids),
                    "job_id": job_id,
                },
            )
            self.db.commit()
        except Exception as e:
            logger.warning("Failed to mark chunk processed: %s", e)
+
+ # ── Main Pipeline ──────────────────────────────────────────────────
+
    async def run(self, config: GeneratorConfig) -> GeneratorResult:
        """Execute the full 7-stage pipeline.

        Creates a job row up front, processes chunks until max_controls is
        reached, and always writes final stats back to the job — on success
        and on failure. Per-chunk errors are collected, not fatal.
        """
        result = GeneratorResult()

        # Create job row first so a crash still leaves a trace.
        job_id = self._create_job(config)
        result.job_id = job_id

        try:
            # Stage 1: RAG Scan
            chunks = await self._scan_rag(config)
            result.total_chunks_scanned = len(chunks)

            if not chunks:
                result.status = "completed"
                self._update_job(job_id, result)
                return result

            # Stages 2-7 per chunk; stop once max_controls are produced.
            controls_count = 0
            for chunk in chunks:
                if controls_count >= config.max_controls:
                    break

                try:
                    control = await self._process_single_chunk(chunk, config, job_id)
                    if control is None:
                        # LLM produced no usable title/objective — skip chunk.
                        continue

                    # Tally by final release state ("draft" counts as verified).
                    if control.release_state == "too_close":
                        result.controls_too_close += 1
                    elif control.release_state == "duplicate":
                        result.controls_duplicates_found += 1
                    elif control.release_state == "needs_review":
                        result.controls_needs_review += 1
                    else:
                        result.controls_verified += 1

                    # Stage 6: persist (skipped in dry runs).
                    if not config.dry_run:
                        ctrl_uuid = self._store_control(control, job_id)
                        if ctrl_uuid:
                            # Stage 7: mark chunk processed, tied to the new UUID.
                            # NOTE(review): license is classified a second time
                            # here — _process_single_chunk already did it.
                            license_info = self._classify_license(chunk)
                            path = "llm_reform" if license_info["rule"] == 3 else "structured"
                            self._mark_chunk_processed(chunk, license_info, path, [ctrl_uuid], job_id)

                    # Counted even when storage failed or was skipped (dry run).
                    result.controls_generated += 1
                    result.controls.append(asdict(control))
                    controls_count += 1

                    # Feed the new control into harmonization for later chunks
                    # (only if the cache was already materialized this run).
                    if self._existing_controls is not None:
                        self._existing_controls.append({
                            "control_id": control.control_id,
                            "title": control.title,
                            "objective": control.objective,
                        })

                except Exception as e:
                    # Per-chunk failures are recorded and the loop continues.
                    error_msg = f"Error processing chunk {chunk.regulation_code}/{chunk.article}: {e}"
                    logger.error(error_msg)
                    result.errors.append(error_msg)

            result.status = "completed"

        except Exception as e:
            result.status = "failed"
            result.errors.append(str(e))
            logger.error("Pipeline failed: %s", e)

        # Always persist final stats, even after a pipeline-level failure.
        self._update_job(job_id, result)
        return result
+
+ async def _process_single_chunk(
+ self,
+ chunk: RAGSearchResult,
+ config: GeneratorConfig,
+ job_id: str,
+ ) -> GeneratedControl | None:
+ """Process a single chunk through stages 2-5."""
+ # Stage 2: License classification
+ license_info = self._classify_license(chunk)
+
+ # Stage 3: Structure or Reform based on rule
+ if license_info["rule"] == 1:
+ control = await self._structure_free_use(chunk, license_info)
+ elif license_info["rule"] == 2:
+ control = await self._structure_with_citation(chunk, license_info)
+ else:
+ control = await self._llm_reformulate(chunk, config)
+
+ # Too-Close-Check for Rule 3
+ similarity = await check_similarity(chunk.text, f"{control.objective} {control.rationale}")
+ if similarity.status == "FAIL":
+ control.release_state = "too_close"
+ control.generation_metadata["similarity_status"] = "FAIL"
+ control.generation_metadata["similarity_scores"] = {
+ "token_overlap": similarity.token_overlap,
+ "ngram_jaccard": similarity.ngram_jaccard,
+ "lcs_ratio": similarity.lcs_ratio,
+ }
+ return control
+
+ if not control.title or not control.objective:
+ return None
+
+ # Stage 4: Harmonization
+ duplicates = await self._check_harmonization(control)
+ if duplicates:
+ control.release_state = "duplicate"
+ control.generation_metadata["similar_controls"] = duplicates
+ return control
+
+ # Stage 5: Anchor Search (imported from anchor_finder)
+ try:
+ from .anchor_finder import AnchorFinder
+ finder = AnchorFinder(self.rag)
+ anchors = await finder.find_anchors(control, skip_web=config.skip_web_search)
+ control.open_anchors = [asdict(a) if hasattr(a, '__dataclass_fields__') else a for a in anchors]
+ except Exception as e:
+ logger.warning("Anchor search failed: %s", e)
+
+ # Determine release state
+ if control.license_rule in (1, 2):
+ control.release_state = "draft"
+ elif control.open_anchors:
+ control.release_state = "draft"
+ else:
+ control.release_state = "needs_review"
+
+ # Generate control_id
+ domain = config.domain or _detect_domain(control.objective)
+ control.control_id = self._generate_control_id(domain, self.db)
+
+ # Store job_id in metadata
+ control.generation_metadata["job_id"] = job_id
+
+ return control
diff --git a/backend-compliance/migrations/046_control_generator.sql b/backend-compliance/migrations/046_control_generator.sql
new file mode 100644
index 0000000..08ea4e9
--- /dev/null
+++ b/backend-compliance/migrations/046_control_generator.sql
@@ -0,0 +1,103 @@
-- Migration 046: Control Generator Pipeline
-- Adds job tracking, chunk tracking, blocked sources, and extends canonical_controls
-- for the 3-license-rule system (free_use, citation_required, restricted).
--
-- All DDL is guarded (IF NOT EXISTS / DROP CONSTRAINT IF EXISTS), so the
-- migration can be re-applied without error; the whole script runs in a
-- single transaction.

BEGIN;

-- =============================================================================
-- 1. Job-Tracking for Generator Runs
-- =============================================================================

-- One row per generator invocation. `config` stores the JSON snapshot of the
-- run configuration; the counter columns are outcome tallies updated as the
-- run progresses, `errors` collects per-chunk failures as a JSON array.
CREATE TABLE IF NOT EXISTS canonical_generation_jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Run lifecycle; terminal states are completed / failed / cancelled.
    status VARCHAR(20) DEFAULT 'pending'
        CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')),
    config JSONB NOT NULL,
    total_chunks_scanned INTEGER DEFAULT 0,
    controls_generated INTEGER DEFAULT 0,
    controls_verified INTEGER DEFAULT 0,
    controls_needs_review INTEGER DEFAULT 0,
    controls_too_close INTEGER DEFAULT 0,
    controls_duplicates_found INTEGER DEFAULT 0,
    errors JSONB DEFAULT '[]',
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- =============================================================================
-- 2. Tracking which RAG chunks have been processed
-- =============================================================================

-- Dedup ledger: one row per (chunk, collection, document version) already fed
-- through the pipeline so re-runs can skip processed chunks.
CREATE TABLE IF NOT EXISTS canonical_processed_chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- 64-char hex digest of the chunk text (presumably SHA-256 given the
    -- width — confirm against the pipeline's hashing helper).
    chunk_hash VARCHAR(64) NOT NULL,
    collection VARCHAR(100) NOT NULL,
    regulation_code VARCHAR(100),
    document_version VARCHAR(50),
    source_license VARCHAR(50),
    -- 1 = free_use, 2 = citation_required, 3 = restricted
    license_rule INTEGER NOT NULL
        CHECK (license_rule IN (1, 2, 3)),
    -- How the chunk was handled: structured (Rules 1/2), llm_reform (Rule 3),
    -- or skipped entirely.
    processing_path VARCHAR(20) NOT NULL
        CHECK (processing_path IN ('structured', 'llm_reform', 'skipped')),
    generated_control_ids JSONB DEFAULT '[]',
    job_id UUID REFERENCES canonical_generation_jobs(id),
    processed_at TIMESTAMPTZ DEFAULT NOW(),
    -- Same chunk text may legitimately recur in a new document version.
    UNIQUE (chunk_hash, collection, document_version)
);

CREATE INDEX IF NOT EXISTS idx_cpc_collection ON canonical_processed_chunks(collection);
CREATE INDEX IF NOT EXISTS idx_cpc_regulation ON canonical_processed_chunks(regulation_code);
CREATE INDEX IF NOT EXISTS idx_cpc_job ON canonical_processed_chunks(job_id);

-- =============================================================================
-- 3. Blocked Sources (Rule 3 documents to be deleted after generation)
-- =============================================================================

-- Registry of restricted (Rule 3) documents queued for removal from the RAG
-- store once control generation for them is finished.
CREATE TABLE IF NOT EXISTS canonical_blocked_sources (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    regulation_code VARCHAR(100) NOT NULL UNIQUE,
    document_title VARCHAR(500) NOT NULL,
    -- Default reason is stored verbatim (German operator-facing text).
    reason VARCHAR(500) DEFAULT 'Kommerziell nicht nutzbar — darf nicht mit KI verarbeitet werden',
    deletion_status VARCHAR(20) DEFAULT 'pending'
        CHECK (deletion_status IN ('pending', 'marked_for_deletion', 'deleted')),
    qdrant_collection VARCHAR(100),
    original_file_path TEXT,
    marked_at TIMESTAMPTZ DEFAULT NOW(),
    deleted_at TIMESTAMPTZ
);

-- =============================================================================
-- 4. Extend canonical_controls: release_state + 3-rule columns
-- =============================================================================

-- Expand release_state enum to include generator states
-- (needs_review / too_close / duplicate are produced by the pipeline).
ALTER TABLE canonical_controls DROP CONSTRAINT IF EXISTS canonical_controls_release_state_check;
ALTER TABLE canonical_controls ADD CONSTRAINT canonical_controls_release_state_check
    CHECK (release_state IN ('draft', 'review', 'approved', 'deprecated', 'needs_review', 'too_close', 'duplicate'));

-- License rule: 1 = free_use, 2 = citation_required, 3 = restricted
ALTER TABLE canonical_controls ADD COLUMN IF NOT EXISTS
    license_rule INTEGER DEFAULT NULL;

-- Original text from source (Rule 1+2 only; Rule 3 = always NULL)
ALTER TABLE canonical_controls ADD COLUMN IF NOT EXISTS
    source_original_text TEXT DEFAULT NULL;

-- Citation info (Rule 1+2 only; Rule 3 = always NULL)
ALTER TABLE canonical_controls ADD COLUMN IF NOT EXISTS
    source_citation JSONB DEFAULT NULL;

-- Whether source info may be shown to customers
ALTER TABLE canonical_controls ADD COLUMN IF NOT EXISTS
    customer_visible BOOLEAN DEFAULT true;

-- Generation metadata (internal only, never shown to customers)
ALTER TABLE canonical_controls ADD COLUMN IF NOT EXISTS
    generation_metadata JSONB DEFAULT NULL;

-- Index for filtering by license rule and customer visibility
CREATE INDEX IF NOT EXISTS idx_canonical_controls_license_rule ON canonical_controls(license_rule);
CREATE INDEX IF NOT EXISTS idx_canonical_controls_customer_visible ON canonical_controls(customer_visible);

COMMIT;
diff --git a/backend-compliance/tests/test_control_generator.py b/backend-compliance/tests/test_control_generator.py
new file mode 100644
index 0000000..08406c3
--- /dev/null
+++ b/backend-compliance/tests/test_control_generator.py
@@ -0,0 +1,342 @@
+"""Tests for Control Generator Pipeline."""
+
+import json
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from compliance.services.control_generator import (
+ _classify_regulation,
+ _detect_domain,
+ _parse_llm_json,
+ GeneratorConfig,
+ GeneratedControl,
+ ControlGeneratorPipeline,
+ REGULATION_LICENSE_MAP,
+)
+from compliance.services.anchor_finder import AnchorFinder, OpenAnchor
+from compliance.services.rag_client import RAGSearchResult
+
+
+# =============================================================================
+# License Mapping Tests
+# =============================================================================
+
class TestLicenseMapping:
    """Verify regulation_code -> license-rule classification.

    Rule 1 = free_use (laws / public domain), Rule 2 = citation_required
    (CC-BY family), Rule 3 = restricted (BSI/ISO/ETSI and anything unknown).
    """

    def test_rule1_eu_law(self):
        # EU regulations are public law and therefore freely usable.
        result = _classify_regulation("eu_2016_679")
        assert (result["rule"], result["name"]) == (1, "DSGVO")

    def test_rule1_nist(self):
        result = _classify_regulation("nist_sp_800_53")
        assert result["rule"] == 1
        assert "NIST" in result["name"]

    def test_rule1_german_law(self):
        result = _classify_regulation("bdsg")
        assert (result["rule"], result["name"]) == (1, "BDSG")

    def test_rule2_owasp(self):
        result = _classify_regulation("owasp_asvs")
        assert result["rule"] == 2
        assert "OWASP" in result["name"]
        # Rule-2 entries must carry attribution details for the citation.
        assert "attribution" in result

    def test_rule2_enisa_prefix(self):
        result = _classify_regulation("enisa_iot_security")
        assert result["rule"] == 2
        assert "ENISA" in result["name"]

    def test_rule3_bsi_prefix(self):
        result = _classify_regulation("bsi_tr03161")
        assert result["rule"] == 3
        assert result["name"] == "INTERNAL_ONLY"

    def test_rule3_iso_prefix(self):
        assert _classify_regulation("iso_27001")["rule"] == 3

    def test_rule3_etsi_prefix(self):
        assert _classify_regulation("etsi_en_303_645")["rule"] == 3

    def test_unknown_defaults_to_rule3(self):
        # Fail closed: anything unmapped is treated as restricted.
        assert _classify_regulation("some_unknown_source")["rule"] == 3

    def test_case_insensitive(self):
        assert _classify_regulation("EU_2016_679")["rule"] == 1

    def test_all_mapped_regulations_have_valid_rules(self):
        for code, entry in REGULATION_LICENSE_MAP.items():
            assert entry["rule"] in (1, 2, 3), f"{code} has invalid rule {entry['rule']}"

    def test_rule3_never_exposes_names(self):
        # Restricted sources must stay anonymous even in the classifier output.
        for code in ("bsi_test", "iso_test", "etsi_test"):
            entry = _classify_regulation(code)
            assert entry["name"] == "INTERNAL_ONLY", f"{code} exposes name: {entry['name']}"
+
+
+# =============================================================================
+# Domain Detection Tests
+# =============================================================================
+
class TestDomainDetection:
    """Keyword-driven domain detection from free control text."""

    def _expect(self, text: str, domain: str) -> None:
        # One-line helper so each case reads as a single expectation.
        assert _detect_domain(text) == domain

    def test_auth_domain(self):
        self._expect("Multi-factor authentication and password policy", "AUTH")

    def test_crypto_domain(self):
        self._expect("TLS 1.3 encryption and certificate management", "CRYPT")

    def test_network_domain(self):
        self._expect("Firewall rules and network segmentation", "NET")

    def test_data_domain(self):
        self._expect("DSGVO personenbezogene Daten Datenschutz", "DATA")

    def test_default_domain(self):
        # Unrecognized text falls back to the generic security domain.
        self._expect("random unrelated text xyz", "SEC")
+
+
+# =============================================================================
+# JSON Parsing Tests
+# =============================================================================
+
class TestJsonParsing:
    """Extraction of JSON objects from raw (possibly noisy) LLM output."""

    def test_parse_plain_json(self):
        parsed = _parse_llm_json('{"title": "Test", "objective": "Test obj"}')
        assert parsed["title"] == "Test"

    def test_parse_markdown_fenced_json(self):
        fenced = '```json\n{"title": "Test"}\n```'
        assert _parse_llm_json(fenced)["title"] == "Test"

    def test_parse_json_with_preamble(self):
        noisy = 'Here is the result:\n{"title": "Test"}'
        assert _parse_llm_json(noisy)["title"] == "Test"

    def test_parse_invalid_json(self):
        # Unparseable input must degrade to an empty dict, not raise.
        assert _parse_llm_json("not json at all") == {}
+
+
+# =============================================================================
+# GeneratedControl Rule Tests
+# =============================================================================
+
class TestGeneratedControlRules:
    """Enforce the invariants of the 3-rule licensing model on controls."""

    def test_rule1_has_original_text(self):
        # Rule 1 (free_use): original law text plus citation are preserved
        # and may be shown to customers.
        ctrl = GeneratedControl(license_rule=1)
        ctrl.customer_visible = True
        ctrl.source_original_text = "Original EU law text"
        ctrl.source_citation = {"source": "DSGVO Art. 35", "license": "EU_LAW"}

        assert ctrl.customer_visible is True
        assert ctrl.source_original_text is not None
        assert ctrl.source_citation is not None

    def test_rule2_has_citation(self):
        # Rule 2 (citation_required): citation carries a CC license tag.
        ctrl = GeneratedControl(license_rule=2)
        ctrl.customer_visible = True
        ctrl.source_citation = {"source": "OWASP ASVS V2.1", "license": "CC-BY-SA-4.0"}

        assert ctrl.source_citation is not None
        assert "CC-BY-SA" in ctrl.source_citation["license"]

    def test_rule3_no_original_no_citation(self):
        # Rule 3 (restricted): no source text, no citation, hidden from
        # customers, and metadata free of any source identifiers.
        ctrl = GeneratedControl(license_rule=3)
        ctrl.source_original_text = None
        ctrl.source_citation = None
        ctrl.customer_visible = False
        ctrl.generation_metadata = {"processing_path": "llm_reform", "license_rule": 3}

        assert ctrl.source_original_text is None
        assert ctrl.source_citation is None
        assert ctrl.customer_visible is False

        serialized = json.dumps(ctrl.generation_metadata)
        # generation_metadata must NOT contain source names
        assert "bsi" not in serialized.lower()
        assert "iso" not in serialized.lower()
        assert "TR-03161" not in serialized
+
+
+# =============================================================================
+# Anchor Finder Tests
+# =============================================================================
+
class TestAnchorFinder:
    """Anchor search: RAG-based lookup plus URL framework identification."""

    @pytest.mark.asyncio
    async def test_rag_anchor_search_filters_restricted(self):
        """Restricted (Rule 3) sources must never surface as anchors."""
        owasp_hit = RAGSearchResult(
            text="OWASP requirement",
            regulation_code="owasp_asvs",
            regulation_name="OWASP ASVS",
            regulation_short="OWASP",
            category="requirement",
            article="V2.1.1",
            paragraph="",
            source_url="https://owasp.org",
            score=0.9,
        )
        bsi_hit = RAGSearchResult(
            text="BSI requirement",
            regulation_code="bsi_tr03161",
            regulation_name="BSI TR-03161",
            regulation_short="BSI",
            category="requirement",
            article="O.Auth_1",
            paragraph="",
            source_url="",
            score=0.85,
        )
        rag_stub = AsyncMock()
        rag_stub.search.return_value = [owasp_hit, bsi_hit]

        control = GeneratedControl(title="Test Auth Control", tags=["auth"])
        found = await AnchorFinder(rag_client=rag_stub).find_anchors(control, skip_web=True)

        # The Rule-2 OWASP hit survives; the Rule-3 BSI hit is filtered out.
        assert len(found) == 1
        assert found[0].framework == "OWASP ASVS"

    @pytest.mark.asyncio
    async def test_web_search_identifies_frameworks(self):
        """URL heuristics map known framework hosts; unknown hosts yield None."""
        identify = AnchorFinder()._identify_framework_from_url

        assert identify("https://owasp.org/asvs") == "OWASP"
        assert identify("https://csrc.nist.gov/sp800-53") == "NIST"
        assert identify("https://www.enisa.europa.eu/pub") == "ENISA"
        assert identify("https://random-site.com") is None
+
+
+# =============================================================================
+# Pipeline Integration Tests (Mocked)
+# =============================================================================
+
class TestPipelineMocked:
    """Pipeline stages exercised against mocked DB and external services."""

    def _make_chunk(self, regulation_code: str = "owasp_asvs", article: str = "V2.1.1"):
        # Minimal RAG hit; individual tests override fields as needed.
        return RAGSearchResult(
            text="Applications must implement multi-factor authentication.",
            regulation_code=regulation_code,
            regulation_name="OWASP ASVS",
            regulation_short="OWASP",
            category="requirement",
            article=article,
            paragraph="",
            source_url="https://owasp.org",
            score=0.9,
        )

    @pytest.mark.asyncio
    async def test_rule1_processing_path(self):
        """EU-law chunks are classified as Rule 1 (free_use)."""
        chunk = self._make_chunk(regulation_code="eu_2016_679", article="Art. 35")
        chunk.regulation_name = "DSGVO"
        chunk.text = "Die Datenschutz-Folgenabschaetzung ist durchzufuehren."

        db_stub = MagicMock()
        db_stub.execute.return_value.fetchone.return_value = None

        classified = ControlGeneratorPipeline(db=db_stub)._classify_license(chunk)
        assert classified["rule"] == 1

    @pytest.mark.asyncio
    async def test_rule3_processing_blocks_source_info(self):
        """Rule 3 reformulation must strip every trace of the source."""
        pipeline = ControlGeneratorPipeline(db=MagicMock(), rag_client=AsyncMock())

        # Canned LLM answer standing in for the reformulation model.
        canned = json.dumps({
            "title": "Secure Password Storage",
            "objective": "Passwords must be hashed with modern algorithms.",
            "rationale": "Prevents credential theft.",
            "requirements": ["Use bcrypt or argon2"],
            "test_procedure": ["Verify hash algorithm"],
            "evidence": ["Config review"],
            "severity": "high",
            "tags": ["auth", "password"],
        })

        with patch("compliance.services.control_generator._llm_chat", return_value=canned):
            restricted = self._make_chunk(regulation_code="bsi_tr03161", article="O.Auth_1")
            result = await pipeline._llm_reformulate(restricted, GeneratorConfig(max_controls=1))

        assert result.license_rule == 3
        assert result.source_original_text is None
        assert result.source_citation is None
        assert result.customer_visible is False
        # Metadata must not mention the restricted source in any casing.
        serialized = json.dumps(result.generation_metadata)
        assert "bsi" not in serialized.lower()
        assert "BSI" not in serialized
        assert "TR-03161" not in serialized

    @pytest.mark.asyncio
    async def test_chunk_hash_deduplication(self):
        """Same chunk text produces same hash — no double processing."""
        import hashlib
        payload = "Test requirement text"
        first = hashlib.sha256(payload.encode()).hexdigest()
        second = hashlib.sha256(payload.encode()).hexdigest()
        # NOTE(review): this only demonstrates sha256 determinism; it never
        # calls the pipeline's own hashing helper — consider tightening.
        assert first == second

    def test_config_defaults(self):
        defaults = GeneratorConfig()
        assert defaults.max_controls == 50
        assert defaults.batch_size == 5
        assert defaults.skip_processed is True
        assert defaults.dry_run is False

    @pytest.mark.asyncio
    async def test_structure_free_use_produces_citation(self):
        """Rule 1 structuring keeps the original text plus a visible citation."""
        pipeline = ControlGeneratorPipeline(db=MagicMock())

        canned = json.dumps({
            "title": "DSFA Pflicht",
            "objective": "DSFA bei hohem Risiko durchfuehren.",
            "rationale": "Gesetzliche Pflicht nach DSGVO.",
            "requirements": ["DSFA durchfuehren"],
            "test_procedure": ["DSFA Bericht pruefen"],
            "evidence": ["DSFA Dokumentation"],
            "severity": "high",
            "tags": ["dsfa", "dsgvo"],
        })

        chunk = self._make_chunk(regulation_code="eu_2016_679", article="Art. 35")
        chunk.text = "Art. 35 DSGVO: Datenschutz-Folgenabschaetzung"
        chunk.regulation_name = "DSGVO"
        license_info = _classify_regulation("eu_2016_679")

        with patch("compliance.services.control_generator._llm_chat", return_value=canned):
            produced = await pipeline._structure_free_use(chunk, license_info)

        assert produced.license_rule == 1
        assert produced.source_original_text is not None
        assert produced.source_citation is not None
        assert "DSGVO" in produced.source_citation["source"]
        assert produced.customer_visible is True