feat: LLM-basierter Rationale-Backfill fuer atomare Controls

POST /controls/backfill-rationale — ersetzt Placeholder "Aus Obligation
abgeleitet." durch LLM-generierte Begruendungen (Ollama/qwen3.5).
Optimierung: gruppiert ~86k Controls nach ~7k Parents, ein LLM-Call pro Parent.
Paginierung via batch_size/offset fuer kontrollierte Ausfuehrung.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-25 23:01:49 +01:00
parent 81ce9dde07
commit 23dd5116b3
2 changed files with 423 additions and 0 deletions

View File

@@ -1112,6 +1112,170 @@ async def backfill_evidence_type(
}
# =============================================================================
# RATIONALE BACKFILL (LLM)
# =============================================================================
@router.post("/controls/backfill-rationale")
async def backfill_rationale(
    dry_run: bool = Query(True, description="Nur zaehlen, nicht aendern"),
    batch_size: int = Query(50, description="Parent-Controls pro Durchlauf"),
    offset: int = Query(0, description="Offset fuer Paginierung (Parent-Index)"),
):
    """Generate meaningful rationales for atomic controls via an LLM.

    Optimization: children are grouped by their parent control (~7k parents
    instead of ~86k individual calls); ONE LLM call per parent group produces
    a shared rationale that is written to all of its children.

    Workflow:
        1. dry_run=true                          -> show statistics only
        2. dry_run=false&batch_size=50&offset=0  -> process first 50 parents
        3. repeat with the returned ``next_offset`` until it is null

    NOTE(review): successfully processed parents no longer match the
    placeholder filter and drop out of the SELECT on the next call, so a
    fixed offset progression (0, 50, 100, ...) can skip unprocessed parents
    once updates succeed. Re-running with offset=0 until the batch is empty
    is the safe strategy; the offset is mainly useful to step past parents
    that keep failing -- confirm intended operator workflow.

    Returns a JSON summary: counts in dry-run mode, otherwise per-batch
    progress, the total error count (error list truncated to 10), and up to
    5 sample rationales.
    """
    from compliance.services.llm_provider import get_llm_provider

    with SessionLocal() as db:
        # 1. Load parent controls that still have children carrying the
        #    placeholder rationale (rejected/merged children are excluded).
        parents = db.execute(text("""
            SELECT p.id AS parent_uuid, p.control_id, p.title, p.category,
                   p.source_citation->>'source' AS source_name,
                   COUNT(c.id) AS child_count
            FROM canonical_controls p
            JOIN canonical_controls c ON c.parent_control_uuid = p.id
            WHERE c.rationale = 'Aus Obligation abgeleitet.'
              AND c.release_state NOT IN ('rejected', 'merged')
            GROUP BY p.id, p.control_id, p.title, p.category,
                     p.source_citation->>'source'
            ORDER BY p.control_id
        """)).fetchall()

        total_parents = len(parents)
        total_children = sum(p.child_count for p in parents)

        if dry_run:
            return {
                "dry_run": True,
                "total_parents": total_parents,
                "total_children": total_children,
                # One LLM call per parent group, not per child.
                "estimated_llm_calls": total_parents,
                "sample_parents": [
                    {
                        "control_id": p.control_id,
                        "title": p.title,
                        "source": p.source_name,
                        "child_count": p.child_count,
                    }
                    for p in parents[:10]
                ],
            }

        # 2. Select the current batch window.
        batch = parents[offset : offset + batch_size]
        if not batch:
            return {
                "dry_run": False,
                "message": "Kein weiterer Batch — alle Parents verarbeitet.",
                "total_parents": total_parents,
                "offset": offset,
                "processed": 0,
            }

        provider = get_llm_provider()
        processed = 0
        children_updated = 0
        errors = []
        sample_rationales = []

        for parent in batch:
            parent_uuid = str(parent.parent_uuid)
            source = parent.source_name or "Regulierung"

            # Build the (German) LLM prompt for this parent group.
            # BUGFIX: control_id and title were concatenated without a
            # separator, producing e.g. "ACC-001Access Control".
            prompt = (
                f"Du bist Compliance-Experte. Erklaere in 1-2 Saetzen auf Deutsch, "
                f"WARUM aus dem uebergeordneten Control atomare Teilmassnahmen "
                f"abgeleitet wurden.\n\n"
                f"Uebergeordnetes Control: {parent.control_id} - {parent.title}\n"
                f"Regulierung: {source}\n"
                f"Kategorie: {parent.category or 'k.A.'}\n"
                f"Anzahl atomarer Controls: {parent.child_count}\n\n"
                f"Schreibe NUR die Begruendung (1-2 Saetze). Kein Markdown, "
                f"keine Aufzaehlung, kein Praefix. "
                f"Erklaere den regulatorischen Hintergrund und warum die "
                f"Zerlegung in atomare, testbare Massnahmen notwendig ist."
            )

            try:
                response = await provider.complete(
                    prompt=prompt,
                    max_tokens=256,
                    temperature=0.3,
                )
                rationale = response.content.strip()

                # Clean up: strip surrounding quotes and a leading
                # "Begruendung:" prefix the model sometimes emits.
                rationale = rationale.strip('"').strip("'").strip()
                if rationale.startswith("Begründung:") or rationale.startswith("Begruendung:"):
                    rationale = rationale.split(":", 1)[1].strip()

                # Cap length at 500 characters.
                if len(rationale) > 500:
                    rationale = rationale[:497] + "..."

                # Reject too-short answers (also covers the empty string,
                # so no separate emptiness check is needed).
                if len(rationale) < 10:
                    errors.append({
                        "control_id": parent.control_id,
                        "error": "LLM-Antwort zu kurz oder leer",
                    })
                    continue

                # Update every child of this parent that still carries
                # the placeholder rationale.
                result = db.execute(
                    text("""
                        UPDATE canonical_controls
                        SET rationale = :rationale
                        WHERE parent_control_uuid = CAST(:pid AS uuid)
                          AND rationale = 'Aus Obligation abgeleitet.'
                          AND release_state NOT IN ('rejected', 'merged')
                    """),
                    {"rationale": rationale, "pid": parent_uuid},
                )
                children_updated += result.rowcount
                processed += 1

                if len(sample_rationales) < 5:
                    sample_rationales.append({
                        "parent": parent.control_id,
                        "title": parent.title,
                        "rationale": rationale,
                        "children_updated": result.rowcount,
                    })
            except Exception as e:
                # Best-effort per parent: record the failure and continue
                # with the rest of the batch.
                logger.error(f"LLM error for {parent.control_id}: {e}")
                errors.append({
                    "control_id": parent.control_id,
                    "error": str(e)[:200],
                })

        db.commit()
        return {
            "dry_run": False,
            "offset": offset,
            "batch_size": batch_size,
            "next_offset": offset + batch_size if offset + batch_size < total_parents else None,
            "processed_parents": processed,
            "children_updated": children_updated,
            "total_parents": total_parents,
            "total_children": total_children,
            "error_count": len(errors),  # full count; list below is truncated
            "errors": errors[:10],
            "sample_rationales": sample_rationales,
        }
