fix: Pipeline-Skalierung — 6 Optimierungen für 80k+ Controls
1. control_generator: GeneratorResult.status Default "completed" → "running" (Bug)
2. control_generator: Anthropic API mit Phase-Timeouts + Retry bei Disconnect
3. control_generator: regulation_exclude Filter + Harmonization via Qdrant statt In-Memory
4. decomposition_pass: Enrich Pass Batch-UPDATEs (400k → ~400 DB-Calls)
5. decomposition_pass: Merge Pass single Query statt N+1
6. batch_dedup_runner: Cross-Group Dedup parallelisiert (asyncio.gather)
7. canonical_control_routes: Framework Controls API Pagination (limit/offset)
8. DB-Indizes: idx_oc_parent_release, idx_oc_trigger_null, idx_cc_framework

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -349,8 +349,15 @@ Antworte NUR mit einem JSON-Array. Keine Erklärungen."""
|
||||
|
||||
def _build_pass0a_prompt(
|
||||
title: str, objective: str, requirements: str,
|
||||
test_procedure: str, source_ref: str
|
||||
test_procedure: str, source_ref: str,
|
||||
source_original_text: str = ""
|
||||
) -> str:
|
||||
original_block = ""
|
||||
if source_original_text:
|
||||
original_block = f"""
|
||||
ORIGINALTEXT (Gesetz/Verordnung — nutze fuer praezisere Pflichtableitung):
|
||||
{source_original_text[:3000]}
|
||||
"""
|
||||
return f"""\
|
||||
Analysiere das folgende Control und extrahiere alle einzelnen normativen \
|
||||
Pflichten als JSON-Array.
|
||||
@@ -361,7 +368,7 @@ Ziel: {objective}
|
||||
Anforderungen: {requirements}
|
||||
Prüfverfahren: {test_procedure}
|
||||
Quellreferenz: {source_ref}
|
||||
|
||||
{original_block}
|
||||
Antworte als JSON-Array:
|
||||
[
|
||||
{{
|
||||
@@ -2407,7 +2414,8 @@ class DecompositionPass:
|
||||
query = """
|
||||
SELECT cc.id, cc.control_id, cc.title, cc.objective,
|
||||
cc.requirements, cc.test_procedure,
|
||||
cc.source_citation, cc.category
|
||||
cc.source_citation, cc.category,
|
||||
cc.source_original_text
|
||||
FROM canonical_controls cc
|
||||
WHERE cc.release_state NOT IN ('deprecated')
|
||||
AND cc.parent_control_uuid IS NULL
|
||||
@@ -2473,6 +2481,7 @@ class DecompositionPass:
|
||||
"test_procedure": test_str,
|
||||
"source_ref": source_str,
|
||||
"category": row[7] or "",
|
||||
"source_original_text": row[8] or "",
|
||||
})
|
||||
|
||||
# Process in batches
|
||||
@@ -2507,6 +2516,7 @@ class DecompositionPass:
|
||||
requirements=ctrl["requirements"],
|
||||
test_procedure=ctrl["test_procedure"],
|
||||
source_ref=ctrl["source_ref"],
|
||||
source_original_text=ctrl.get("source_original_text", ""),
|
||||
)
|
||||
llm_response = await _llm_anthropic(
|
||||
prompt=prompt,
|
||||
@@ -2529,6 +2539,7 @@ class DecompositionPass:
|
||||
requirements=ctrl["requirements"],
|
||||
test_procedure=ctrl["test_procedure"],
|
||||
source_ref=ctrl["source_ref"],
|
||||
source_original_text=ctrl.get("source_original_text", ""),
|
||||
)
|
||||
llm_response = await _llm_ollama(
|
||||
prompt=prompt,
|
||||
@@ -3008,29 +3019,36 @@ class DecompositionPass:
|
||||
"obligations_kept": 0,
|
||||
}
|
||||
|
||||
# Get all parents that have >1 validated obligation
|
||||
parents = self.db.execute(text("""
|
||||
SELECT parent_control_uuid, count(*) AS cnt
|
||||
FROM obligation_candidates
|
||||
WHERE release_state = 'validated'
|
||||
AND merged_into_id IS NULL
|
||||
GROUP BY parent_control_uuid
|
||||
HAVING count(*) > 1
|
||||
# Load ALL obligations in one query (avoids N+1 per parent)
|
||||
all_obligs = self.db.execute(text("""
|
||||
SELECT oc.id, oc.candidate_id, oc.obligation_text, oc.action, oc.object,
|
||||
oc.parent_control_uuid
|
||||
FROM obligation_candidates oc
|
||||
WHERE oc.release_state = 'validated'
|
||||
AND oc.merged_into_id IS NULL
|
||||
AND oc.parent_control_uuid IN (
|
||||
SELECT parent_control_uuid
|
||||
FROM obligation_candidates
|
||||
WHERE release_state = 'validated'
|
||||
AND merged_into_id IS NULL
|
||||
GROUP BY parent_control_uuid
|
||||
HAVING count(*) > 1
|
||||
)
|
||||
ORDER BY oc.parent_control_uuid, oc.created_at
|
||||
""")).fetchall()
|
||||
|
||||
for parent_uuid, cnt in parents:
|
||||
stats["parents_checked"] += 1
|
||||
obligs = self.db.execute(text("""
|
||||
SELECT id, candidate_id, obligation_text, action, object
|
||||
FROM obligation_candidates
|
||||
WHERE parent_control_uuid = CAST(:pid AS uuid)
|
||||
AND release_state = 'validated'
|
||||
AND merged_into_id IS NULL
|
||||
ORDER BY created_at
|
||||
"""), {"pid": str(parent_uuid)}).fetchall()
|
||||
# Group by parent in Python
|
||||
from collections import defaultdict
|
||||
parent_groups: dict[str, list] = defaultdict(list)
|
||||
for row in all_obligs:
|
||||
parent_groups[str(row[5])].append(row)
|
||||
|
||||
merged_ids = set()
|
||||
oblig_list = list(obligs)
|
||||
merge_batch: list[dict] = []
|
||||
MERGE_FLUSH_SIZE = 200
|
||||
|
||||
for parent_uuid, oblig_list in parent_groups.items():
|
||||
stats["parents_checked"] += 1
|
||||
merged_ids: set[str] = set()
|
||||
|
||||
for i in range(len(oblig_list)):
|
||||
if str(oblig_list[i][0]) in merged_ids:
|
||||
@@ -3044,13 +3062,11 @@ class DecompositionPass:
|
||||
obj_i = (oblig_list[i][4] or "").lower().strip()
|
||||
obj_j = (oblig_list[j][4] or "").lower().strip()
|
||||
|
||||
# Check if actions are similar enough to be duplicates
|
||||
if not _text_similar(action_i, action_j, threshold=0.75):
|
||||
continue
|
||||
if not _text_similar(obj_i, obj_j, threshold=0.60):
|
||||
continue
|
||||
|
||||
# Keep the more abstract one (shorter text = less specific)
|
||||
text_i = oblig_list[i][2] or ""
|
||||
text_j = oblig_list[j][2] or ""
|
||||
if _is_more_implementation_specific(text_j, text_i):
|
||||
@@ -3060,18 +3076,31 @@ class DecompositionPass:
|
||||
survivor_id = str(oblig_list[j][0])
|
||||
merged_id = str(oblig_list[i][0])
|
||||
|
||||
merge_batch.append({"survivor": survivor_id, "merged": merged_id})
|
||||
merged_ids.add(merged_id)
|
||||
stats["obligations_merged"] += 1
|
||||
|
||||
# Flush batch periodically
|
||||
if len(merge_batch) >= MERGE_FLUSH_SIZE:
|
||||
for m in merge_batch:
|
||||
self.db.execute(text("""
|
||||
UPDATE obligation_candidates
|
||||
SET release_state = 'merged',
|
||||
merged_into_id = CAST(:survivor AS uuid)
|
||||
WHERE id = CAST(:merged AS uuid)
|
||||
"""), {"survivor": survivor_id, "merged": merged_id})
|
||||
"""), m)
|
||||
self.db.commit()
|
||||
merge_batch.clear()
|
||||
|
||||
merged_ids.add(merged_id)
|
||||
stats["obligations_merged"] += 1
|
||||
|
||||
# Commit per parent to avoid large transactions
|
||||
self.db.commit()
|
||||
# Flush remainder
|
||||
for m in merge_batch:
|
||||
self.db.execute(text("""
|
||||
UPDATE obligation_candidates
|
||||
SET release_state = 'merged',
|
||||
merged_into_id = CAST(:survivor AS uuid)
|
||||
WHERE id = CAST(:merged AS uuid)
|
||||
"""), m)
|
||||
self.db.commit()
|
||||
|
||||
stats["obligations_kept"] = self.db.execute(text("""
|
||||
SELECT count(*) FROM obligation_candidates
|
||||
@@ -3106,6 +3135,10 @@ class DecompositionPass:
|
||||
AND trigger_type IS NULL
|
||||
""")).fetchall()
|
||||
|
||||
# Classify all obligations first, then batch-update
|
||||
BATCH_SIZE = 500
|
||||
pending_updates: list[dict] = []
|
||||
|
||||
for row in obligs:
|
||||
oc_id = str(row[0])
|
||||
obl_text = row[1] or ""
|
||||
@@ -3116,22 +3149,42 @@ class DecompositionPass:
|
||||
trigger = _classify_trigger_type(obl_text, condition)
|
||||
impl = _is_implementation_specific_text(obl_text, action, obj)
|
||||
|
||||
self.db.execute(text("""
|
||||
UPDATE obligation_candidates
|
||||
SET trigger_type = :trigger,
|
||||
is_implementation_specific = :impl
|
||||
WHERE id = CAST(:oid AS uuid)
|
||||
"""), {"trigger": trigger, "impl": impl, "oid": oc_id})
|
||||
|
||||
pending_updates.append({"oid": oc_id, "trigger": trigger, "impl": impl})
|
||||
stats["enriched"] += 1
|
||||
stats[f"trigger_{trigger}"] += 1
|
||||
stats[f"trigger_{trigger}"] = stats.get(f"trigger_{trigger}", 0) + 1
|
||||
if impl:
|
||||
stats["implementation_specific"] += 1
|
||||
|
||||
# Flush batch
|
||||
if len(pending_updates) >= BATCH_SIZE:
|
||||
self._flush_enrich_batch(pending_updates)
|
||||
pending_updates.clear()
|
||||
|
||||
# Flush remainder
|
||||
if pending_updates:
|
||||
self._flush_enrich_batch(pending_updates)
|
||||
|
||||
self.db.commit()
|
||||
logger.info("Enrich pass: %s", stats)
|
||||
return stats
|
||||
|
||||
def _flush_enrich_batch(self, updates: list[dict]):
    """Write one batch of enrich-pass results to obligation_candidates.

    Entries are bucketed by their (trigger, impl) pair so that each distinct
    pair costs a single UPDATE covering all of its ids, instead of one
    UPDATE per row.

    Args:
        updates: Dicts carrying the keys "oid", "trigger" and "impl".

    The statements are only executed here; this method does not commit.
    """
    # Bucket the ids by the value pair they should receive.
    ids_by_values: dict[tuple, list[str]] = {}
    for entry in updates:
        value_pair = (entry["trigger"], entry["impl"])
        ids_by_values.setdefault(value_pair, []).append(entry["oid"])

    # The whole id list is bound as one parameter and cast to uuid[] so a
    # single statement matches every row of the bucket.
    sql = """
        UPDATE obligation_candidates
        SET trigger_type = :trigger,
            is_implementation_specific = :impl
        WHERE id = ANY(CAST(:ids AS uuid[]))
    """
    for (trigger_value, impl_flag), oid_list in ids_by_values.items():
        params = {"trigger": trigger_value, "impl": impl_flag, "ids": oid_list}
        self.db.execute(text(sql), params)
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Decomposition Status
|
||||
# -------------------------------------------------------------------
|
||||
@@ -3365,7 +3418,8 @@ class DecompositionPass:
|
||||
query = """
|
||||
SELECT cc.id, cc.control_id, cc.title, cc.objective,
|
||||
cc.requirements, cc.test_procedure,
|
||||
cc.source_citation, cc.category
|
||||
cc.source_citation, cc.category,
|
||||
cc.source_original_text
|
||||
FROM canonical_controls cc
|
||||
WHERE cc.release_state NOT IN ('deprecated')
|
||||
AND cc.parent_control_uuid IS NULL
|
||||
@@ -3414,6 +3468,7 @@ class DecompositionPass:
|
||||
"test_procedure": _format_field(row[5] or ""),
|
||||
"source_ref": _format_citation(row[6] or ""),
|
||||
"category": row[7] or "",
|
||||
"source_original_text": row[8] or "",
|
||||
})
|
||||
|
||||
if not prepared:
|
||||
|
||||
Reference in New Issue
Block a user