feat(iace): add hazard-matching-engine with component library, tag system, and pattern engine
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 44s
CI/CD / test-python-backend-compliance (push) Successful in 33s
CI/CD / test-python-document-crawler (push) Successful in 22s
CI/CD / test-python-dsms-gateway (push) Successful in 19s
CI/CD / validate-canonical-controls (push) Successful in 13s
CI/CD / Deploy (push) Successful in 4s

Implements Phases 1-4 of the IACE Hazard-Matching-Engine:
- 120 machine components (C001-C120) in 11 categories
- 20 energy sources (EN01-EN20)
- ~85 tag taxonomy across 5 domains
- 44 hazard patterns with AND/NOT matching logic
- Pattern engine with tag resolution and confidence scoring
- 8 new API endpoints (component-library, energy-sources, tags, patterns, match/apply)
- Completeness gate G09 for pattern matching
- 320 tests passing (36 new)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-16 08:50:11 +01:00
parent c7651796c9
commit 3b2006ebce
21 changed files with 4993 additions and 41 deletions

View File

@@ -27,6 +27,7 @@ from compliance.services.control_generator import (
GeneratorConfig,
ALL_COLLECTIONS,
)
from compliance.services.citation_backfill import CitationBackfill, BackfillResult
from compliance.services.rag_client import get_rag_client
logger = logging.getLogger(__name__)
@@ -496,3 +497,288 @@ async def get_controls_customer_view(
return {"controls": controls, "total": len(controls)}
finally:
db.close()
# =============================================================================
# CITATION BACKFILL
# =============================================================================
class BackfillRequest(BaseModel):
    """Request body for starting a citation backfill run."""
    dry_run: bool = True  # Default to dry_run for safety (preview only, no DB writes)
    limit: int = 0  # 0 = all controls
class BackfillResponse(BaseModel):
    """Status/result payload for a citation backfill job."""
    status: str
    total_controls: int = 0
    matched_hash: int = 0
    matched_regex: int = 0
    matched_llm: int = 0
    unmatched: int = 0
    updated: int = 0
    errors: list = []  # pydantic copies mutable defaults per instance, so this is safe here
# In-memory job registry: backfill_id -> status dict, polled via the status endpoint.
# NOTE(review): lost on process restart and not shared between workers —
# confirm the service runs single-process.
_backfill_status: dict = {}
async def _run_backfill_background(dry_run: bool, limit: int, backfill_id: str):
    """Run a citation backfill in the background with its own DB session.

    Publishes the final result (or failure) into the module-level
    ``_backfill_status`` registry under ``backfill_id`` so callers can poll it.

    Args:
        dry_run: When True, only preview changes (no DB writes).
        limit: Maximum number of controls to process; 0 means all.
        backfill_id: Key under which the result is stored in ``_backfill_status``.
    """
    db = SessionLocal()
    try:
        backfill = CitationBackfill(db=db, rag_client=get_rag_client())
        result = await backfill.run(dry_run=dry_run, limit=limit)
        _backfill_status[backfill_id] = {
            "status": "completed",
            "total_controls": result.total_controls,
            "matched_hash": result.matched_hash,
            "matched_regex": result.matched_regex,
            "matched_llm": result.matched_llm,
            "unmatched": result.unmatched,
            "updated": result.updated,
            "errors": result.errors[:50],  # cap stored errors to keep the status payload small
        }
        logger.info("Backfill %s completed: %d updated", backfill_id, result.updated)
    except Exception as e:
        # Broad catch is intentional at this background-task boundary; use
        # logger.exception so the full traceback is recorded, not just str(e).
        logger.exception("Backfill %s failed: %s", backfill_id, e)
        _backfill_status[backfill_id] = {"status": "failed", "errors": [str(e)]}
    finally:
        db.close()
@router.post("/generate/backfill-citations", response_model=BackfillResponse)
async def start_backfill(req: BackfillRequest):
    """Backfill article/paragraph into existing control source_citations.
    Uses 3-tier matching: hash lookup → regex parse → Ollama LLM.
    Default is dry_run=True (preview only, no DB changes).
    """
    import uuid

    job_key = uuid.uuid4().hex[:8]
    _backfill_status[job_key] = {"status": "running"}
    # Always run in background (RAG index build takes minutes).
    # NOTE(review): the Task object is not retained anywhere; asyncio keeps
    # only a weak reference to running tasks, so consider storing it in a
    # module-level set to rule out mid-flight garbage collection.
    asyncio.create_task(_run_backfill_background(req.dry_run, req.limit, job_key))
    return BackfillResponse(status=f"running (id={job_key})")
@router.get("/generate/backfill-status/{backfill_id}")
async def get_backfill_status(backfill_id: str):
    """Get status of a backfill job."""
    job_status = _backfill_status.get(backfill_id)
    if job_status:
        return job_status
    # Unknown (or still-empty) job id — mirror the 404 contract.
    raise HTTPException(status_code=404, detail="Backfill job not found")
# =============================================================================
# DOMAIN + TARGET AUDIENCE BACKFILL
# =============================================================================
class DomainBackfillRequest(BaseModel):
    """Request body for the domain/category/target_audience backfill."""
    dry_run: bool = True  # preview only by default; no DB writes
    job_id: Optional[str] = None  # Only backfill controls from this generation job
    limit: int = 0  # 0 = all
# In-memory registry for domain-backfill jobs: backfill_id -> status dict.
_domain_backfill_status: dict = {}
async def _run_domain_backfill(req: DomainBackfillRequest, backfill_id: str):
    """Backfill domain, category, and target_audience for existing controls using Anthropic.

    Selects controls whose target_audience is missing, classifies them in
    batches of 10 via the Anthropic Messages API, and (unless dry_run) writes
    category/target_audience back to canonical_controls. Progress and errors
    are published into ``_domain_backfill_status`` under ``backfill_id``.
    """
    import os
    import httpx
    ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
    ANTHROPIC_MODEL = os.getenv("CONTROL_GEN_ANTHROPIC_MODEL", "claude-sonnet-4-6")
    if not ANTHROPIC_API_KEY:
        # Fail fast: without a key nothing below can succeed.
        _domain_backfill_status[backfill_id] = {
            "status": "failed", "error": "ANTHROPIC_API_KEY not set"
        }
        return
    db = SessionLocal()
    try:
        # Find controls needing backfill
        where_clauses = ["(target_audience IS NULL OR target_audience = '[]' OR target_audience = 'null')"]
        params: dict = {}
        if req.job_id:
            # Narrow to controls produced by one generation job.
            where_clauses.append("generation_metadata->>'job_id' = :job_id")
            params["job_id"] = req.job_id
        query = f"""
        SELECT id, control_id, title, objective, category, source_original_text, tags
        FROM canonical_controls
        WHERE {' AND '.join(where_clauses)}
        ORDER BY control_id
        """
        if req.limit > 0:
            # req.limit is an int validated by pydantic, so interpolation is not
            # injectable — a bound parameter would still be cleaner.
            query += f" LIMIT {req.limit}"
        result = db.execute(text(query), params)
        controls = [dict(zip(result.keys(), row)) for row in result]
        total = len(controls)
        updated = 0
        errors = []
        _domain_backfill_status[backfill_id] = {
            "status": "running", "total": total, "updated": 0, "errors": []
        }
        # Process in batches of 10
        BATCH_SIZE = 10
        for batch_start in range(0, total, BATCH_SIZE):
            batch = controls[batch_start:batch_start + BATCH_SIZE]
            entries = []
            for idx, ctrl in enumerate(batch):
                # Prefer objective text; fall back to title. Append a source
                # excerpt (capped at 500 chars) when available.
                text_for_analysis = ctrl.get("objective") or ctrl.get("title") or ""
                original = ctrl.get("source_original_text") or ""
                if original:
                    text_for_analysis += f"\n\nQuelltext-Auszug: {original[:500]}"
                entries.append(
                    f"--- CONTROL {idx + 1}: {ctrl['control_id']} ---\n"
                    f"Titel: {ctrl.get('title', '')}\n"
                    f"Objective: {text_for_analysis[:800]}\n"
                    f"Tags: {json.dumps(ctrl.get('tags', []))}"
                )
            # NOTE(review): entries are joined with no separator, so each
            # "Tags: [...]" line runs directly into the next "--- CONTROL"
            # header; confirm this is intended or use "\n\n".join(entries).
            prompt = f"""Analysiere die folgenden {len(batch)} Controls und bestimme fuer jedes:
1. domain: Das Fachgebiet (AUTH, CRYP, NET, DATA, LOG, ACC, SEC, INC, AI, COMP, GOV, LAB, FIN, TRD, ENV, HLT)
2. category: Die Kategorie (encryption, authentication, network, data_protection, logging, incident, continuity, compliance, supply_chain, physical, personnel, application, system, risk, governance, hardware, identity, public_administration, labor_law, finance, trade_regulation, environmental, health)
3. target_audience: Liste der Zielgruppen (moegliche Werte: "unternehmen", "behoerden", "entwickler", "datenschutzbeauftragte", "geschaeftsfuehrung", "it-abteilung", "rechtsabteilung", "compliance-officer", "personalwesen", "einkauf", "produktion", "vertrieb", "gesundheitswesen", "finanzwesen", "oeffentlicher_dienst")
Antworte mit einem JSON-Array mit {len(batch)} Objekten. Jedes Objekt hat:
- control_index: 1-basierter Index
- domain: Fachgebiet-Kuerzel
- category: Kategorie
- target_audience: Liste der Zielgruppen
{"".join(entries)}"""
            try:
                headers = {
                    "x-api-key": ANTHROPIC_API_KEY,
                    "anthropic-version": "2023-06-01",
                    "content-type": "application/json",
                }
                payload = {
                    "model": ANTHROPIC_MODEL,
                    "max_tokens": 4096,
                    "system": "Du bist ein Compliance-Experte. Klassifiziere Controls nach Fachgebiet und Zielgruppe. Antworte NUR mit validem JSON.",
                    "messages": [{"role": "user", "content": prompt}],
                }
                async with httpx.AsyncClient(timeout=60.0) as client:
                    resp = await client.post(
                        "https://api.anthropic.com/v1/messages",
                        headers=headers,
                        json=payload,
                    )
                if resp.status_code != 200:
                    # Record and skip this batch; later batches still run.
                    errors.append(f"Anthropic API {resp.status_code} at batch {batch_start}")
                    continue
                raw = resp.json().get("content", [{}])[0].get("text", "")
                # Parse response
                import re
                # Grab the outermost JSON array; the model may wrap it in prose.
                bracket_match = re.search(r"\[.*\]", raw, re.DOTALL)
                if not bracket_match:
                    errors.append(f"No JSON array in response at batch {batch_start}")
                    continue
                results_list = json.loads(bracket_match.group(0))
                for item in results_list:
                    # control_index is 1-based in the prompt; convert and bound-check.
                    idx = item.get("control_index", 0) - 1
                    if idx < 0 or idx >= len(batch):
                        continue
                    ctrl = batch[idx]
                    ctrl_id = str(ctrl["id"])
                    new_domain = item.get("domain", "")
                    new_category = item.get("category", "")
                    new_audience = item.get("target_audience", [])
                    if not isinstance(new_audience, list):
                        new_audience = []
                    # Build new control_id from domain if domain changed
                    # NOTE(review): old_prefix/new_prefix are computed but never
                    # used below (renaming is deliberately skipped) — confirm
                    # whether this is dead code or reserved for a follow-up.
                    old_prefix = ctrl["control_id"].split("-")[0] if ctrl["control_id"] else ""
                    new_prefix = new_domain.upper()[:4] if new_domain else old_prefix
                    if not req.dry_run:
                        update_parts = []
                        update_params: dict = {"ctrl_id": ctrl_id}
                        if new_category:
                            update_parts.append("category = :category")
                            update_params["category"] = new_category
                        if new_audience:
                            update_parts.append("target_audience = :target_audience")
                            update_params["target_audience"] = json.dumps(new_audience)
                        # Note: We do NOT rename control_ids here — that would
                        # break references and cause unique constraint violations.
                        if update_parts:
                            update_parts.append("updated_at = NOW()")
                            db.execute(
                                text(f"UPDATE canonical_controls SET {', '.join(update_parts)} WHERE id = CAST(:ctrl_id AS uuid)"),
                                update_params,
                            )
                    # Counted in dry_run too: reflects controls that would be updated.
                    updated += 1
                if not req.dry_run:
                    db.commit()
            except Exception as e:
                # Per-batch failure: log it and roll back this batch's writes.
                errors.append(f"Batch {batch_start}: {str(e)}")
                db.rollback()
            # Publish per-batch progress so pollers see movement.
            _domain_backfill_status[backfill_id] = {
                "status": "running", "total": total, "updated": updated,
                "progress": f"{min(batch_start + BATCH_SIZE, total)}/{total}",
                "errors": errors[-10:],
            }
        _domain_backfill_status[backfill_id] = {
            "status": "completed", "total": total, "updated": updated,
            "errors": errors[-50:],
        }
        logger.info("Domain backfill %s completed: %d/%d updated", backfill_id, updated, total)
    except Exception as e:
        logger.error("Domain backfill %s failed: %s", backfill_id, e)
        _domain_backfill_status[backfill_id] = {"status": "failed", "error": str(e)}
    finally:
        db.close()
@router.post("/generate/backfill-domain")
async def start_domain_backfill(req: DomainBackfillRequest):
    """Backfill domain, category, and target_audience for controls using Anthropic API.

    Finds controls where target_audience is NULL and enriches them.
    Default is dry_run=True (preview only).

    Returns:
        dict with the job id and the status URL to poll.
    """
    import uuid
    backfill_id = str(uuid.uuid4())[:8]
    _domain_backfill_status[backfill_id] = {"status": "starting"}
    asyncio.create_task(_run_domain_backfill(req, backfill_id))
    # Fix: domain-backfill jobs are served by /generate/domain-backfill-status/
    # (see get_domain_backfill_status) and stored in _domain_backfill_status;
    # the previously advertised /generate/backfill-status/ endpoint reads the
    # citation-backfill registry and would 404 for this job id.
    return {"status": "running", "backfill_id": backfill_id,
            "message": f"Domain backfill started. Poll /generate/domain-backfill-status/{backfill_id}"}
@router.get("/generate/domain-backfill-status/{backfill_id}")
async def get_domain_backfill_status(backfill_id: str):
    """Get status of a domain backfill job."""
    job_status = _domain_backfill_status.get(backfill_id)
    if job_status:
        return job_status
    # Unknown job id — keep the 404 contract of the sibling status endpoint.
    raise HTTPException(status_code=404, detail="Domain backfill job not found")

View File

@@ -223,13 +223,13 @@ def _classify_regulation(regulation_code: str) -> dict:
DOMAIN_KEYWORDS = {
"AUTH": ["authentication", "login", "password", "credential", "mfa", "2fa",
"session", "token", "oauth", "identity", "authentifizierung", "anmeldung"],
"CRYPT": ["encryption", "cryptography", "tls", "ssl", "certificate", "hashing",
"aes", "rsa", "verschlüsselung", "kryptographie", "zertifikat"],
"CRYP": ["encryption", "cryptography", "tls", "ssl", "certificate", "hashing",
"aes", "rsa", "verschlüsselung", "kryptographie", "cipher", "schlüssel"],
"NET": ["network", "firewall", "dns", "vpn", "proxy", "segmentation",
"netzwerk", "routing", "port", "intrusion"],
"DATA": ["data protection", "privacy", "personal data", "datenschutz",
"personenbezogen", "dsgvo", "gdpr", "löschung", "verarbeitung"],
"LOG": ["logging", "monitoring", "audit", "siem", "alert", "anomaly",
"LOG": ["logging", "monitoring", "audit trail", "siem", "alert", "anomaly",
"protokollierung", "überwachung"],
"ACC": ["access control", "authorization", "rbac", "permission", "privilege",
"zugriffskontrolle", "berechtigung", "autorisierung"],
@@ -241,12 +241,30 @@ DOMAIN_KEYWORDS = {
"ki", "künstliche intelligenz", "algorithmus", "training"],
"COMP": ["compliance", "audit", "regulation", "standard", "certification",
"konformität", "prüfung", "zertifizierung"],
"GOV": ["behörde", "verwaltung", "öffentlich", "register", "gewerberegister",
"handelsregister", "meldepflicht", "aufsicht", "genehmigung", "bescheid",
"verwaltungsakt", "ordnungswidrig", "bußgeld", "staat", "ministerium",
"bundesamt", "landesamt", "kommune", "gebietskörperschaft"],
"LAB": ["arbeitnehmer", "arbeitgeber", "arbeitsschutz", "arbeitszeit", "betriebsrat",
"kündigung", "beschäftigung", "mindestlohn", "arbeitsvertrag", "betriebsverfassung",
"arbeitsrecht", "arbeitsstätte", "gefährdungsbeurteilung", "unterweisung"],
"FIN": ["finanz", "bankwesen", "zahlungsverkehr", "geldwäsche", "bilanz", "rechnungslegung",
"buchführung", "jahresabschluss", "steuererklärung", "kapitalmarkt", "wertpapier",
"kreditinstitut", "finanzdienstleistung", "bankenaufsicht", "bafin"],
"TRD": ["handelsrecht", "gewerbeordnung", "gewerbe", "handwerk", "gewerbeuntersagung",
"gewerbebetrieb", "handelsgesetzbuch", "handelsregister", "kaufmann",
"unternehmer", "wettbewerb", "verbraucherschutz", "produktsicherheit"],
"ENV": ["umweltschutz", "emission", "abfall", "immission", "gewässerschutz",
"naturschutz", "umweltverträglichkeit", "klimaschutz", "nachhaltigkeit",
"entsorgung", "recycling", "umweltrecht"],
"HLT": ["gesundheit", "medizinprodukt", "arzneimittel", "patient", "krankenhaus",
"hygiene", "infektionsschutz", "medizin", "pflege", "therapie"],
}
CATEGORY_KEYWORDS = {
"encryption": ["encryption", "cryptography", "tls", "ssl", "certificate", "hashing",
"aes", "rsa", "verschlüsselung", "kryptographie", "zertifikat", "cipher"],
"aes", "rsa", "verschlüsselung", "kryptographie", "cipher", "schlüssel"],
"authentication": ["authentication", "login", "password", "credential", "mfa", "2fa",
"session", "oauth", "authentifizierung", "anmeldung", "passwort"],
"network": ["network", "firewall", "dns", "vpn", "proxy", "segmentation",
@@ -278,6 +296,20 @@ CATEGORY_KEYWORDS = {
"plattform", "geräte"],
"identity": ["identity", "iam", "directory", "ldap", "sso", "provisioning",
"identität", "identitätsmanagement", "benutzerverzeichnis"],
"public_administration": ["behörde", "verwaltung", "öffentlich", "register", "gewerberegister",
"handelsregister", "meldepflicht", "aufsicht", "genehmigung", "bescheid",
"verwaltungsakt", "ordnungswidrig", "bußgeld", "amt"],
"labor_law": ["arbeitnehmer", "arbeitgeber", "arbeitsschutz", "arbeitszeit", "betriebsrat",
"kündigung", "beschäftigung", "mindestlohn", "arbeitsvertrag", "betriebsverfassung"],
"finance": ["finanz", "bankwesen", "zahlungsverkehr", "geldwäsche", "bilanz", "rechnungslegung",
"buchführung", "jahresabschluss", "kapitalmarkt", "wertpapier", "bafin"],
"trade_regulation": ["gewerbeordnung", "gewerbe", "handwerk", "gewerbeuntersagung",
"gewerbebetrieb", "handelsrecht", "kaufmann", "wettbewerb",
"verbraucherschutz", "produktsicherheit"],
"environmental": ["umweltschutz", "emission", "abfall", "immission", "gewässerschutz",
"naturschutz", "klimaschutz", "nachhaltigkeit", "entsorgung"],
"health": ["gesundheit", "medizinprodukt", "arzneimittel", "patient", "krankenhaus",
"hygiene", "infektionsschutz", "pflege"],
}
VERIFICATION_KEYWORDS = {
@@ -372,7 +404,8 @@ class GeneratedControl:
generation_strategy: str = "ungrouped" # ungrouped | document_grouped
# Classification fields
verification_method: Optional[str] = None # code_review, document, tool, hybrid
category: Optional[str] = None # one of 17 categories
category: Optional[str] = None # one of 22 categories
target_audience: Optional[list] = None # e.g. ["unternehmen", "behoerden", "entwickler"]
@dataclass
@@ -705,9 +738,11 @@ class ControlGeneratorPipeline:
page = 0
collection_total = 0
collection_new = 0
seen_offsets: set[str] = set() # Detect scroll loops
max_pages = 1000 # Safety limit: 1000 pages × 200 = 200K chunks max per collection
prev_chunk_count = -1 # Track stalls (same count means no progress)
stall_count = 0
while True:
while page < max_pages:
chunks, next_offset = await self.rag.scroll(
collection=collection,
offset=offset,
@@ -747,17 +782,30 @@ class ControlGeneratorPipeline:
# Stop conditions
if not next_offset:
break
# Detect infinite scroll loops (Qdrant mixed ID types)
if next_offset in seen_offsets:
logger.warning(
"Scroll loop detected in %s at offset %s (page %d) — stopping",
collection, next_offset, page,
)
break
seen_offsets.add(next_offset)
# Detect stalls: if no NEW unique chunks found for several pages,
# we've likely cycled through all chunks in this collection.
# (Safer than offset dedup which breaks with mixed Qdrant ID types)
if collection_new == prev_chunk_count:
stall_count += 1
if stall_count >= 5:
logger.warning(
"Scroll stalled in %s at page %d — no new unique chunks for 5 pages (%d total, %d new) — stopping",
collection, page, collection_total, collection_new,
)
break
else:
stall_count = 0
prev_chunk_count = collection_new
offset = next_offset
if page >= max_pages:
logger.warning(
"Collection %s: reached max_pages limit (%d). %d chunks scrolled.",
collection, max_pages, collection_total,
)
logger.info(
"Collection %s: %d total chunks scrolled, %d new unprocessed",
collection, collection_total, collection_new,
@@ -823,7 +871,9 @@ Quelle: {chunk.regulation_name} ({chunk.regulation_code}), {chunk.article}"""
control.license_rule = 1
control.source_original_text = chunk.text
control.source_citation = {
"source": f"{chunk.regulation_name} {chunk.article or ''}".strip(),
"source": chunk.regulation_name,
"article": chunk.article or "",
"paragraph": chunk.paragraph or "",
"license": license_info.get("license", ""),
"url": chunk.source_url or "",
}
@@ -835,6 +885,7 @@ Quelle: {chunk.regulation_name} ({chunk.regulation_code}), {chunk.article}"""
"license_rule": 1,
"source_regulation": chunk.regulation_code,
"source_article": chunk.article,
"source_paragraph": chunk.paragraph,
}
return control
@@ -873,7 +924,9 @@ Quelle: {chunk.regulation_name}, {chunk.article}"""
control.license_rule = 2
control.source_original_text = chunk.text
control.source_citation = {
"source": f"{chunk.regulation_name} {chunk.article or ''}".strip(),
"source": chunk.regulation_name,
"article": chunk.article or "",
"paragraph": chunk.paragraph or "",
"license": license_info.get("license", ""),
"license_notice": attribution,
"url": chunk.source_url or "",
@@ -886,6 +939,7 @@ Quelle: {chunk.regulation_name}, {chunk.article}"""
"license_rule": 2,
"source_regulation": chunk.regulation_code,
"source_article": chunk.article,
"source_paragraph": chunk.paragraph,
}
return control
@@ -913,7 +967,9 @@ Gib JSON zurück mit diesen Feldern:
- test_procedure: Liste von Prüfschritten (Strings)
- evidence: Liste von Nachweisdokumenten (Strings)
- severity: low/medium/high/critical
- tags: Liste von Tags (eigene Begriffe)"""
- tags: Liste von Tags (eigene Begriffe)
- domain: Fachgebiet als Kuerzel (AUTH, CRYP, NET, DATA, LOG, ACC, SEC, INC, AI, COMP, GOV, LAB, FIN, TRD, ENV, HLT)
- target_audience: Liste der Zielgruppen (z.B. "unternehmen", "behoerden", "entwickler", "datenschutzbeauftragte", "geschaeftsfuehrung", "it-abteilung", "rechtsabteilung", "compliance-officer", "personalwesen", "oeffentlicher_dienst")"""
raw = await _llm_chat(prompt, REFORM_SYSTEM_PROMPT)
data = _parse_llm_json(raw)
@@ -989,6 +1045,8 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
- evidence: Liste von Nachweisdokumenten (Strings, Deutsch)
- severity: low/medium/high/critical
- tags: Liste von Tags
- domain: Fachgebiet als Kuerzel (AUTH=Authentifizierung, CRYP=Kryptographie, NET=Netzwerk, DATA=Datenschutz, LOG=Logging, ACC=Zugriffskontrolle, SEC=IT-Sicherheit, INC=Vorfallmanagement, AI=KI, COMP=Compliance, GOV=Behoerden/Verwaltung, LAB=Arbeitsrecht, FIN=Finanzregulierung, TRD=Gewerbe/Handelsrecht, ENV=Umwelt, HLT=Gesundheit)
- target_audience: Liste der Zielgruppen fuer die dieses Control relevant ist. Moegliche Werte: "unternehmen", "behoerden", "entwickler", "datenschutzbeauftragte", "geschaeftsfuehrung", "it-abteilung", "rechtsabteilung", "compliance-officer", "personalwesen", "einkauf", "produktion", "vertrieb", "gesundheitswesen", "finanzwesen", "oeffentlicher_dienst"
{joined}"""
@@ -1016,7 +1074,9 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
if lic["rule"] in (1, 2):
control.source_original_text = chunk.text
control.source_citation = {
"source": f"{chunk.regulation_name} {chunk.article or ''}".strip(),
"source": chunk.regulation_name,
"article": chunk.article or "",
"paragraph": chunk.paragraph or "",
"license": lic.get("license", ""),
"license_notice": lic.get("attribution", ""),
"url": chunk.source_url or "",
@@ -1030,6 +1090,7 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
"license_rule": lic["rule"],
"source_regulation": chunk.regulation_code,
"source_article": chunk.article,
"source_paragraph": chunk.paragraph,
"batch_size": len(chunks),
"document_grouped": same_doc,
}
@@ -1071,6 +1132,8 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
- evidence: Liste von Nachweisdokumenten (Strings)
- severity: low/medium/high/critical
- tags: Liste von Tags (eigene Begriffe)
- domain: Fachgebiet als Kuerzel (AUTH=Authentifizierung, CRYP=Kryptographie, NET=Netzwerk, DATA=Datenschutz, LOG=Logging, ACC=Zugriffskontrolle, SEC=IT-Sicherheit, INC=Vorfallmanagement, AI=KI, COMP=Compliance, GOV=Behoerden/Verwaltung, LAB=Arbeitsrecht, FIN=Finanzregulierung, TRD=Gewerbe/Handelsrecht, ENV=Umwelt, HLT=Gesundheit)
- target_audience: Liste der Zielgruppen (z.B. "unternehmen", "behoerden", "entwickler", "datenschutzbeauftragte", "geschaeftsfuehrung", "it-abteilung", "rechtsabteilung", "compliance-officer", "personalwesen", "einkauf", "produktion", "gesundheitswesen", "finanzwesen", "oeffentlicher_dienst")
{joined}"""
@@ -1182,8 +1245,10 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
else:
control.release_state = "needs_review"
# Control ID
domain = config.domain or _detect_domain(control.objective)
# Control ID — prefer LLM-assigned domain over keyword detection
domain = (control.generation_metadata.get("_effective_domain")
or config.domain
or _detect_domain(control.objective))
control.control_id = self._generate_control_id(domain, self.db)
control.generation_metadata["job_id"] = job_id
@@ -1270,7 +1335,21 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
if isinstance(tags, str):
tags = [t.strip() for t in tags.split(",")]
return GeneratedControl(
# Use LLM-provided domain if available, fallback to keyword-detected domain
llm_domain = data.get("domain")
valid_domains = {"AUTH", "CRYP", "NET", "DATA", "LOG", "ACC", "SEC", "INC",
"AI", "COMP", "GOV", "LAB", "FIN", "TRD", "ENV", "HLT"}
if llm_domain and llm_domain.upper() in valid_domains:
domain = llm_domain.upper()
# Parse target_audience from LLM response
target_audience = data.get("target_audience")
if isinstance(target_audience, str):
target_audience = [t.strip() for t in target_audience.split(",")]
if not isinstance(target_audience, list):
target_audience = None
control = GeneratedControl(
title=str(data.get("title", "Untitled Control"))[:255],
objective=str(data.get("objective", "")),
rationale=str(data.get("rationale", "")),
@@ -1282,7 +1361,11 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
risk_score=min(10.0, max(0.0, float(data.get("risk_score", 5.0)))),
implementation_effort=data.get("implementation_effort", "m") if data.get("implementation_effort") in ("s", "m", "l", "xl") else "m",
tags=tags[:20],
target_audience=target_audience,
)
# Store effective domain for later control_id generation
control.generation_metadata["_effective_domain"] = domain
return control
def _fallback_control(self, chunk: RAGSearchResult) -> GeneratedControl:
"""Create a minimal control when LLM parsing fails."""
@@ -1393,7 +1476,8 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
open_anchors, release_state, tags,
license_rule, source_original_text, source_citation,
customer_visible, generation_metadata,
verification_method, category, generation_strategy
verification_method, category, generation_strategy,
target_audience
) VALUES (
:framework_id, :control_id, :title, :objective, :rationale,
:scope, :requirements, :test_procedure, :evidence,
@@ -1401,7 +1485,8 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
:open_anchors, :release_state, :tags,
:license_rule, :source_original_text, :source_citation,
:customer_visible, :generation_metadata,
:verification_method, :category, :generation_strategy
:verification_method, :category, :generation_strategy,
:target_audience
)
ON CONFLICT (framework_id, control_id) DO NOTHING
RETURNING id
@@ -1430,6 +1515,7 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Objekten. Jedes Objekt hat di
"verification_method": control.verification_method,
"category": control.category,
"generation_strategy": control.generation_strategy,
"target_audience": json.dumps(control.target_audience) if control.target_audience else None,
},
)
self.db.commit()