# =============================================================================
# CONTROL CRUD (CREATE / UPDATE / DELETE)
# =============================================================================

View File

@@ -0,0 +1,259 @@
"""Tests for the rationale backfill endpoint logic."""
import sys
sys.path.insert(0, ".")
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from compliance.api.canonical_control_routes import backfill_rationale
class TestRationaleBackfillDryRun:
    """Dry-run mode should return statistics without touching DB."""

    @pytest.mark.asyncio
    async def test_dry_run_returns_stats(self):
        # Two fake parent rows, shaped like the grouping query's result.
        row_one = MagicMock(
            parent_uuid="uuid-1",
            control_id="ACC-001",
            title="Access Control",
            category="access",
            source_name="OWASP ASVS",
            child_count=12,
        )
        row_two = MagicMock(
            parent_uuid="uuid-2",
            control_id="SEC-042",
            title="Encryption",
            category="encryption",
            source_name="NIST SP 800-53",
            child_count=5,
        )
        with patch("compliance.api.canonical_control_routes.SessionLocal") as session_factory:
            fake_db = MagicMock()
            session_factory.return_value.__enter__ = MagicMock(return_value=fake_db)
            session_factory.return_value.__exit__ = MagicMock(return_value=False)
            fake_db.execute.return_value.fetchall.return_value = [row_one, row_two]

            result = await backfill_rationale(dry_run=True, batch_size=50, offset=0)

        assert result["dry_run"] is True
        assert result["total_parents"] == 2
        assert result["total_children"] == 17
        assert result["estimated_llm_calls"] == 2
        assert len(result["sample_parents"]) == 2
        assert result["sample_parents"][0]["control_id"] == "ACC-001"
