feat(training+controls): interactive video pipeline, training blocks, control generator, CE libraries
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 37s
CI/CD / test-python-backend-compliance (push) Successful in 39s
CI/CD / test-python-document-crawler (push) Successful in 26s
CI/CD / test-python-dsms-gateway (push) Successful in 23s
CI/CD / validate-canonical-controls (push) Successful in 12s
CI/CD / Deploy (push) Has been skipped

Interactive Training Videos (CP-TRAIN):
- DB migration 022: training_checkpoints + checkpoint_progress tables
- NarratorScript generation via Anthropic (AI Teacher persona, German)
- TTS batch synthesis + interactive video pipeline (slides + checkpoint slides + FFmpeg)
- 4 new API endpoints: generate-interactive, interactive-manifest, checkpoint submit, checkpoint progress
- InteractiveVideoPlayer component (HTML5 Video, quiz overlay, seek protection, progress tracking)
- Learner portal integration with automatic completion on all checkpoints passed
- 30 new tests (handler validation + grading logic + manifest/progress + seek protection)

Training Blocks:
- Block generator, block store, block config CRUD + preview/generate endpoints
- Migration 021: training_blocks schema

Control Generator + Canonical Library:
- Control generator routes + service enhancements
- Canonical control library helpers, sidebar entry
- Citation backfill service + tests
- CE libraries data (hazard, protection, evidence, lifecycle, components)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-16 21:41:48 +01:00
parent d2133dbfa2
commit 4f6bc8f6f6
50 changed files with 17299 additions and 198 deletions

View File

@@ -44,6 +44,7 @@ logger = logging.getLogger(__name__)
SDK_URL = os.getenv("SDK_URL", "http://ai-compliance-sdk:8090")
EMBEDDING_URL = os.getenv("EMBEDDING_URL", "http://embedding-service:8087")
QDRANT_URL = os.getenv("QDRANT_URL", "http://host.docker.internal:6333")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_MODEL = os.getenv("CONTROL_GEN_ANTHROPIC_MODEL", "claude-sonnet-4-6")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
@@ -54,7 +55,6 @@ HARMONIZATION_THRESHOLD = 0.85 # Cosine similarity above this = duplicate
ALL_COLLECTIONS = [
"bp_compliance_ce",
"bp_compliance_recht",
"bp_compliance_gesetze",
"bp_compliance_datenschutz",
"bp_dsfa_corpus",
@@ -312,6 +312,12 @@ CATEGORY_KEYWORDS = {
"hygiene", "infektionsschutz", "pflege"],
}
# Closed set of content categories the generator may assign; derived from the
# keyword map so the two definitions can never drift apart.
VALID_CATEGORIES = set(CATEGORY_KEYWORDS.keys())
# Short domain codes (e.g. AUTH = Authentifizierung, CRYP = Kryptographie,
# HLT = Gesundheit — see the prompt text below for the full legend). They are
# presumably also used as the prefix when generating control IDs — verify
# against _generate_control_id.
VALID_DOMAINS = {"AUTH", "CRYP", "NET", "DATA", "LOG", "ACC", "SEC", "INC",
                 "AI", "COMP", "GOV", "LAB", "FIN", "TRD", "ENV", "HLT"}