class TestRationaleBackfillExecution:
    """Execution mode should call LLM and update DB."""

    @pytest.mark.asyncio
    async def test_processes_batch_and_updates(self):
        """Happy path: one parent, valid LLM answer, children updated."""
        mock_parents = [
            MagicMock(
                parent_uuid="uuid-1",
                control_id="ACC-001",
                title="Access Control",
                category="access",
                source_name="OWASP ASVS",
                child_count=5,
            ),
        ]
        mock_llm_response = MagicMock()
        mock_llm_response.content = (
            "Die uebergeordneten Anforderungen an Zugriffskontrolle aus "
            "OWASP ASVS erfordern eine Zerlegung in atomare Massnahmen, "
            "um jede Einzelmassnahme unabhaengig testbar zu machen."
        )
        with patch("compliance.api.canonical_control_routes.SessionLocal") as mock_session:
            db = MagicMock()
            mock_session.return_value.__enter__ = MagicMock(return_value=db)
            mock_session.return_value.__exit__ = MagicMock(return_value=False)
            # db.execute always returns the same mock, so it serves both
            # the SELECT (via .fetchall) and the UPDATE (via .rowcount).
            db.execute.return_value.fetchall.return_value = mock_parents
            db.execute.return_value.rowcount = 5
            with patch("compliance.services.llm_provider.get_llm_provider") as mock_get:
                mock_provider = AsyncMock()
                mock_provider.complete.return_value = mock_llm_response
                mock_get.return_value = mock_provider
                result = await backfill_rationale(
                    dry_run=False, batch_size=50, offset=0,
                )
        assert result["dry_run"] is False
        assert result["processed_parents"] == 1
        assert len(result["errors"]) == 0
        assert len(result["sample_rationales"]) == 1

    @pytest.mark.asyncio
    async def test_empty_batch_returns_done(self):
        """An offset beyond the parent list yields the 'done' message."""
        with patch("compliance.api.canonical_control_routes.SessionLocal") as mock_session:
            db = MagicMock()
            mock_session.return_value.__enter__ = MagicMock(return_value=db)
            mock_session.return_value.__exit__ = MagicMock(return_value=False)
            db.execute.return_value.fetchall.return_value = []
            result = await backfill_rationale(
                dry_run=False, batch_size=50, offset=9999,
            )
        assert result["processed"] == 0
        assert "Kein weiterer Batch" in result["message"]

    @pytest.mark.asyncio
    async def test_llm_error_captured(self):
        """A raising LLM provider is recorded as an error, not propagated."""
        mock_parents = [
            MagicMock(
                parent_uuid="uuid-1",
                control_id="SEC-100",
                title="Network Security",
                category="network",
                source_name="ISO 27001",
                child_count=3,
            ),
        ]
        with patch("compliance.api.canonical_control_routes.SessionLocal") as mock_session:
            db = MagicMock()
            mock_session.return_value.__enter__ = MagicMock(return_value=db)
            mock_session.return_value.__exit__ = MagicMock(return_value=False)
            db.execute.return_value.fetchall.return_value = mock_parents
            with patch("compliance.services.llm_provider.get_llm_provider") as mock_get:
                mock_provider = AsyncMock()
                mock_provider.complete.side_effect = Exception("Ollama timeout")
                mock_get.return_value = mock_provider
                result = await backfill_rationale(
                    dry_run=False, batch_size=50, offset=0,
                )
        assert result["processed_parents"] == 0
        assert len(result["errors"]) == 1
        assert "Ollama timeout" in result["errors"][0]["error"]

    @pytest.mark.asyncio
    async def test_short_response_skipped(self):
        """Answers shorter than the minimum length are skipped with an error."""
        mock_parents = [
            MagicMock(
                parent_uuid="uuid-1",
                control_id="GOV-001",
                title="Governance",
                category="governance",
                source_name="ISO 27001",
                child_count=2,
            ),
        ]
        mock_llm_response = MagicMock()
        mock_llm_response.content = "OK"  # Too short
        with patch("compliance.api.canonical_control_routes.SessionLocal") as mock_session:
            db = MagicMock()
            mock_session.return_value.__enter__ = MagicMock(return_value=db)
            mock_session.return_value.__exit__ = MagicMock(return_value=False)
            db.execute.return_value.fetchall.return_value = mock_parents
            with patch("compliance.services.llm_provider.get_llm_provider") as mock_get:
                mock_provider = AsyncMock()
                mock_provider.complete.return_value = mock_llm_response
                mock_get.return_value = mock_provider
                result = await backfill_rationale(
                    dry_run=False, batch_size=50, offset=0,
                )
        assert result["processed_parents"] == 0
        assert len(result["errors"]) == 1
        assert "zu kurz" in result["errors"][0]["error"]
class TestRationalePagination:
    """Pagination logic should work correctly."""

    @staticmethod
    def _parent_row(idx):
        """Build one fake parent row (two children) for index *idx*."""
        return MagicMock(
            parent_uuid=f"uuid-{idx}",
            control_id=f"SEC-{idx:03d}",
            title=f"Control {idx}",
            category="security",
            source_name="NIST",
            child_count=2,
        )

    @pytest.mark.asyncio
    async def test_next_offset_set_when_more_remain(self):
        # 3 parents, batch_size=2 -> next_offset=2
        rows = [self._parent_row(i) for i in range(3)]
        reply = MagicMock()
        reply.content = (
            "Sicherheitsanforderungen aus NIST erfordern atomare "
            "Massnahmen fuer unabhaengige Testbarkeit."
        )
        with patch("compliance.api.canonical_control_routes.SessionLocal") as factory:
            session = MagicMock()
            factory.return_value.__enter__ = MagicMock(return_value=session)
            factory.return_value.__exit__ = MagicMock(return_value=False)
            session.execute.return_value.fetchall.return_value = rows
            session.execute.return_value.rowcount = 2
            with patch("compliance.services.llm_provider.get_llm_provider") as getter:
                llm = AsyncMock()
                llm.complete.return_value = reply
                getter.return_value = llm
                result = await backfill_rationale(
                    dry_run=False, batch_size=2, offset=0,
                )
        assert result["next_offset"] == 2
        assert result["processed_parents"] == 2

    @pytest.mark.asyncio
    async def test_next_offset_none_when_done(self):
        # A single parent inside one batch -> pagination is finished.
        rows = [self._parent_row(1)]
        reply = MagicMock()
        reply.content = (
            "Sicherheitsanforderungen erfordern atomare Massnahmen."
        )
        with patch("compliance.api.canonical_control_routes.SessionLocal") as factory:
            session = MagicMock()
            factory.return_value.__enter__ = MagicMock(return_value=session)
            factory.return_value.__exit__ = MagicMock(return_value=False)
            session.execute.return_value.fetchall.return_value = rows
            session.execute.return_value.rowcount = 2
            with patch("compliance.services.llm_provider.get_llm_provider") as getter:
                llm = AsyncMock()
                llm.complete.return_value = reply
                getter.return_value = llm
                result = await backfill_rationale(
                    dry_run=False, batch_size=50, offset=0,
                )
        assert result["next_offset"] is None