# Pre-joined, sorted category list interpolated into the LLM prompts.
CATEGORY_LIST_STR = ", ".join(sorted(VALID_CATEGORIES))
VERIFICATION_KEYWORDS = {
"code_review": ["source code", "code review", "static analysis", "sast", "dast",
"dependency check", "quellcode", "codeanalyse", "secure coding",
@@ -373,6 +379,7 @@ class GeneratorConfig(BaseModel):
domain: Optional[str] = None
batch_size: int = 5
max_controls: int = 0 # 0 = unlimited (process ALL chunks)
max_chunks: int = 0 # 0 = unlimited; >0 = stop after N chunks (respects document boundaries)
skip_processed: bool = True
skip_web_search: bool = False
dry_run: bool = False
@@ -418,6 +425,7 @@ class GeneratorResult:
controls_needs_review: int = 0
controls_too_close: int = 0
controls_duplicates_found: int = 0
controls_qa_fixed: int = 0
chunks_skipped_prefilter: int = 0
errors: list = field(default_factory=list)
controls: list = field(default_factory=list)
@@ -713,7 +721,7 @@ class ControlGeneratorPipeline:
async def _scan_rag(self, config: GeneratorConfig) -> list[RAGSearchResult]:
"""Scroll through ALL chunks in RAG collections.
Uses the scroll endpoint to iterate over every chunk (not just top-K search).
Uses DIRECT Qdrant scroll API (bypasses Go SDK which has offset cycling bugs).
Filters out already-processed chunks by hash.
"""
collections = config.collections or ALL_COLLECTIONS
@@ -734,80 +742,105 @@ class ControlGeneratorPipeline:
seen_hashes: set[str] = set()
for collection in collections:
offset = None
page = 0
collection_total = 0
collection_new = 0
max_pages = 1000 # Safety limit: 1000 pages × 200 = 200K chunks max per collection
prev_chunk_count = -1 # Track stalls (same count means no progress)
stall_count = 0
qdrant_offset = None # Qdrant uses point ID as offset
while page < max_pages:
chunks, next_offset = await self.rag.scroll(
collection=collection,
offset=offset,
limit=200,
)
while True:
# Direct Qdrant scroll API — bypasses Go SDK offset cycling bug
try:
scroll_body: dict = {
"limit": 250,
"with_payload": True,
"with_vector": False,
}
if qdrant_offset is not None:
scroll_body["offset"] = qdrant_offset
if not chunks:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{QDRANT_URL}/collections/{collection}/points/scroll",
json=scroll_body,
)
if resp.status_code != 200:
logger.error("Qdrant scroll %s failed: %d %s", collection, resp.status_code, resp.text[:200])
break
data = resp.json().get("result", {})
points = data.get("points", [])
next_page_offset = data.get("next_page_offset")
except Exception as e:
logger.error("Qdrant scroll error for %s: %s", collection, e)
break
collection_total += len(chunks)
if not points:
break
for chunk in chunks:
if not chunk.text or len(chunk.text.strip()) < 50:
continue # Skip empty/tiny chunks
collection_total += len(points)
h = hashlib.sha256(chunk.text.encode()).hexdigest()
for point in points:
payload = point.get("payload", {})
# Different collections use different field names for text
chunk_text = (payload.get("chunk_text", "")
or payload.get("content", "")
or payload.get("text", "")
or payload.get("page_content", ""))
if not chunk_text or len(chunk_text.strip()) < 50:
continue
h = hashlib.sha256(chunk_text.encode()).hexdigest()
# Skip duplicates (same text in multiple collections)
if h in seen_hashes:
continue
seen_hashes.add(h)
# Skip already-processed
if h in processed_hashes:
continue
# Convert Qdrant point to RAGSearchResult
# Handle varying payload schemas across collections
reg_code = (payload.get("regulation_id", "")
or payload.get("regulation_code", "")
or payload.get("source_id", "")
or payload.get("source_code", ""))
reg_name = (payload.get("regulation_name_de", "")
or payload.get("regulation_name", "")
or payload.get("source_name", "")
or payload.get("guideline_name", "")
or payload.get("document_title", "")
or payload.get("filename", ""))
reg_short = (payload.get("regulation_short", "")
or reg_code)
chunk = RAGSearchResult(
text=chunk_text,
regulation_code=reg_code,
regulation_name=reg_name,
regulation_short=reg_short,
category=payload.get("category", "") or payload.get("data_type", ""),
article=payload.get("article", "") or payload.get("section_title", "") or payload.get("section", ""),
paragraph=payload.get("paragraph", ""),
source_url=payload.get("source_url", "") or payload.get("source", "") or payload.get("url", ""),
score=0.0,
collection=collection,
)
all_results.append(chunk)
collection_new += 1
page += 1
if page % 50 == 0:
if page % 100 == 0:
logger.info(
"Scrolling %s: page %d, %d total chunks, %d new unprocessed",
"Scrolling %s (direct Qdrant): page %d, %d total chunks, %d new unprocessed",
collection, page, collection_total, collection_new,
)
# Stop conditions
if not next_offset:
break
if next_page_offset is None:
break # Qdrant returns null when no more pages
# Detect stalls: if no NEW unique chunks found for several pages,
# we've likely cycled through all chunks in this collection.
# (Safer than offset dedup which breaks with mixed Qdrant ID types)
if collection_new == prev_chunk_count:
stall_count += 1
if stall_count >= 5:
logger.warning(
"Scroll stalled in %s at page %d — no new unique chunks for 5 pages (%d total, %d new) — stopping",
collection, page, collection_total, collection_new,
)
break
else:
stall_count = 0
prev_chunk_count = collection_new
offset = next_offset
if page >= max_pages:
logger.warning(
"Collection %s: reached max_pages limit (%d). %d chunks scrolled.",
collection, max_pages, collection_total,
)
qdrant_offset = next_page_offset
logger.info(
"Collection %s: %d total chunks scrolled, %d new unprocessed",
"Collection %s: %d total chunks scrolled (direct Qdrant), %d new unprocessed",
collection, collection_total, collection_new,
)
@@ -857,6 +890,11 @@ Gib JSON zurück mit diesen Feldern:
- evidence: Liste von Nachweisdokumenten (Strings)
- severity: low/medium/high/critical
- tags: Liste von Tags
- domain: Fachgebiet als Kuerzel (AUTH=Authentifizierung, CRYP=Kryptographie, NET=Netzwerk, DATA=Datenschutz, LOG=Logging, ACC=Zugriffskontrolle, SEC=IT-Sicherheit, INC=Vorfallmanagement, AI=KI, COMP=Compliance, GOV=Behoerden/Verwaltung, LAB=Arbeitsrecht, FIN=Finanzregulierung, TRD=Gewerbe/Handelsrecht, ENV=Umwelt, HLT=Gesundheit)
- category: Inhaltliche Kategorie — MUSS zum domain passen. Moegliche Werte: {CATEGORY_LIST_STR}
- target_audience: Liste der Zielgruppen (z.B. "unternehmen", "behoerden", "entwickler", "datenschutzbeauftragte", "geschaeftsfuehrung", "it-abteilung", "rechtsabteilung", "compliance-officer", "personalwesen", "einkauf", "produktion", "gesundheitswesen", "finanzwesen", "oeffentlicher_dienst")
- source_article: Artikel-/Paragraphen-Referenz aus dem Text (z.B. "Artikel 10", "§ 42"). Leer lassen wenn nicht erkennbar.
- source_paragraph: Absatz-Referenz aus dem Text (z.B. "Absatz 5", "Nr. 2"). Leer lassen wenn nicht erkennbar.
Text: {chunk.text[:2000]}
Quelle: {chunk.regulation_name} ({chunk.regulation_code}), {chunk.article}"""
@@ -868,24 +906,29 @@ Quelle: {chunk.regulation_name} ({chunk.regulation_code}), {chunk.article}"""
domain = _detect_domain(chunk.text)
control = self._build_control_from_json(data, domain)
llm_article = str(data.get("source_article", "")).strip()
llm_paragraph = str(data.get("source_paragraph", "")).strip()
effective_article = llm_article or chunk.article or ""
effective_paragraph = llm_paragraph or chunk.paragraph or ""
control.license_rule = 1
control.source_original_text = chunk.text
control.source_citation = {
"source": chunk.regulation_name,
"article": chunk.article or "",
"paragraph": chunk.paragraph or "",
"article": effective_article,
"paragraph": effective_paragraph,
"license": license_info.get("license", ""),
"url": chunk.source_url or "",
}
control.customer_visible = True
control.verification_method = _detect_verification_method(chunk.text)
control.category = _detect_category(chunk.text)
if not control.category:
control.category = _detect_category(chunk.text)
control.generation_metadata = {
"processing_path": "structured",
"license_rule": 1,
"source_regulation": chunk.regulation_code,
"source_article": chunk.article,
"source_paragraph": chunk.paragraph,
"source_article": effective_article,
"source_paragraph": effective_paragraph,
}
return control
@@ -910,6 +953,11 @@ Gib JSON zurück mit diesen Feldern:
- evidence: Liste von Nachweisdokumenten (Strings)
- severity: low/medium/high/critical
- tags: Liste von Tags
- domain: Fachgebiet als Kuerzel (AUTH=Authentifizierung, CRYP=Kryptographie, NET=Netzwerk, DATA=Datenschutz, LOG=Logging, ACC=Zugriffskontrolle, SEC=IT-Sicherheit, INC=Vorfallmanagement, AI=KI, COMP=Compliance, GOV=Behoerden/Verwaltung, LAB=Arbeitsrecht, FIN=Finanzregulierung, TRD=Gewerbe/Handelsrecht, ENV=Umwelt, HLT=Gesundheit)
- category: Inhaltliche Kategorie — MUSS zum domain passen. Moegliche Werte: {CATEGORY_LIST_STR}
- target_audience: Liste der Zielgruppen (z.B. "unternehmen", "behoerden", "entwickler", "datenschutzbeauftragte", "geschaeftsfuehrung", "it-abteilung", "rechtsabteilung", "compliance-officer", "personalwesen", "einkauf", "produktion", "gesundheitswesen", "finanzwesen", "oeffentlicher_dienst")
- source_article: Artikel-/Paragraphen-Referenz aus dem Text (z.B. "Artikel 10", "§ 42"). Leer lassen wenn nicht erkennbar.
- source_paragraph: Absatz-Referenz aus dem Text (z.B. "Absatz 5", "Nr. 2"). Leer lassen wenn nicht erkennbar.
Text: {chunk.text[:2000]}
Quelle: {chunk.regulation_name}, {chunk.article}"""
@@ -921,25 +969,30 @@ Quelle: {chunk.regulation_name}, {chunk.article}"""
domain = _detect_domain(chunk.text)
control = self._build_control_from_json(data, domain)
llm_article = str(data.get("source_article", "")).strip()
llm_paragraph = str(data.get("source_paragraph", "")).strip()
effective_article = llm_article or chunk.article or ""
effective_paragraph = llm_paragraph or chunk.paragraph or ""
control.license_rule = 2
control.source_original_text = chunk.text
control.source_citation = {
"source": chunk.regulation_name,
"article": chunk.article or "",
"paragraph": chunk.paragraph or "",
"article": effective_article,
"paragraph": effective_paragraph,
"license": license_info.get("license", ""),
"license_notice": attribution,
"url": chunk.source_url or "",
}
control.customer_visible = True
control.verification_method = _detect_verification_method(chunk.text)
control.category = _detect_category(chunk.text)
if not control.category:
control.category = _detect_category(chunk.text)
control.generation_metadata = {
"processing_path": "structured",
"license_rule": 2,
"source_regulation": chunk.regulation_code,
"source_article": chunk.article,
"source_paragraph": chunk.paragraph,
"source_article": effective_article,
"source_paragraph": effective_paragraph,
}
return control
@@ -968,7 +1021,8 @@ Gib JSON zurück mit diesen Feldern:
- evidence: Liste von Nachweisdokumenten (Strings)
- severity: low/medium/high/critical
- tags: Liste von Tags (eigene Begriffe)
- domain: Fachgebiet als Kuerzel (AUTH, CRYP, NET, DATA, LOG, ACC, SEC, INC, AI, COMP, GOV, LAB, FIN, TRD, ENV, HLT)
- domain: Fachgebiet als Kuerzel (AUTH=Authentifizierung, CRYP=Kryptographie, NET=Netzwerk, DATA=Datenschutz, LOG=Logging, ACC=Zugriffskontrolle, SEC=IT-Sicherheit, INC=Vorfallmanagement, AI=KI, COMP=Compliance, GOV=Behoerden/Verwaltung, LAB=Arbeitsrecht, FIN=Finanzregulierung, TRD=Gewerbe/Handelsrecht, ENV=Umwelt, HLT=Gesundheit)
- category: Inhaltliche Kategorie — MUSS zum domain passen. Moegliche Werte: {CATEGORY_LIST_STR}
- target_audience: Liste der Zielgruppen (z.B. "unternehmen", "behoerden", "entwickler", "datenschutzbeauftragte", "geschaeftsfuehrung", "it-abteilung", "rechtsabteilung", "compliance-officer", "personalwesen", "oeffentlicher_dienst")"""
raw = await _llm_chat(prompt, REFORM_SYSTEM_PROMPT)
@@ -982,7 +1036,8 @@ Gib JSON zurück mit diesen Feldern:
control.source_citation = None # NEVER cite source
control.customer_visible = False # Only our formulation
control.verification_method = _detect_verification_method(chunk.text)
control.category = _detect_category(chunk.text)
if not control.category:
control.category = _detect_category(chunk.text)
# generation_metadata: NO source names, NO original texts
control.generation_metadata = {
"processing_path": "llm_reform",
@@ -1046,7 +1101,10 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
- severity: low/medium/high/critical
- tags: Liste von Tags
- domain: Fachgebiet als Kuerzel (AUTH=Authentifizierung, CRYP=Kryptographie, NET=Netzwerk, DATA=Datenschutz, LOG=Logging, ACC=Zugriffskontrolle, SEC=IT-Sicherheit, INC=Vorfallmanagement, AI=KI, COMP=Compliance, GOV=Behoerden/Verwaltung, LAB=Arbeitsrecht, FIN=Finanzregulierung, TRD=Gewerbe/Handelsrecht, ENV=Umwelt, HLT=Gesundheit)
- category: Inhaltliche Kategorie — MUSS zum domain passen. Moegliche Werte: {CATEGORY_LIST_STR}
- target_audience: Liste der Zielgruppen fuer die dieses Control relevant ist. Moegliche Werte: "unternehmen", "behoerden", "entwickler", "datenschutzbeauftragte", "geschaeftsfuehrung", "it-abteilung", "rechtsabteilung", "compliance-officer", "personalwesen", "einkauf", "produktion", "vertrieb", "gesundheitswesen", "finanzwesen", "oeffentlicher_dienst"
- source_article: Artikel-/Paragraphen-Referenz aus dem Text extrahieren (z.B. "Artikel 10", "Art. 5", "§ 42", "Section 3"). Leer lassen wenn nicht erkennbar.
- source_paragraph: Absatz-Referenz aus dem Text extrahieren (z.B. "Absatz 5", "Abs. 3", "Nr. 2", "(1)"). Leer lassen wenn nicht erkennbar.
{joined}"""
@@ -1071,26 +1129,32 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
domain = _detect_domain(chunk.text)
control = self._build_control_from_json(data, domain)
control.license_rule = lic["rule"]
# Use LLM-extracted article/paragraph, fall back to chunk metadata
llm_article = str(data.get("source_article", "")).strip()
llm_paragraph = str(data.get("source_paragraph", "")).strip()
effective_article = llm_article or chunk.article or ""
effective_paragraph = llm_paragraph or chunk.paragraph or ""
if lic["rule"] in (1, 2):
control.source_original_text = chunk.text
control.source_citation = {
"source": chunk.regulation_name,
"article": chunk.article or "",
"paragraph": chunk.paragraph or "",
"article": effective_article,
"paragraph": effective_paragraph,
"license": lic.get("license", ""),
"license_notice": lic.get("attribution", ""),
"url": chunk.source_url or "",
}
control.customer_visible = True
control.verification_method = _detect_verification_method(chunk.text)
control.category = _detect_category(chunk.text)
if not control.category:
control.category = _detect_category(chunk.text)
same_doc = len(set(c.regulation_code for c in chunks)) == 1
control.generation_metadata = {
"processing_path": "structured_batch",
"license_rule": lic["rule"],
"source_regulation": chunk.regulation_code,
"source_article": chunk.article,
"source_paragraph": chunk.paragraph,
"source_article": effective_article,
"source_paragraph": effective_paragraph,
"batch_size": len(chunks),
"document_grouped": same_doc,
}
@@ -1133,6 +1197,7 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
- severity: low/medium/high/critical
- tags: Liste von Tags (eigene Begriffe)
- domain: Fachgebiet als Kuerzel (AUTH=Authentifizierung, CRYP=Kryptographie, NET=Netzwerk, DATA=Datenschutz, LOG=Logging, ACC=Zugriffskontrolle, SEC=IT-Sicherheit, INC=Vorfallmanagement, AI=KI, COMP=Compliance, GOV=Behoerden/Verwaltung, LAB=Arbeitsrecht, FIN=Finanzregulierung, TRD=Gewerbe/Handelsrecht, ENV=Umwelt, HLT=Gesundheit)
- category: Inhaltliche Kategorie — MUSS zum domain passen. Moegliche Werte: {CATEGORY_LIST_STR}
- target_audience: Liste der Zielgruppen (z.B. "unternehmen", "behoerden", "entwickler", "datenschutzbeauftragte", "geschaeftsfuehrung", "it-abteilung", "rechtsabteilung", "compliance-officer", "personalwesen", "einkauf", "produktion", "gesundheitswesen", "finanzwesen", "oeffentlicher_dienst")
{joined}"""
@@ -1159,7 +1224,8 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
control.source_citation = None
control.customer_visible = False
control.verification_method = _detect_verification_method(chunk.text)
control.category = _detect_category(chunk.text)
if not control.category:
control.category = _detect_category(chunk.text)
control.generation_metadata = {
"processing_path": "llm_reform_batch",
"license_rule": 3,
@@ -1209,6 +1275,9 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
all_controls[orig_idx] = ctrl
# Post-process all controls: harmonization + anchor search
# NOTE: QA validation runs as a separate batch AFTER generation (qa-reclassify endpoint)
# to avoid competing with Ollama prefilter for resources.
qa_fixed_count = 0
final: list[Optional[GeneratedControl]] = []
for i in range(len(batch_items)):
control = all_controls.get(i)
@@ -1245,7 +1314,7 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
else:
control.release_state = "needs_review"
# Control ID — prefer LLM-assigned domain over keyword detection
# Control ID — prefer QA-corrected or LLM-assigned domain over keyword detection
domain = (control.generation_metadata.get("_effective_domain")
or config.domain
or _detect_domain(control.objective))
@@ -1254,7 +1323,9 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
final.append(control)
return final
if qa_fixed_count:
logger.info("QA validation: fixed %d/%d controls in batch", qa_fixed_count, len(final))
return final, qa_fixed_count
# ── Stage 4: Harmonization ─────────────────────────────────────────
@@ -1337,11 +1408,15 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
# Use LLM-provided domain if available, fallback to keyword-detected domain
llm_domain = data.get("domain")
valid_domains = {"AUTH", "CRYP", "NET", "DATA", "LOG", "ACC", "SEC", "INC",
"AI", "COMP", "GOV", "LAB", "FIN", "TRD", "ENV", "HLT"}
if llm_domain and llm_domain.upper() in valid_domains:
if llm_domain and llm_domain.upper() in VALID_DOMAINS:
domain = llm_domain.upper()
# Use LLM-provided category if available
llm_category = data.get("category")
category = None
if llm_category and llm_category in VALID_CATEGORIES:
category = llm_category
# Parse target_audience from LLM response
target_audience = data.get("target_audience")
if isinstance(target_audience, str):
@@ -1362,6 +1437,7 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
implementation_effort=data.get("implementation_effort", "m") if data.get("implementation_effort") in ("s", "m", "l", "xl") else "m",
tags=tags[:20],
target_audience=target_audience,
category=category,
)
# Store effective domain for later control_id generation
control.generation_metadata["_effective_domain"] = domain
@@ -1395,6 +1471,79 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
pass
return f"{prefix}-001"
# ── Stage QA: Automated Quality Validation ───────────────────────
async def _qa_validate_control(
    self, control: GeneratedControl, chunk_text: str
) -> tuple[GeneratedControl, bool]:
    """Cross-validate category/domain using keyword detection + local LLM.

    Parameters:
        control: the freshly generated control to validate (mutated in place
            when the arbiter overrules the current classification).
        chunk_text: the source text the control was generated from; used for
            keyword-based detection.

    Returns (control, was_fixed). Only triggers the local-LLM (Ollama)
    arbitration when the primary LLM classification disagrees with keyword
    detection — the agreement fast path keeps QA cheap. Any failure in the
    arbiter (network, bad JSON) is logged and treated as "no fix".
    """
    kw_category = _detect_category(chunk_text) or _detect_category(control.objective)
    kw_domain = _detect_domain(chunk_text)
    llm_domain = control.generation_metadata.get("_effective_domain", "")
    # If keyword detection and LLM classification agree → no QA needed
    if control.category == kw_category and llm_domain == kw_domain:
        return control, False
    # Disagreement detected → ask local LLM to arbitrate
    title = control.title[:100]
    objective = control.objective[:200]
    reqs = ", ".join(control.requirements[:3]) if control.requirements else "keine"
    prompt = f"""Pruefe dieses Compliance-Control auf korrekte Klassifizierung.
Titel: {title}
Ziel: {objective}
Anforderungen: {reqs}
Aktuelle Zuordnung: domain={llm_domain}, category={control.category}
Keyword-Erkennung: domain={kw_domain}, category={kw_category}
Welche Zuordnung ist korrekt? Antworte NUR als JSON:
{{"domain": "KUERZEL", "category": "kategorie_name", "reason": "kurze Begruendung"}}
Domains: AUTH=Authentifizierung, CRYP=Kryptographie, NET=Netzwerk, DATA=Datenschutz, LOG=Logging, ACC=Zugriffskontrolle, SEC=IT-Sicherheit, INC=Vorfallmanagement, AI=KI, COMP=Compliance, GOV=Behoerden, LAB=Arbeitsrecht, FIN=Finanzregulierung, TRD=Gewerbe, ENV=Umwelt, HLT=Gesundheit
Kategorien: {CATEGORY_LIST_STR}"""
    try:
        raw = await _llm_local(prompt)
        data = _parse_llm_json(raw)
        if not data:
            return control, False
        fixed = False
        # Coerce defensively: the local model may emit null or non-string
        # values for any field — `dict.get(key, "")` does NOT protect
        # against an explicit JSON null, and `.upper()` on None would raise.
        qa_domain = str(data.get("domain") or "").upper()
        qa_category = str(data.get("category") or "")
        reason = str(data.get("reason") or "")
        if qa_category and qa_category in VALID_CATEGORIES and qa_category != control.category:
            old_cat = control.category
            control.category = qa_category
            # Record the correction so downstream review can audit QA changes
            control.generation_metadata["qa_category_fix"] = {
                "from": old_cat, "to": qa_category, "reason": reason,
            }
            logger.info("QA fix: '%s' category '%s' -> '%s' (%s)",
                        title[:40], old_cat, qa_category, reason)
            fixed = True
        if qa_domain and qa_domain in VALID_DOMAINS and qa_domain != llm_domain:
            control.generation_metadata["qa_domain_fix"] = {
                "from": llm_domain, "to": qa_domain, "reason": reason,
            }
            # _effective_domain feeds control-ID generation later in the pipeline
            control.generation_metadata["_effective_domain"] = qa_domain
            logger.info("QA fix: '%s' domain '%s' -> '%s' (%s)",
                        title[:40], llm_domain, qa_domain, reason)
            fixed = True
        return control, fixed
    except Exception as e:
        # Best-effort QA: never let arbitration failures break generation
        logger.warning("QA validation failed for '%s': %s", title[:40], e)
        return control, False
# ── Pipeline Orchestration ─────────────────────────────────────────
def _create_job(self, config: GeneratorConfig) -> str:
@@ -1605,10 +1754,28 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
len(chunks), len(doc_groups),
)
# Flatten back: chunks from same document are now adjacent
# ── Apply max_chunks limit respecting document boundaries ──
# Process complete documents until we exceed the limit.
# Never split a document across jobs.
chunks = []
for group_list in doc_groups.values():
chunks.extend(group_list)
if config.max_chunks and config.max_chunks > 0:
for group_key, group_list in doc_groups.items():
if chunks and len(chunks) + len(group_list) > config.max_chunks:
# Adding this document would exceed the limit — stop here
break
chunks.extend(group_list)
logger.info(
"max_chunks=%d: selected %d chunks from %d complete documents (of %d total groups)",
config.max_chunks, len(chunks),
len(set(c.regulation_code for c in chunks)),
len(doc_groups),
)
else:
# No limit: flatten all groups
for group_list in doc_groups.values():
chunks.extend(group_list)
result.total_chunks_scanned = len(chunks)
# Process chunks — batch mode (N chunks per Anthropic API call)
BATCH_SIZE = config.batch_size or 5
@@ -1633,7 +1800,8 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
len(batch), ", ".join(regs_in_batch),
)
try:
batch_controls = await self._process_batch(batch, config, job_id)
batch_controls, batch_qa_fixes = await self._process_batch(batch, config, job_id)
result.controls_qa_fixed += batch_qa_fixes
except Exception as e:
logger.error("Batch processing failed: %s — falling back to single-chunk mode", e)
# Fallback: process each chunk individually
@@ -1785,6 +1953,8 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
if not control.title or not control.objective:
return None
# NOTE: QA validation runs as a separate batch AFTER generation (qa-reclassify endpoint)
# Stage 4: Harmonization
duplicates = await self._check_harmonization(control)
if duplicates:
@@ -1809,8 +1979,10 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
else:
control.release_state = "needs_review"
# Generate control_id
domain = config.domain or _detect_domain(control.objective)
# Generate control_id — prefer QA-corrected or LLM-assigned domain
domain = (control.generation_metadata.get("_effective_domain")
or config.domain
or _detect_domain(control.objective))
control.control_id = self._generate_control_id(domain, self.db)
# Store job_id in metadata