Benchmark shows Haiku is 2.5x faster than Sonnet at 5x lower cost for this JSON structuring task. Quality is equivalent. $142 vs $705 for 75K obligations, ~2.8 days vs ~7 days. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2125 lines
80 KiB
Python
2125 lines
80 KiB
Python
"""Decomposition Pass — Split Rich Controls into Atomic Controls.
|
|
|
|
Pass 0 of the Multi-Layer Control Architecture migration. Runs BEFORE
|
|
Passes 1-5 (obligation linkage, pattern classification, etc.).
|
|
|
|
Two sub-passes:
|
|
Pass 0a: Obligation Extraction — extract individual normative obligations
|
|
from a Rich Control using LLM with strict guardrails.
|
|
Pass 0b: Atomic Control Composition — turn each obligation candidate
|
|
into a standalone atomic control record.
|
|
|
|
Plus a Quality Gate that validates extraction results.
|
|
|
|
Guardrails (the 6 rules):
|
|
1. Only normative statements (müssen, sicherzustellen, verpflichtet, ...)
|
|
2. One main verb per obligation
|
|
3. Test obligations separate from operational obligations
|
|
4. Reporting obligations separate
|
|
5. Don't split at evidence level
|
|
6. Parent link always preserved
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from sqlalchemy import text
|
|
from sqlalchemy.orm import Session
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LLM Provider Config
|
|
# ---------------------------------------------------------------------------
|
|
|
|
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
|
|
ANTHROPIC_MODEL = os.getenv("DECOMPOSITION_LLM_MODEL", "claude-haiku-4-5-20251001")
|
|
DECOMPOSITION_BATCH_SIZE = int(os.getenv("DECOMPOSITION_BATCH_SIZE", "5"))
|
|
LLM_TIMEOUT = float(os.getenv("DECOMPOSITION_LLM_TIMEOUT", "120"))
|
|
ANTHROPIC_API_URL = "https://api.anthropic.com/v1"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Normative signal detection — 3-Tier Classification
|
|
# ---------------------------------------------------------------------------
|
|
# Tier 1: Pflicht (mandatory) — strong normative signals
|
|
# Tier 2: Empfehlung (recommendation) — weaker normative signals
|
|
# Tier 3: Kann (optional/permissive) — permissive signals
|
|
# Nothing is rejected — everything is classified.
|
|
|
|
_PFLICHT_SIGNALS = [
|
|
# Deutsche modale Pflichtformulierungen
|
|
r"\bmüssen\b", r"\bmuss\b", r"\bhat\s+sicherzustellen\b",
|
|
r"\bhaben\s+sicherzustellen\b", r"\bsind\s+verpflichtet\b",
|
|
r"\bist\s+verpflichtet\b",
|
|
# "ist zu prüfen", "sind zu dokumentieren" (direkt)
|
|
r"\bist\s+zu\s+\w+en\b", r"\bsind\s+zu\s+\w+en\b",
|
|
r"\bhat\s+zu\s+\w+en\b", r"\bhaben\s+zu\s+\w+en\b",
|
|
# "ist festzustellen", "sind vorzunehmen" (Compound-Verben, eingebettetes zu)
|
|
r"\bist\s+\w+zu\w+en\b", r"\bsind\s+\w+zu\w+en\b",
|
|
# "ist zusätzlich zu prüfen", "sind regelmäßig zu überwachen" (Adverb dazwischen)
|
|
r"\bist\s+\w+\s+zu\s+\w+en\b", r"\bsind\s+\w+\s+zu\s+\w+en\b",
|
|
r"\bhat\s+\w+\s+zu\s+\w+en\b", r"\bhaben\s+\w+\s+zu\s+\w+en\b",
|
|
# Englische Pflicht-Signale
|
|
r"\bshall\b", r"\bmust\b", r"\brequired\b",
|
|
# Compound-Infinitive (Gerundivum): mitzuteilen, anzuwenden, bereitzustellen
|
|
r"\b\w+zuteilen\b", r"\b\w+zuwenden\b", r"\b\w+zustellen\b", r"\b\w+zulegen\b",
|
|
r"\b\w+zunehmen\b", r"\b\w+zuführen\b", r"\b\w+zuhalten\b", r"\b\w+zusetzen\b",
|
|
r"\b\w+zuweisen\b", r"\b\w+zuordnen\b", r"\b\w+zufügen\b", r"\b\w+zugeben\b",
|
|
# Breites Pattern: "ist ... [bis 80 Zeichen] ... zu + Infinitiv"
|
|
r"\bist\b.{1,80}\bzu\s+\w+en\b", r"\bsind\b.{1,80}\bzu\s+\w+en\b",
|
|
]
|
|
_PFLICHT_RE = re.compile("|".join(_PFLICHT_SIGNALS), re.IGNORECASE)
|
|
|
|
_EMPFEHLUNG_SIGNALS = [
|
|
# Modale Verben (schwaecher als "muss")
|
|
r"\bsoll\b", r"\bsollen\b", r"\bsollte\b", r"\bsollten\b",
|
|
r"\bgewährleisten\b", r"\bsicherstellen\b",
|
|
# Englische Empfehlungs-Signale
|
|
r"\bshould\b", r"\bensure\b", r"\brecommend\w*\b",
|
|
# Haeufige normative Infinitive (ohne Hilfsverb, als Empfehlung)
|
|
r"\bnachweisen\b", r"\beinhalten\b", r"\bunterlassen\b", r"\bwahren\b",
|
|
r"\bdokumentieren\b", r"\bimplementieren\b", r"\büberprüfen\b", r"\büberwachen\b",
|
|
# Pruefanweisungen als normative Aussage
|
|
r"\bprüfen,\s+ob\b", r"\bkontrollieren,\s+ob\b",
|
|
]
|
|
_EMPFEHLUNG_RE = re.compile("|".join(_EMPFEHLUNG_SIGNALS), re.IGNORECASE)
|
|
|
|
_KANN_SIGNALS = [
|
|
r"\bkann\b", r"\bkönnen\b", r"\bdarf\b", r"\bdürfen\b",
|
|
r"\bmay\b", r"\boptional\b",
|
|
]
|
|
_KANN_RE = re.compile("|".join(_KANN_SIGNALS), re.IGNORECASE)
|
|
|
|
# Union of all normative signals (for backward-compatible has_normative_signal flag)
|
|
_NORMATIVE_RE = re.compile(
|
|
"|".join(_PFLICHT_SIGNALS + _EMPFEHLUNG_SIGNALS + _KANN_SIGNALS),
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
_RATIONALE_SIGNALS = [
|
|
r"\bda\s+", r"\bweil\b", r"\bgrund\b", r"\berwägung",
|
|
r"\bbecause\b", r"\breason\b", r"\brationale\b",
|
|
r"\bkönnen\s+.*\s+verursachen\b", r"\bführt\s+zu\b",
|
|
]
|
|
_RATIONALE_RE = re.compile("|".join(_RATIONALE_SIGNALS), re.IGNORECASE)
|
|
|
|
_TEST_SIGNALS = [
|
|
r"\btesten\b", r"\btest\b", r"\bprüfung\b", r"\bprüfen\b",
|
|
r"\bgetestet\b", r"\bwirksamkeit\b", r"\baudit\b",
|
|
r"\bregelmäßig\b.*\b(prüf|test|kontroll)",
|
|
r"\beffectiveness\b", r"\bverif",
|
|
]
|
|
_TEST_RE = re.compile("|".join(_TEST_SIGNALS), re.IGNORECASE)
|
|
|
|
_REPORTING_SIGNALS = [
|
|
r"\bmelden\b", r"\bmeldung\b", r"\bunterricht",
|
|
r"\binformieren\b", r"\bbenachricht", r"\bnotif",
|
|
r"\breport\b", r"\bbehörd",
|
|
]
|
|
_REPORTING_RE = re.compile("|".join(_REPORTING_SIGNALS), re.IGNORECASE)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Merge & Enrichment helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Trigger-type detection patterns
|
|
_EVENT_TRIGGERS = re.compile(
|
|
r"\b(vorfall|incident|breach|verletzung|sicherheitsvorfall|meldung|entdeckung"
|
|
r"|feststellung|erkennung|ereignis|eintritt|bei\s+auftreten|im\s+falle"
|
|
r"|wenn\s+ein|sobald|unverzüglich|upon|in\s+case\s+of|when\s+a)\b",
|
|
re.IGNORECASE,
|
|
)
|
|
_PERIODIC_TRIGGERS = re.compile(
|
|
r"\b(jährlich|monatlich|quartalsweise|regelmäßig|periodisch|annually"
|
|
r"|monthly|quarterly|periodic|mindestens\s+(einmal|alle)|turnusmäßig"
|
|
r"|wiederkehrend|in\s+regelmäßigen\s+abständen)\b",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
# Implementation-specific keywords (concrete tools/protocols/formats)
|
|
_IMPL_SPECIFIC_PATTERNS = re.compile(
|
|
r"\b(TLS|SSL|AES|RSA|SHA-\d|HTTPS|LDAP|SAML|OAuth|OIDC|MFA|2FA"
|
|
r"|SIEM|IDS|IPS|WAF|VPN|VLAN|DMZ|HSM|PKI|RBAC|ABAC"
|
|
r"|ISO\s*27\d{3}|SOC\s*2|PCI[\s-]DSS|NIST"
|
|
r"|Firewall|Antivirus|EDR|XDR|SOAR|DLP"
|
|
r"|SMS|E-Mail|Fax|Telefon"
|
|
r"|JSON|XML|CSV|PDF|YAML"
|
|
r"|PostgreSQL|MySQL|MongoDB|Redis|Kafka"
|
|
r"|Docker|Kubernetes|AWS|Azure|GCP"
|
|
r"|Active\s*Directory|RADIUS|Kerberos"
|
|
r"|RSyslog|Splunk|ELK|Grafana|Prometheus"
|
|
r"|Git|Jenkins|Terraform|Ansible)\b",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
|
|
def _classify_trigger_type(obligation_text: str, condition: str) -> str:
|
|
"""Classify when an obligation is triggered: event/periodic/continuous."""
|
|
combined = f"{obligation_text} {condition}"
|
|
if _EVENT_TRIGGERS.search(combined):
|
|
return "event"
|
|
if _PERIODIC_TRIGGERS.search(combined):
|
|
return "periodic"
|
|
return "continuous"
|
|
|
|
|
|
def _is_implementation_specific_text(
|
|
obligation_text: str, action: str, obj: str
|
|
) -> bool:
|
|
"""Check if an obligation references concrete implementation details."""
|
|
combined = f"{obligation_text} {action} {obj}"
|
|
matches = _IMPL_SPECIFIC_PATTERNS.findall(combined)
|
|
return len(matches) >= 1
|
|
|
|
|
|
def _text_similar(a: str, b: str, threshold: float = 0.75) -> bool:
|
|
"""Quick token-overlap similarity check (Jaccard on words)."""
|
|
if not a or not b:
|
|
return False
|
|
tokens_a = set(a.split())
|
|
tokens_b = set(b.split())
|
|
if not tokens_a or not tokens_b:
|
|
return False
|
|
intersection = tokens_a & tokens_b
|
|
union = tokens_a | tokens_b
|
|
return len(intersection) / len(union) >= threshold
|
|
|
|
|
|
def _is_more_implementation_specific(text_a: str, text_b: str) -> bool:
|
|
"""Return True if text_a is more implementation-specific than text_b."""
|
|
matches_a = len(_IMPL_SPECIFIC_PATTERNS.findall(text_a))
|
|
matches_b = len(_IMPL_SPECIFIC_PATTERNS.findall(text_b))
|
|
if matches_a != matches_b:
|
|
return matches_a > matches_b
|
|
# Tie-break: longer text is usually more specific
|
|
return len(text_a) > len(text_b)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
|
|
class ObligationCandidate:
|
|
"""A single normative obligation extracted from a Rich Control."""
|
|
|
|
candidate_id: str = ""
|
|
parent_control_uuid: str = ""
|
|
obligation_text: str = ""
|
|
action: str = ""
|
|
object_: str = ""
|
|
condition: Optional[str] = None
|
|
normative_strength: str = "must"
|
|
obligation_type: str = "pflicht" # pflicht | empfehlung | kann
|
|
is_test_obligation: bool = False
|
|
is_reporting_obligation: bool = False
|
|
extraction_confidence: float = 0.0
|
|
quality_flags: dict = field(default_factory=dict)
|
|
release_state: str = "extracted"
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"candidate_id": self.candidate_id,
|
|
"parent_control_uuid": self.parent_control_uuid,
|
|
"obligation_text": self.obligation_text,
|
|
"action": self.action,
|
|
"object": self.object_,
|
|
"condition": self.condition,
|
|
"normative_strength": self.normative_strength,
|
|
"obligation_type": self.obligation_type,
|
|
"is_test_obligation": self.is_test_obligation,
|
|
"is_reporting_obligation": self.is_reporting_obligation,
|
|
"extraction_confidence": self.extraction_confidence,
|
|
"quality_flags": self.quality_flags,
|
|
"release_state": self.release_state,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class AtomicControlCandidate:
|
|
"""An atomic control composed from a single ObligationCandidate."""
|
|
|
|
candidate_id: str = ""
|
|
parent_control_uuid: str = ""
|
|
obligation_candidate_id: str = ""
|
|
title: str = ""
|
|
objective: str = ""
|
|
requirements: list = field(default_factory=list)
|
|
test_procedure: list = field(default_factory=list)
|
|
evidence: list = field(default_factory=list)
|
|
severity: str = "medium"
|
|
category: str = ""
|
|
domain: str = ""
|
|
source_regulation: str = ""
|
|
source_article: str = ""
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"candidate_id": self.candidate_id,
|
|
"parent_control_uuid": self.parent_control_uuid,
|
|
"obligation_candidate_id": self.obligation_candidate_id,
|
|
"title": self.title,
|
|
"objective": self.objective,
|
|
"requirements": self.requirements,
|
|
"test_procedure": self.test_procedure,
|
|
"evidence": self.evidence,
|
|
"severity": self.severity,
|
|
"category": self.category,
|
|
"domain": self.domain,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Quality Gate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def classify_obligation_type(txt: str) -> str:
|
|
"""Classify obligation text into pflicht/empfehlung/kann.
|
|
|
|
Priority: pflicht > empfehlung > kann > empfehlung (default).
|
|
Nothing is rejected — obligations without normative signal default
|
|
to 'empfehlung' (recommendation).
|
|
"""
|
|
if _PFLICHT_RE.search(txt):
|
|
return "pflicht"
|
|
if _EMPFEHLUNG_RE.search(txt):
|
|
return "empfehlung"
|
|
if _KANN_RE.search(txt):
|
|
return "kann"
|
|
# No signal at all — LLM thought it was an obligation, classify
|
|
# as recommendation (the user can still use it).
|
|
return "empfehlung"
|
|
|
|
|
|
def quality_gate(candidate: ObligationCandidate) -> dict:
|
|
"""Validate an obligation candidate. Returns quality flags dict.
|
|
|
|
Checks:
|
|
has_normative_signal: text contains normative language (informational)
|
|
obligation_type: pflicht | empfehlung | kann (classified, never rejected)
|
|
single_action: only one main action (heuristic)
|
|
not_rationale: not just a justification/reasoning
|
|
not_evidence_only: not just an evidence requirement
|
|
min_length: text is long enough to be meaningful
|
|
has_parent_link: references back to parent control
|
|
"""
|
|
txt = candidate.obligation_text
|
|
flags = {}
|
|
|
|
# 1. Normative signal (informational — no longer used for rejection)
|
|
flags["has_normative_signal"] = bool(_NORMATIVE_RE.search(txt))
|
|
|
|
# 1b. Obligation type classification
|
|
flags["obligation_type"] = classify_obligation_type(txt)
|
|
|
|
# 2. Single action heuristic — count "und" / "and" / "sowie" splits
|
|
# that connect different verbs (imperfect but useful)
|
|
multi_verb_re = re.compile(
|
|
r"\b(und|sowie|als auch)\b.*\b(müssen|sicherstellen|implementieren"
|
|
r"|dokumentieren|melden|testen|prüfen|überwachen|gewährleisten)\b",
|
|
re.IGNORECASE,
|
|
)
|
|
flags["single_action"] = not bool(multi_verb_re.search(txt))
|
|
|
|
# 3. Not rationale
|
|
normative_count = len(_NORMATIVE_RE.findall(txt))
|
|
rationale_count = len(_RATIONALE_RE.findall(txt))
|
|
flags["not_rationale"] = normative_count >= rationale_count
|
|
|
|
# 4. Not evidence-only (evidence fragments are typically short noun phrases)
|
|
evidence_only_re = re.compile(
|
|
r"^(Nachweis|Dokumentation|Screenshot|Protokoll|Bericht|Zertifikat)",
|
|
re.IGNORECASE,
|
|
)
|
|
flags["not_evidence_only"] = not bool(evidence_only_re.match(txt.strip()))
|
|
|
|
# 5. Min length
|
|
flags["min_length"] = len(txt.strip()) >= 20
|
|
|
|
# 6. Parent link
|
|
flags["has_parent_link"] = bool(candidate.parent_control_uuid)
|
|
|
|
return flags
|
|
|
|
|
|
def passes_quality_gate(flags: dict) -> bool:
|
|
"""Check if critical quality flags pass.
|
|
|
|
Note: has_normative_signal is NO LONGER critical — obligations without
|
|
normative signal are classified as 'empfehlung' instead of being rejected.
|
|
"""
|
|
critical = ["not_evidence_only", "min_length", "has_parent_link"]
|
|
return all(flags.get(k, False) for k in critical)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LLM Prompts
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
_PASS0A_SYSTEM_PROMPT = """\
|
|
Du bist ein Rechts-Compliance-Experte. Du zerlegst Compliance-Controls \
|
|
in einzelne atomare Pflichten.
|
|
|
|
ANALYSE-SCHRITTE (intern durchfuehren, NICHT im Output!):
|
|
1. Identifiziere den Adressaten (Wer muss handeln?)
|
|
2. Identifiziere die Handlung (Was muss getan werden?)
|
|
3. Bestimme die normative Staerke (muss/soll/kann)
|
|
4. Pruefe ob Test- oder Meldepflicht vorliegt (separat erfassen!)
|
|
5. Formuliere jede Pflicht als eigenstaendiges JSON-Objekt
|
|
|
|
REGELN (STRIKT EINHALTEN):
|
|
1. Nur normative Aussagen extrahieren — erkennbar an: müssen, haben \
|
|
sicherzustellen, sind verpflichtet, ist zu dokumentieren, ist zu melden, \
|
|
ist zu testen, shall, must, required.
|
|
2. Jede Pflicht hat genau EIN Hauptverb / eine Handlung.
|
|
3. Testpflichten SEPARAT von operativen Pflichten (is_test_obligation=true).
|
|
4. Meldepflichten SEPARAT (is_reporting_obligation=true).
|
|
5. NICHT auf Evidence-Ebene zerlegen (z.B. "DR-Plan vorhanden" ist KEIN \
|
|
eigenes Control, sondern Evidence).
|
|
6. Begründungen, Erläuterungen und Erwägungsgründe sind KEINE Pflichten \
|
|
— NICHT extrahieren.
|
|
|
|
Antworte NUR mit einem JSON-Array. Keine Erklärungen."""
|
|
|
|
|
|
def _build_pass0a_prompt(
|
|
title: str, objective: str, requirements: str,
|
|
test_procedure: str, source_ref: str
|
|
) -> str:
|
|
return f"""\
|
|
Analysiere das folgende Control und extrahiere alle einzelnen normativen \
|
|
Pflichten als JSON-Array.
|
|
|
|
CONTROL:
|
|
Titel: {title}
|
|
Ziel: {objective}
|
|
Anforderungen: {requirements}
|
|
Prüfverfahren: {test_procedure}
|
|
Quellreferenz: {source_ref}
|
|
|
|
Antworte als JSON-Array:
|
|
[
|
|
{{
|
|
"obligation_text": "Kurze, präzise Formulierung der Pflicht",
|
|
"action": "Hauptverb/Handlung",
|
|
"object": "Gegenstand der Pflicht",
|
|
"condition": "Auslöser/Bedingung oder null",
|
|
"normative_strength": "must",
|
|
"is_test_obligation": false,
|
|
"is_reporting_obligation": false
|
|
}}
|
|
]"""
|
|
|
|
|
|
_PASS0B_SYSTEM_PROMPT = """\
|
|
Du bist ein Security-Compliance-Experte. Du erstellst aus einer einzelnen \
|
|
normativen Pflicht ein praxisorientiertes, atomares Security Control.
|
|
|
|
ANALYSE-SCHRITTE (intern durchfuehren, NICHT im Output!):
|
|
1. Identifiziere die konkrete Anforderung aus der Pflicht
|
|
2. Leite eine umsetzbare technische/organisatorische Massnahme ab
|
|
3. Definiere ein Pruefverfahren (wie wird Umsetzung verifiziert?)
|
|
4. Bestimme den Nachweis (welches Dokument/Artefakt belegt Compliance?)
|
|
|
|
Das Control muss UMSETZBAR sein — keine Gesetzesparaphrase.
|
|
Antworte NUR als JSON. Keine Erklärungen."""
|
|
|
|
|
|
def _build_pass0b_prompt(
|
|
obligation_text: str, action: str, object_: str,
|
|
parent_title: str, parent_category: str, source_ref: str,
|
|
) -> str:
|
|
return f"""\
|
|
Erstelle aus der folgenden Pflicht ein atomares Control.
|
|
|
|
PFLICHT: {obligation_text}
|
|
HANDLUNG: {action}
|
|
GEGENSTAND: {object_}
|
|
|
|
KONTEXT (Ursprungs-Control):
|
|
Titel: {parent_title}
|
|
Kategorie: {parent_category}
|
|
Quellreferenz: {source_ref}
|
|
|
|
Antworte als JSON:
|
|
{{
|
|
"title": "Kurzer Titel (max 80 Zeichen, deutsch)",
|
|
"objective": "Was muss erreicht werden? (1-2 Sätze)",
|
|
"requirements": ["Konkrete Anforderung 1", "Anforderung 2"],
|
|
"test_procedure": ["Prüfschritt 1", "Prüfschritt 2"],
|
|
"evidence": ["Nachweis 1", "Nachweis 2"],
|
|
"severity": "critical|high|medium|low",
|
|
"category": "security|privacy|governance|operations|finance|reporting"
|
|
}}"""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Batch Prompts (multiple controls/obligations per API call)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _build_pass0a_batch_prompt(controls: list[dict]) -> str:
|
|
"""Build a prompt for extracting obligations from multiple controls.
|
|
|
|
Each control dict needs: control_id, title, objective, requirements,
|
|
test_procedure, source_ref.
|
|
"""
|
|
parts = []
|
|
for i, ctrl in enumerate(controls, 1):
|
|
parts.append(
|
|
f"--- CONTROL {i} (ID: {ctrl['control_id']}) ---\n"
|
|
f"Titel: {ctrl['title']}\n"
|
|
f"Ziel: {ctrl['objective']}\n"
|
|
f"Anforderungen: {ctrl['requirements']}\n"
|
|
f"Prüfverfahren: {ctrl['test_procedure']}\n"
|
|
f"Quellreferenz: {ctrl['source_ref']}"
|
|
)
|
|
|
|
controls_text = "\n\n".join(parts)
|
|
ids_example = ", ".join(f'"{c["control_id"]}": [...]' for c in controls[:2])
|
|
|
|
return f"""\
|
|
Analysiere die folgenden {len(controls)} Controls und extrahiere aus JEDEM \
|
|
alle einzelnen normativen Pflichten.
|
|
|
|
{controls_text}
|
|
|
|
Antworte als JSON-Objekt. Fuer JEDES Control ein Key (die Control-ID) mit \
|
|
einem Array von Pflichten:
|
|
{{
|
|
{ids_example}
|
|
}}
|
|
|
|
Jede Pflicht hat dieses Format:
|
|
{{
|
|
"obligation_text": "Kurze, präzise Formulierung der Pflicht",
|
|
"action": "Hauptverb/Handlung",
|
|
"object": "Gegenstand der Pflicht",
|
|
"condition": null,
|
|
"normative_strength": "must",
|
|
"is_test_obligation": false,
|
|
"is_reporting_obligation": false
|
|
}}"""
|
|
|
|
|
|
def _build_pass0b_batch_prompt(obligations: list[dict]) -> str:
|
|
"""Build a prompt for composing multiple atomic controls.
|
|
|
|
Each obligation dict needs: candidate_id, obligation_text, action,
|
|
object, parent_title, parent_category, source_ref.
|
|
"""
|
|
parts = []
|
|
for i, obl in enumerate(obligations, 1):
|
|
parts.append(
|
|
f"--- PFLICHT {i} (ID: {obl['candidate_id']}) ---\n"
|
|
f"PFLICHT: {obl['obligation_text']}\n"
|
|
f"HANDLUNG: {obl['action']}\n"
|
|
f"GEGENSTAND: {obl['object']}\n"
|
|
f"KONTEXT: {obl['parent_title']} | {obl['parent_category']}\n"
|
|
f"Quellreferenz: {obl['source_ref']}"
|
|
)
|
|
|
|
obligations_text = "\n\n".join(parts)
|
|
ids_example = ", ".join(f'"{o["candidate_id"]}": {{...}}' for o in obligations[:2])
|
|
|
|
return f"""\
|
|
Erstelle aus den folgenden {len(obligations)} Pflichten je ein atomares Control.
|
|
|
|
{obligations_text}
|
|
|
|
Antworte als JSON-Objekt. Fuer JEDE Pflicht ein Key (die Pflicht-ID):
|
|
{{
|
|
{ids_example}
|
|
}}
|
|
|
|
Jedes Control hat dieses Format:
|
|
{{
|
|
"title": "Kurzer Titel (max 80 Zeichen, deutsch)",
|
|
"objective": "Was muss erreicht werden? (1-2 Sätze)",
|
|
"requirements": ["Konkrete Anforderung 1", "Anforderung 2"],
|
|
"test_procedure": ["Prüfschritt 1", "Prüfschritt 2"],
|
|
"evidence": ["Nachweis 1", "Nachweis 2"],
|
|
"severity": "critical|high|medium|low",
|
|
"category": "security|privacy|governance|operations|finance|reporting"
|
|
}}"""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Anthropic API (with prompt caching)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def _llm_anthropic(
|
|
prompt: str,
|
|
system_prompt: str,
|
|
max_tokens: int = 8192,
|
|
) -> str:
|
|
"""Call Anthropic Messages API with prompt caching for system prompt."""
|
|
if not ANTHROPIC_API_KEY:
|
|
raise RuntimeError("ANTHROPIC_API_KEY not set")
|
|
|
|
headers = {
|
|
"x-api-key": ANTHROPIC_API_KEY,
|
|
"anthropic-version": "2023-06-01",
|
|
"content-type": "application/json",
|
|
}
|
|
payload = {
|
|
"model": ANTHROPIC_MODEL,
|
|
"max_tokens": max_tokens,
|
|
"system": [
|
|
{
|
|
"type": "text",
|
|
"text": system_prompt,
|
|
"cache_control": {"type": "ephemeral"},
|
|
}
|
|
],
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
}
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=LLM_TIMEOUT) as client:
|
|
resp = await client.post(
|
|
f"{ANTHROPIC_API_URL}/messages",
|
|
headers=headers,
|
|
json=payload,
|
|
)
|
|
if resp.status_code != 200:
|
|
logger.error(
|
|
"Anthropic API %d: %s", resp.status_code, resp.text[:300]
|
|
)
|
|
return ""
|
|
data = resp.json()
|
|
# Log cache performance
|
|
usage = data.get("usage", {})
|
|
cached = usage.get("cache_read_input_tokens", 0)
|
|
if cached > 0:
|
|
logger.debug(
|
|
"Prompt cache hit: %d cached tokens", cached
|
|
)
|
|
content = data.get("content", [])
|
|
if content and isinstance(content, list):
|
|
return content[0].get("text", "")
|
|
return ""
|
|
except Exception as e:
|
|
logger.error("Anthropic request failed: %s", e)
|
|
return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Anthropic Batch API (50% cost reduction, async processing)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def create_anthropic_batch(
|
|
requests: list[dict],
|
|
) -> dict:
|
|
"""Submit a batch of requests to Anthropic Batch API.
|
|
|
|
Each request: {"custom_id": "...", "params": {model, max_tokens, system, messages}}
|
|
Returns batch metadata including batch_id.
|
|
"""
|
|
if not ANTHROPIC_API_KEY:
|
|
raise RuntimeError("ANTHROPIC_API_KEY not set")
|
|
|
|
headers = {
|
|
"x-api-key": ANTHROPIC_API_KEY,
|
|
"anthropic-version": "2023-06-01",
|
|
"content-type": "application/json",
|
|
}
|
|
|
|
async with httpx.AsyncClient(timeout=60) as client:
|
|
resp = await client.post(
|
|
f"{ANTHROPIC_API_URL}/messages/batches",
|
|
headers=headers,
|
|
json={"requests": requests},
|
|
)
|
|
if resp.status_code not in (200, 201):
|
|
raise RuntimeError(
|
|
f"Batch API failed {resp.status_code}: {resp.text[:500]}"
|
|
)
|
|
return resp.json()
|
|
|
|
|
|
async def check_batch_status(batch_id: str) -> dict:
|
|
"""Check the processing status of a batch."""
|
|
headers = {
|
|
"x-api-key": ANTHROPIC_API_KEY,
|
|
"anthropic-version": "2023-06-01",
|
|
}
|
|
|
|
async with httpx.AsyncClient(timeout=30) as client:
|
|
resp = await client.get(
|
|
f"{ANTHROPIC_API_URL}/messages/batches/{batch_id}",
|
|
headers=headers,
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
|
|
|
|
async def fetch_batch_results(batch_id: str) -> list[dict]:
|
|
"""Fetch results of a completed batch. Returns list of result objects."""
|
|
headers = {
|
|
"x-api-key": ANTHROPIC_API_KEY,
|
|
"anthropic-version": "2023-06-01",
|
|
}
|
|
|
|
async with httpx.AsyncClient(timeout=120) as client:
|
|
resp = await client.get(
|
|
f"{ANTHROPIC_API_URL}/messages/batches/{batch_id}/results",
|
|
headers=headers,
|
|
)
|
|
resp.raise_for_status()
|
|
# Response is JSONL (one JSON object per line)
|
|
results = []
|
|
for line in resp.text.strip().split("\n"):
|
|
if line.strip():
|
|
results.append(json.loads(line))
|
|
return results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parse helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _parse_json_array(text: str) -> list[dict]:
|
|
"""Extract a JSON array from LLM response text."""
|
|
# Try direct parse
|
|
try:
|
|
result = json.loads(text)
|
|
if isinstance(result, list):
|
|
return result
|
|
if isinstance(result, dict):
|
|
return [result]
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Try extracting JSON array block
|
|
match = re.search(r"\[[\s\S]*\]", text)
|
|
if match:
|
|
try:
|
|
result = json.loads(match.group())
|
|
if isinstance(result, list):
|
|
return result
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
return []
|
|
|
|
|
|
def _parse_json_object(text: str) -> dict:
|
|
"""Extract a JSON object from LLM response text."""
|
|
try:
|
|
result = json.loads(text)
|
|
if isinstance(result, dict):
|
|
return result
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
match = re.search(r"\{[\s\S]*\}", text)
|
|
if match:
|
|
try:
|
|
result = json.loads(match.group())
|
|
if isinstance(result, dict):
|
|
return result
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
return {}
|
|
|
|
|
|
def _ensure_list(val) -> list:
|
|
"""Ensure value is a list."""
|
|
if isinstance(val, list):
|
|
return val
|
|
if isinstance(val, str):
|
|
return [val] if val else []
|
|
return []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Decomposition Pass
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class DecompositionPass:
|
|
"""Pass 0: Decompose Rich Controls into atomic candidates.
|
|
|
|
Usage::
|
|
|
|
decomp = DecompositionPass(db=session)
|
|
stats_0a = await decomp.run_pass0a(limit=100)
|
|
stats_0b = await decomp.run_pass0b(limit=100)
|
|
"""
|
|
|
|
def __init__(self, db: Session, dedup_enabled: bool = False):
|
|
self.db = db
|
|
self._dedup = None
|
|
if dedup_enabled:
|
|
from compliance.services.control_dedup import (
|
|
ControlDedupChecker, DEDUP_ENABLED,
|
|
)
|
|
if DEDUP_ENABLED:
|
|
self._dedup = ControlDedupChecker(db)
|
|
|
|
# -------------------------------------------------------------------
|
|
# Pass 0a: Obligation Extraction
|
|
# -------------------------------------------------------------------
|
|
|
|
async def run_pass0a(
|
|
self,
|
|
limit: int = 0,
|
|
batch_size: int = 0,
|
|
use_anthropic: bool = False,
|
|
category_filter: Optional[str] = None,
|
|
source_filter: Optional[str] = None,
|
|
) -> dict:
|
|
"""Extract obligation candidates from rich controls.
|
|
|
|
Args:
|
|
limit: Max controls to process (0 = no limit).
|
|
batch_size: Controls per LLM call (0 = use DECOMPOSITION_BATCH_SIZE
|
|
env var, or 1 for single mode). Only >1 with Anthropic.
|
|
use_anthropic: Use Anthropic API (True) or Ollama (False).
|
|
category_filter: Only process controls matching this category
|
|
(comma-separated, e.g. "security,privacy").
|
|
source_filter: Only process controls from these source regulations
|
|
(comma-separated, e.g. "Maschinenverordnung,Cyber Resilience Act").
|
|
Matches against source_citation->>'source' using ILIKE.
|
|
"""
|
|
if batch_size <= 0:
|
|
batch_size = DECOMPOSITION_BATCH_SIZE if use_anthropic else 1
|
|
|
|
# Find rich controls not yet decomposed
|
|
query = """
|
|
SELECT cc.id, cc.control_id, cc.title, cc.objective,
|
|
cc.requirements, cc.test_procedure,
|
|
cc.source_citation, cc.category
|
|
FROM canonical_controls cc
|
|
WHERE cc.release_state NOT IN ('deprecated')
|
|
AND cc.parent_control_uuid IS NULL
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM obligation_candidates oc
|
|
WHERE oc.parent_control_uuid = cc.id
|
|
)
|
|
"""
|
|
params = {}
|
|
if category_filter:
|
|
cats = [c.strip() for c in category_filter.split(",") if c.strip()]
|
|
if cats:
|
|
query += " AND cc.category IN :cats"
|
|
params["cats"] = tuple(cats)
|
|
|
|
if source_filter:
|
|
sources = [s.strip() for s in source_filter.split(",") if s.strip()]
|
|
if sources:
|
|
clauses = []
|
|
for idx, src in enumerate(sources):
|
|
key = f"src_{idx}"
|
|
clauses.append(f"cc.source_citation::text ILIKE :{key}")
|
|
params[key] = f"%{src}%"
|
|
query += " AND (" + " OR ".join(clauses) + ")"
|
|
|
|
query += " ORDER BY cc.created_at"
|
|
if limit > 0:
|
|
query += f" LIMIT {limit}"
|
|
|
|
rows = self.db.execute(text(query), params).fetchall()
|
|
|
|
stats = {
|
|
"controls_processed": 0,
|
|
"obligations_extracted": 0,
|
|
"obligations_validated": 0,
|
|
"obligations_rejected": 0,
|
|
"controls_skipped_empty": 0,
|
|
"llm_calls": 0,
|
|
"errors": 0,
|
|
"provider": "anthropic" if use_anthropic else "ollama",
|
|
"batch_size": batch_size,
|
|
}
|
|
|
|
# Prepare control data
|
|
prepared = []
|
|
for row in rows:
|
|
title = row[2] or ""
|
|
objective = row[3] or ""
|
|
req_str = _format_field(row[4] or "")
|
|
test_str = _format_field(row[5] or "")
|
|
source_str = _format_citation(row[6] or "")
|
|
|
|
if not title and not objective and not req_str:
|
|
stats["controls_skipped_empty"] += 1
|
|
continue
|
|
|
|
prepared.append({
|
|
"uuid": str(row[0]),
|
|
"control_id": row[1] or "",
|
|
"title": title,
|
|
"objective": objective,
|
|
"requirements": req_str,
|
|
"test_procedure": test_str,
|
|
"source_ref": source_str,
|
|
"category": row[7] or "",
|
|
})
|
|
|
|
# Process in batches
|
|
for i in range(0, len(prepared), batch_size):
|
|
batch = prepared[i : i + batch_size]
|
|
try:
|
|
if use_anthropic and len(batch) > 1:
|
|
# Batched Anthropic call
|
|
prompt = _build_pass0a_batch_prompt(batch)
|
|
llm_response = await _llm_anthropic(
|
|
prompt=prompt,
|
|
system_prompt=_PASS0A_SYSTEM_PROMPT,
|
|
max_tokens=max(8192, len(batch) * 2000),
|
|
)
|
|
stats["llm_calls"] += 1
|
|
results_by_id = _parse_json_object(llm_response)
|
|
for ctrl in batch:
|
|
raw_obls = results_by_id.get(ctrl["control_id"], [])
|
|
if not isinstance(raw_obls, list):
|
|
raw_obls = [raw_obls] if raw_obls else []
|
|
if not raw_obls:
|
|
raw_obls = [_fallback_obligation(ctrl)]
|
|
self._process_pass0a_obligations(
|
|
raw_obls, ctrl["control_id"], ctrl["uuid"], stats
|
|
)
|
|
stats["controls_processed"] += 1
|
|
elif use_anthropic:
|
|
# Single Anthropic call
|
|
ctrl = batch[0]
|
|
prompt = _build_pass0a_prompt(
|
|
title=ctrl["title"], objective=ctrl["objective"],
|
|
requirements=ctrl["requirements"],
|
|
test_procedure=ctrl["test_procedure"],
|
|
source_ref=ctrl["source_ref"],
|
|
)
|
|
llm_response = await _llm_anthropic(
|
|
prompt=prompt,
|
|
system_prompt=_PASS0A_SYSTEM_PROMPT,
|
|
)
|
|
stats["llm_calls"] += 1
|
|
raw_obls = _parse_json_array(llm_response)
|
|
if not raw_obls:
|
|
raw_obls = [_fallback_obligation(ctrl)]
|
|
self._process_pass0a_obligations(
|
|
raw_obls, ctrl["control_id"], ctrl["uuid"], stats
|
|
)
|
|
stats["controls_processed"] += 1
|
|
else:
|
|
# Ollama (single only)
|
|
from compliance.services.obligation_extractor import _llm_ollama
|
|
ctrl = batch[0]
|
|
prompt = _build_pass0a_prompt(
|
|
title=ctrl["title"], objective=ctrl["objective"],
|
|
requirements=ctrl["requirements"],
|
|
test_procedure=ctrl["test_procedure"],
|
|
source_ref=ctrl["source_ref"],
|
|
)
|
|
llm_response = await _llm_ollama(
|
|
prompt=prompt,
|
|
system_prompt=_PASS0A_SYSTEM_PROMPT,
|
|
)
|
|
stats["llm_calls"] += 1
|
|
raw_obls = _parse_json_array(llm_response)
|
|
if not raw_obls:
|
|
raw_obls = [_fallback_obligation(ctrl)]
|
|
self._process_pass0a_obligations(
|
|
raw_obls, ctrl["control_id"], ctrl["uuid"], stats
|
|
)
|
|
stats["controls_processed"] += 1
|
|
|
|
# Commit after each successful sub-batch to avoid losing work
|
|
self.db.commit()
|
|
|
|
except Exception as e:
|
|
ids = ", ".join(c["control_id"] for c in batch)
|
|
logger.error("Pass 0a failed for [%s]: %s", ids, e)
|
|
stats["errors"] += 1
|
|
try:
|
|
self.db.rollback()
|
|
except Exception:
|
|
pass
|
|
logger.info("Pass 0a: %s", stats)
|
|
return stats
|
|
|
|
_NORMATIVE_STRENGTH_MAP = {
|
|
"muss": "must", "must": "must",
|
|
"soll": "should", "should": "should",
|
|
"kann": "may", "may": "may",
|
|
}
|
|
|
|
def _process_pass0a_obligations(
|
|
self,
|
|
raw_obligations: list[dict],
|
|
control_id: str,
|
|
control_uuid: str,
|
|
stats: dict,
|
|
) -> None:
|
|
"""Validate and write obligation candidates from LLM output."""
|
|
for idx, raw in enumerate(raw_obligations):
|
|
raw_strength = raw.get("normative_strength", "must").lower().strip()
|
|
normative_strength = self._NORMATIVE_STRENGTH_MAP.get(
|
|
raw_strength, "must"
|
|
)
|
|
cand = ObligationCandidate(
|
|
candidate_id=f"OC-{control_id}-{idx + 1:02d}",
|
|
parent_control_uuid=control_uuid,
|
|
obligation_text=raw.get("obligation_text", ""),
|
|
action=raw.get("action", ""),
|
|
object_=raw.get("object", ""),
|
|
condition=raw.get("condition"),
|
|
normative_strength=normative_strength,
|
|
is_test_obligation=bool(raw.get("is_test_obligation", False)),
|
|
is_reporting_obligation=bool(raw.get("is_reporting_obligation", False)),
|
|
)
|
|
|
|
# Auto-detect test/reporting if LLM missed it
|
|
if not cand.is_test_obligation and _TEST_RE.search(cand.obligation_text):
|
|
cand.is_test_obligation = True
|
|
if not cand.is_reporting_obligation and _REPORTING_RE.search(cand.obligation_text):
|
|
cand.is_reporting_obligation = True
|
|
|
|
# Quality gate + obligation type classification
|
|
flags = quality_gate(cand)
|
|
cand.quality_flags = flags
|
|
cand.extraction_confidence = _compute_extraction_confidence(flags)
|
|
cand.obligation_type = flags.get("obligation_type", "empfehlung")
|
|
|
|
if passes_quality_gate(flags):
|
|
cand.release_state = "validated"
|
|
stats["obligations_validated"] += 1
|
|
else:
|
|
cand.release_state = "rejected"
|
|
stats["obligations_rejected"] += 1
|
|
|
|
self._write_obligation_candidate(cand)
|
|
stats["obligations_extracted"] += 1
|
|
|
|
# -------------------------------------------------------------------
|
|
# Pass 0b: Atomic Control Composition
|
|
# -------------------------------------------------------------------
|
|
|
|
async def run_pass0b(
|
|
self,
|
|
limit: int = 0,
|
|
batch_size: int = 0,
|
|
use_anthropic: bool = False,
|
|
) -> dict:
|
|
"""Compose atomic controls from validated obligation candidates.
|
|
|
|
Args:
|
|
limit: Max candidates to process (0 = no limit).
|
|
batch_size: Obligations per LLM call (0 = auto).
|
|
use_anthropic: Use Anthropic API (True) or Ollama (False).
|
|
"""
|
|
if batch_size <= 0:
|
|
batch_size = DECOMPOSITION_BATCH_SIZE if use_anthropic else 1
|
|
|
|
query = """
|
|
SELECT oc.id, oc.candidate_id, oc.parent_control_uuid,
|
|
oc.obligation_text, oc.action, oc.object,
|
|
oc.is_test_obligation, oc.is_reporting_obligation,
|
|
cc.title AS parent_title,
|
|
cc.category AS parent_category,
|
|
cc.source_citation AS parent_citation,
|
|
cc.severity AS parent_severity,
|
|
cc.control_id AS parent_control_id,
|
|
oc.trigger_type,
|
|
oc.is_implementation_specific
|
|
FROM obligation_candidates oc
|
|
JOIN canonical_controls cc ON cc.id = oc.parent_control_uuid
|
|
WHERE oc.release_state = 'validated'
|
|
AND oc.merged_into_id IS NULL
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM canonical_controls ac
|
|
WHERE ac.parent_control_uuid = oc.parent_control_uuid
|
|
AND ac.decomposition_method = 'pass0b'
|
|
AND ac.title LIKE '%' || LEFT(oc.action, 20) || '%'
|
|
)
|
|
"""
|
|
if limit > 0:
|
|
query += f" LIMIT {limit}"
|
|
|
|
rows = self.db.execute(text(query)).fetchall()
|
|
|
|
stats = {
|
|
"candidates_processed": 0,
|
|
"controls_created": 0,
|
|
"llm_failures": 0,
|
|
"llm_calls": 0,
|
|
"errors": 0,
|
|
"provider": "anthropic" if use_anthropic else "ollama",
|
|
"batch_size": batch_size,
|
|
"dedup_enabled": self._dedup is not None,
|
|
"dedup_linked": 0,
|
|
"dedup_review": 0,
|
|
"skipped_merged": 0,
|
|
}
|
|
|
|
# Prepare obligation data
|
|
prepared = []
|
|
for row in rows:
|
|
prepared.append({
|
|
"oc_id": str(row[0]),
|
|
"candidate_id": row[1] or "",
|
|
"parent_uuid": str(row[2]),
|
|
"obligation_text": row[3] or "",
|
|
"action": row[4] or "",
|
|
"object": row[5] or "",
|
|
"is_test": row[6],
|
|
"is_reporting": row[7],
|
|
"parent_title": row[8] or "",
|
|
"parent_category": row[9] or "",
|
|
"parent_citation": row[10] or "",
|
|
"parent_severity": row[11] or "medium",
|
|
"parent_control_id": row[12] or "",
|
|
"source_ref": _format_citation(row[10] or ""),
|
|
"trigger_type": row[13] or "continuous",
|
|
"is_implementation_specific": row[14] or False,
|
|
})
|
|
|
|
# Process in batches
|
|
for i in range(0, len(prepared), batch_size):
|
|
batch = prepared[i : i + batch_size]
|
|
try:
|
|
if use_anthropic and len(batch) > 1:
|
|
# Batched Anthropic call
|
|
prompt = _build_pass0b_batch_prompt(batch)
|
|
llm_response = await _llm_anthropic(
|
|
prompt=prompt,
|
|
system_prompt=_PASS0B_SYSTEM_PROMPT,
|
|
max_tokens=min(16384, max(4096, len(batch) * 500)),
|
|
)
|
|
stats["llm_calls"] += 1
|
|
results_by_id = _parse_json_object(llm_response)
|
|
for obl in batch:
|
|
parsed = results_by_id.get(obl["candidate_id"], {})
|
|
await self._process_pass0b_control(obl, parsed, stats)
|
|
elif use_anthropic:
|
|
obl = batch[0]
|
|
prompt = _build_pass0b_prompt(
|
|
obligation_text=obl["obligation_text"],
|
|
action=obl["action"], object_=obl["object"],
|
|
parent_title=obl["parent_title"],
|
|
parent_category=obl["parent_category"],
|
|
source_ref=obl["source_ref"],
|
|
)
|
|
llm_response = await _llm_anthropic(
|
|
prompt=prompt,
|
|
system_prompt=_PASS0B_SYSTEM_PROMPT,
|
|
)
|
|
stats["llm_calls"] += 1
|
|
parsed = _parse_json_object(llm_response)
|
|
await self._process_pass0b_control(obl, parsed, stats)
|
|
else:
|
|
from compliance.services.obligation_extractor import _llm_ollama
|
|
obl = batch[0]
|
|
prompt = _build_pass0b_prompt(
|
|
obligation_text=obl["obligation_text"],
|
|
action=obl["action"], object_=obl["object"],
|
|
parent_title=obl["parent_title"],
|
|
parent_category=obl["parent_category"],
|
|
source_ref=obl["source_ref"],
|
|
)
|
|
llm_response = await _llm_ollama(
|
|
prompt=prompt,
|
|
system_prompt=_PASS0B_SYSTEM_PROMPT,
|
|
)
|
|
stats["llm_calls"] += 1
|
|
parsed = _parse_json_object(llm_response)
|
|
await self._process_pass0b_control(obl, parsed, stats)
|
|
|
|
# Commit after each successful sub-batch
|
|
self.db.commit()
|
|
|
|
except Exception as e:
|
|
ids = ", ".join(o["candidate_id"] for o in batch)
|
|
logger.error("Pass 0b failed for [%s]: %s", ids, e)
|
|
stats["errors"] += 1
|
|
try:
|
|
self.db.rollback()
|
|
except Exception:
|
|
pass
|
|
logger.info("Pass 0b: %s", stats)
|
|
return stats
|
|
|
|
async def _process_pass0b_control(
|
|
self, obl: dict, parsed: dict, stats: dict,
|
|
) -> None:
|
|
"""Create atomic control from parsed LLM output or template fallback.
|
|
|
|
If dedup is enabled, checks for duplicates before insertion:
|
|
- LINK: adds parent link to existing control instead of creating new
|
|
- REVIEW: queues for human review, does not create control
|
|
- NEW: creates new control and indexes in Qdrant
|
|
"""
|
|
if not parsed or not parsed.get("title"):
|
|
atomic = _template_fallback(
|
|
obligation_text=obl["obligation_text"],
|
|
action=obl["action"], object_=obl["object"],
|
|
parent_title=obl["parent_title"],
|
|
parent_severity=obl["parent_severity"],
|
|
parent_category=obl["parent_category"],
|
|
is_test=obl["is_test"],
|
|
is_reporting=obl["is_reporting"],
|
|
)
|
|
stats["llm_failures"] += 1
|
|
else:
|
|
atomic = AtomicControlCandidate(
|
|
title=parsed.get("title", "")[:200],
|
|
objective=parsed.get("objective", "")[:2000],
|
|
requirements=_ensure_list(parsed.get("requirements", [])),
|
|
test_procedure=_ensure_list(parsed.get("test_procedure", [])),
|
|
evidence=_ensure_list(parsed.get("evidence", [])),
|
|
severity=_normalize_severity(
|
|
parsed.get("severity", obl["parent_severity"])
|
|
),
|
|
category=parsed.get("category", obl["parent_category"]),
|
|
)
|
|
|
|
atomic.parent_control_uuid = obl["parent_uuid"]
|
|
atomic.obligation_candidate_id = obl["candidate_id"]
|
|
|
|
# Cap severity for implementation-specific obligations
|
|
if obl.get("is_implementation_specific") and atomic.severity in (
|
|
"critical", "high"
|
|
):
|
|
atomic.severity = "medium"
|
|
|
|
# Override category for test obligations
|
|
if obl.get("is_test"):
|
|
atomic.category = "testing"
|
|
|
|
# ── Dedup check (if enabled) ────────────────────────────
|
|
if self._dedup:
|
|
pattern_id = None
|
|
# Try to get pattern_id from parent control
|
|
pid_row = self.db.execute(text(
|
|
"SELECT pattern_id FROM canonical_controls WHERE id = CAST(:uid AS uuid)"
|
|
), {"uid": obl["parent_uuid"]}).fetchone()
|
|
if pid_row:
|
|
pattern_id = pid_row[0]
|
|
|
|
result = await self._dedup.check_duplicate(
|
|
action=obl.get("action", ""),
|
|
obj=obl.get("object", ""),
|
|
title=atomic.title,
|
|
pattern_id=pattern_id,
|
|
)
|
|
|
|
if result.verdict == "link":
|
|
self._dedup.add_parent_link(
|
|
control_uuid=result.matched_control_uuid,
|
|
parent_control_uuid=obl["parent_uuid"],
|
|
link_type="dedup_merge",
|
|
confidence=result.similarity_score,
|
|
)
|
|
stats.setdefault("dedup_linked", 0)
|
|
stats["dedup_linked"] += 1
|
|
stats["candidates_processed"] += 1
|
|
logger.info("Dedup LINK: %s → %s (%.3f, %s)",
|
|
atomic.title[:60], result.matched_control_id,
|
|
result.similarity_score, result.stage)
|
|
return
|
|
|
|
if result.verdict == "review":
|
|
self._dedup.write_review(
|
|
candidate_control_id=atomic.candidate_id or "",
|
|
candidate_title=atomic.title,
|
|
candidate_objective=atomic.objective,
|
|
result=result,
|
|
parent_control_uuid=obl["parent_uuid"],
|
|
obligation_candidate_id=obl.get("oc_id"),
|
|
)
|
|
stats.setdefault("dedup_review", 0)
|
|
stats["dedup_review"] += 1
|
|
stats["candidates_processed"] += 1
|
|
logger.info("Dedup REVIEW: %s ↔ %s (%.3f, %s)",
|
|
atomic.title[:60], result.matched_control_id,
|
|
result.similarity_score, result.stage)
|
|
return
|
|
|
|
# ── Create new atomic control ───────────────────────────
|
|
seq = self._next_atomic_seq(obl["parent_control_id"])
|
|
atomic.candidate_id = f"{obl['parent_control_id']}-A{seq:02d}"
|
|
|
|
new_uuid = self._write_atomic_control(atomic, obl)
|
|
|
|
self.db.execute(
|
|
text("""
|
|
UPDATE obligation_candidates
|
|
SET release_state = 'composed'
|
|
WHERE id = CAST(:oc_id AS uuid)
|
|
"""),
|
|
{"oc_id": obl["oc_id"]},
|
|
)
|
|
|
|
# Index in Qdrant for future dedup checks
|
|
if self._dedup and new_uuid:
|
|
pattern_id_val = None
|
|
pid_row2 = self.db.execute(text(
|
|
"SELECT pattern_id FROM canonical_controls WHERE id = CAST(:uid AS uuid)"
|
|
), {"uid": obl["parent_uuid"]}).fetchone()
|
|
if pid_row2:
|
|
pattern_id_val = pid_row2[0]
|
|
|
|
if pattern_id_val:
|
|
await self._dedup.index_control(
|
|
control_uuid=new_uuid,
|
|
control_id=atomic.candidate_id,
|
|
title=atomic.title,
|
|
action=obl.get("action", ""),
|
|
obj=obl.get("object", ""),
|
|
pattern_id=pattern_id_val,
|
|
)
|
|
|
|
stats["controls_created"] += 1
|
|
stats["candidates_processed"] += 1
|
|
|
|
# -------------------------------------------------------------------
|
|
# Merge Pass: Deduplicate implementation-level obligations
|
|
# -------------------------------------------------------------------
|
|
|
|
def run_merge_pass(self) -> dict:
|
|
"""Merge implementation-level duplicate obligations within each parent.
|
|
|
|
When the same parent control has multiple obligations with nearly
|
|
identical action+object (e.g. "SMS-Verbot" + "Policy-as-Code" both
|
|
implementing a communication restriction), keep the more abstract one
|
|
and mark the concrete one as merged.
|
|
|
|
No LLM calls — purely rule-based using text similarity.
|
|
"""
|
|
stats = {
|
|
"parents_checked": 0,
|
|
"obligations_merged": 0,
|
|
"obligations_kept": 0,
|
|
}
|
|
|
|
# Get all parents that have >1 validated obligation
|
|
parents = self.db.execute(text("""
|
|
SELECT parent_control_uuid, count(*) AS cnt
|
|
FROM obligation_candidates
|
|
WHERE release_state = 'validated'
|
|
AND merged_into_id IS NULL
|
|
GROUP BY parent_control_uuid
|
|
HAVING count(*) > 1
|
|
""")).fetchall()
|
|
|
|
for parent_uuid, cnt in parents:
|
|
stats["parents_checked"] += 1
|
|
obligs = self.db.execute(text("""
|
|
SELECT id, candidate_id, obligation_text, action, object
|
|
FROM obligation_candidates
|
|
WHERE parent_control_uuid = CAST(:pid AS uuid)
|
|
AND release_state = 'validated'
|
|
AND merged_into_id IS NULL
|
|
ORDER BY created_at
|
|
"""), {"pid": str(parent_uuid)}).fetchall()
|
|
|
|
merged_ids = set()
|
|
oblig_list = list(obligs)
|
|
|
|
for i in range(len(oblig_list)):
|
|
if str(oblig_list[i][0]) in merged_ids:
|
|
continue
|
|
for j in range(i + 1, len(oblig_list)):
|
|
if str(oblig_list[j][0]) in merged_ids:
|
|
continue
|
|
|
|
action_i = (oblig_list[i][3] or "").lower().strip()
|
|
action_j = (oblig_list[j][3] or "").lower().strip()
|
|
obj_i = (oblig_list[i][4] or "").lower().strip()
|
|
obj_j = (oblig_list[j][4] or "").lower().strip()
|
|
|
|
# Check if actions are similar enough to be duplicates
|
|
if not _text_similar(action_i, action_j, threshold=0.75):
|
|
continue
|
|
if not _text_similar(obj_i, obj_j, threshold=0.60):
|
|
continue
|
|
|
|
# Keep the more abstract one (shorter text = less specific)
|
|
text_i = oblig_list[i][2] or ""
|
|
text_j = oblig_list[j][2] or ""
|
|
if _is_more_implementation_specific(text_j, text_i):
|
|
survivor_id = str(oblig_list[i][0])
|
|
merged_id = str(oblig_list[j][0])
|
|
else:
|
|
survivor_id = str(oblig_list[j][0])
|
|
merged_id = str(oblig_list[i][0])
|
|
|
|
self.db.execute(text("""
|
|
UPDATE obligation_candidates
|
|
SET release_state = 'merged',
|
|
merged_into_id = CAST(:survivor AS uuid)
|
|
WHERE id = CAST(:merged AS uuid)
|
|
"""), {"survivor": survivor_id, "merged": merged_id})
|
|
|
|
merged_ids.add(merged_id)
|
|
stats["obligations_merged"] += 1
|
|
|
|
# Commit per parent to avoid large transactions
|
|
self.db.commit()
|
|
|
|
stats["obligations_kept"] = self.db.execute(text("""
|
|
SELECT count(*) FROM obligation_candidates
|
|
WHERE release_state = 'validated' AND merged_into_id IS NULL
|
|
""")).fetchone()[0]
|
|
|
|
logger.info("Merge pass: %s", stats)
|
|
return stats
|
|
|
|
# -------------------------------------------------------------------
|
|
# Enrich Pass: Add metadata to obligations
|
|
# -------------------------------------------------------------------
|
|
|
|
def enrich_obligations(self) -> dict:
|
|
"""Add trigger_type and is_implementation_specific to obligations.
|
|
|
|
Rule-based enrichment — no LLM calls.
|
|
"""
|
|
stats = {
|
|
"enriched": 0,
|
|
"trigger_event": 0,
|
|
"trigger_periodic": 0,
|
|
"trigger_continuous": 0,
|
|
"implementation_specific": 0,
|
|
}
|
|
|
|
obligs = self.db.execute(text("""
|
|
SELECT id, obligation_text, condition, action, object
|
|
FROM obligation_candidates
|
|
WHERE release_state = 'validated'
|
|
AND merged_into_id IS NULL
|
|
AND trigger_type IS NULL
|
|
""")).fetchall()
|
|
|
|
for row in obligs:
|
|
oc_id = str(row[0])
|
|
obl_text = row[1] or ""
|
|
condition = row[2] or ""
|
|
action = row[3] or ""
|
|
obj = row[4] or ""
|
|
|
|
trigger = _classify_trigger_type(obl_text, condition)
|
|
impl = _is_implementation_specific_text(obl_text, action, obj)
|
|
|
|
self.db.execute(text("""
|
|
UPDATE obligation_candidates
|
|
SET trigger_type = :trigger,
|
|
is_implementation_specific = :impl
|
|
WHERE id = CAST(:oid AS uuid)
|
|
"""), {"trigger": trigger, "impl": impl, "oid": oc_id})
|
|
|
|
stats["enriched"] += 1
|
|
stats[f"trigger_{trigger}"] += 1
|
|
if impl:
|
|
stats["implementation_specific"] += 1
|
|
|
|
self.db.commit()
|
|
logger.info("Enrich pass: %s", stats)
|
|
return stats
|
|
|
|
# -------------------------------------------------------------------
|
|
# Decomposition Status
|
|
# -------------------------------------------------------------------
|
|
|
|
def decomposition_status(self) -> dict:
|
|
"""Return decomposition progress."""
|
|
row = self.db.execute(text("""
|
|
SELECT
|
|
(SELECT count(*) FROM canonical_controls
|
|
WHERE parent_control_uuid IS NULL
|
|
AND release_state NOT IN ('deprecated')) AS rich_controls,
|
|
(SELECT count(DISTINCT parent_control_uuid) FROM obligation_candidates) AS decomposed_controls,
|
|
(SELECT count(*) FROM obligation_candidates) AS total_candidates,
|
|
(SELECT count(*) FROM obligation_candidates WHERE release_state = 'validated') AS validated,
|
|
(SELECT count(*) FROM obligation_candidates WHERE release_state = 'rejected') AS rejected,
|
|
(SELECT count(*) FROM obligation_candidates WHERE release_state = 'composed') AS composed,
|
|
(SELECT count(*) FROM canonical_controls WHERE parent_control_uuid IS NOT NULL) AS atomic_controls,
|
|
(SELECT count(*) FROM obligation_candidates WHERE release_state = 'merged') AS merged,
|
|
(SELECT count(*) FROM obligation_candidates WHERE trigger_type IS NOT NULL) AS enriched
|
|
""")).fetchone()
|
|
|
|
validated_for_0b = row[3] - (row[7] or 0) # validated minus merged
|
|
|
|
return {
|
|
"rich_controls": row[0],
|
|
"decomposed_controls": row[1],
|
|
"total_candidates": row[2],
|
|
"validated": row[3],
|
|
"rejected": row[4],
|
|
"composed": row[5],
|
|
"atomic_controls": row[6],
|
|
"merged": row[7] or 0,
|
|
"enriched": row[8] or 0,
|
|
"ready_for_pass0b": validated_for_0b,
|
|
"decomposition_pct": round(row[1] / max(row[0], 1) * 100, 1),
|
|
"composition_pct": round(row[5] / max(validated_for_0b, 1) * 100, 1),
|
|
}
|
|
|
|
# -------------------------------------------------------------------
|
|
# DB Writers
|
|
# -------------------------------------------------------------------
|
|
|
|
def _write_obligation_candidate(self, cand: ObligationCandidate) -> None:
|
|
"""Insert an obligation candidate into the DB."""
|
|
self.db.execute(
|
|
text("""
|
|
INSERT INTO obligation_candidates (
|
|
parent_control_uuid, candidate_id,
|
|
obligation_text, action, object, condition,
|
|
normative_strength, is_test_obligation,
|
|
is_reporting_obligation, extraction_confidence,
|
|
quality_flags, release_state
|
|
) VALUES (
|
|
CAST(:parent_uuid AS uuid), :candidate_id,
|
|
:obligation_text, :action, :object, :condition,
|
|
:normative_strength, :is_test, :is_reporting,
|
|
:confidence, :quality_flags, :release_state
|
|
)
|
|
"""),
|
|
{
|
|
"parent_uuid": cand.parent_control_uuid,
|
|
"candidate_id": cand.candidate_id,
|
|
"obligation_text": cand.obligation_text,
|
|
"action": cand.action,
|
|
"object": cand.object_,
|
|
"condition": cand.condition,
|
|
"normative_strength": cand.normative_strength,
|
|
"is_test": cand.is_test_obligation,
|
|
"is_reporting": cand.is_reporting_obligation,
|
|
"confidence": cand.extraction_confidence,
|
|
"quality_flags": json.dumps(cand.quality_flags),
|
|
"release_state": cand.release_state,
|
|
},
|
|
)
|
|
|
|
def _write_atomic_control(
|
|
self, atomic: AtomicControlCandidate, obl: dict,
|
|
) -> Optional[str]:
|
|
"""Insert an atomic control and create parent link.
|
|
|
|
Returns the UUID of the newly created control, or None on failure.
|
|
"""
|
|
parent_uuid = obl["parent_uuid"]
|
|
candidate_id = obl["candidate_id"]
|
|
|
|
result = self.db.execute(
|
|
text("""
|
|
INSERT INTO canonical_controls (
|
|
control_id, title, objective, rationale,
|
|
scope, requirements,
|
|
test_procedure, evidence, severity,
|
|
open_anchors, category,
|
|
release_state, parent_control_uuid,
|
|
decomposition_method,
|
|
generation_metadata,
|
|
framework_id,
|
|
generation_strategy, pipeline_version
|
|
) VALUES (
|
|
:control_id, :title, :objective, :rationale,
|
|
:scope, :requirements,
|
|
:test_procedure, :evidence,
|
|
:severity, :open_anchors, :category,
|
|
'draft',
|
|
CAST(:parent_uuid AS uuid), 'pass0b',
|
|
:gen_meta,
|
|
CAST(:framework_id AS uuid),
|
|
'pass0b', 2
|
|
)
|
|
RETURNING id::text
|
|
"""),
|
|
{
|
|
"control_id": atomic.candidate_id,
|
|
"title": atomic.title,
|
|
"objective": atomic.objective,
|
|
"rationale": getattr(atomic, "rationale", None) or "Aus Obligation abgeleitet.",
|
|
"scope": json.dumps({}),
|
|
"requirements": json.dumps(atomic.requirements),
|
|
"test_procedure": json.dumps(atomic.test_procedure),
|
|
"evidence": json.dumps(atomic.evidence),
|
|
"severity": atomic.severity,
|
|
"open_anchors": json.dumps([]),
|
|
"category": atomic.category,
|
|
"parent_uuid": parent_uuid,
|
|
"gen_meta": json.dumps({
|
|
"decomposition_source": candidate_id,
|
|
"decomposition_method": "pass0b",
|
|
}),
|
|
"framework_id": "14b1bdd2-abc7-4a43-adae-14471ee5c7cf",
|
|
},
|
|
)
|
|
|
|
row = result.fetchone()
|
|
new_uuid = row[0] if row else None
|
|
|
|
# Create M:N parent link (control_parent_links)
|
|
if new_uuid:
|
|
citation = _parse_citation(obl.get("parent_citation", ""))
|
|
self.db.execute(
|
|
text("""
|
|
INSERT INTO control_parent_links
|
|
(control_uuid, parent_control_uuid, link_type, confidence,
|
|
source_regulation, source_article, obligation_candidate_id)
|
|
VALUES
|
|
(CAST(:cu AS uuid), CAST(:pu AS uuid), 'decomposition', 1.0,
|
|
:sr, :sa, CAST(:oci AS uuid))
|
|
ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
|
|
"""),
|
|
{
|
|
"cu": new_uuid,
|
|
"pu": parent_uuid,
|
|
"sr": citation.get("source", ""),
|
|
"sa": citation.get("article", ""),
|
|
"oci": obl["oc_id"],
|
|
},
|
|
)
|
|
|
|
return new_uuid
|
|
|
|
def _next_atomic_seq(self, parent_control_id: str) -> int:
|
|
"""Get the next sequence number for atomic controls under a parent."""
|
|
result = self.db.execute(
|
|
text("""
|
|
SELECT count(*) FROM canonical_controls
|
|
WHERE parent_control_uuid = (
|
|
SELECT id FROM canonical_controls
|
|
WHERE control_id = :parent_id
|
|
LIMIT 1
|
|
)
|
|
"""),
|
|
{"parent_id": parent_control_id},
|
|
).fetchone()
|
|
return (result[0] if result else 0) + 1
|
|
|
|
|
|
# -------------------------------------------------------------------
|
|
# Anthropic Batch API: Submit all controls as async batch (50% off)
|
|
# -------------------------------------------------------------------
|
|
|
|
async def submit_batch_pass0a(
|
|
self,
|
|
limit: int = 0,
|
|
batch_size: int = 5,
|
|
category_filter: Optional[str] = None,
|
|
source_filter: Optional[str] = None,
|
|
) -> dict:
|
|
"""Create an Anthropic Batch API request for Pass 0a.
|
|
|
|
Groups controls into content-batches of `batch_size`, then submits
|
|
all batches as one Anthropic Batch (up to 10,000 requests).
|
|
Returns batch metadata for polling.
|
|
"""
|
|
query = """
|
|
SELECT cc.id, cc.control_id, cc.title, cc.objective,
|
|
cc.requirements, cc.test_procedure,
|
|
cc.source_citation, cc.category
|
|
FROM canonical_controls cc
|
|
WHERE cc.release_state NOT IN ('deprecated')
|
|
AND cc.parent_control_uuid IS NULL
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM obligation_candidates oc
|
|
WHERE oc.parent_control_uuid = cc.id
|
|
)
|
|
"""
|
|
params = {}
|
|
if category_filter:
|
|
cats = [c.strip() for c in category_filter.split(",") if c.strip()]
|
|
if cats:
|
|
query += " AND cc.category IN :cats"
|
|
params["cats"] = tuple(cats)
|
|
|
|
if source_filter:
|
|
sources = [s.strip() for s in source_filter.split(",") if s.strip()]
|
|
if sources:
|
|
clauses = []
|
|
for idx, src in enumerate(sources):
|
|
key = f"src_{idx}"
|
|
clauses.append(f"cc.source_citation::text ILIKE :{key}")
|
|
params[key] = f"%{src}%"
|
|
query += " AND (" + " OR ".join(clauses) + ")"
|
|
|
|
query += " ORDER BY cc.created_at"
|
|
if limit > 0:
|
|
query += f" LIMIT {limit}"
|
|
|
|
rows = self.db.execute(text(query), params).fetchall()
|
|
|
|
# Prepare control data (skip empty)
|
|
prepared = []
|
|
for row in rows:
|
|
title = row[2] or ""
|
|
objective = row[3] or ""
|
|
req_str = _format_field(row[4] or "")
|
|
if not title and not objective and not req_str:
|
|
continue
|
|
prepared.append({
|
|
"uuid": str(row[0]),
|
|
"control_id": row[1] or "",
|
|
"title": title,
|
|
"objective": objective,
|
|
"requirements": req_str,
|
|
"test_procedure": _format_field(row[5] or ""),
|
|
"source_ref": _format_citation(row[6] or ""),
|
|
"category": row[7] or "",
|
|
})
|
|
|
|
if not prepared:
|
|
return {"status": "empty", "total_controls": 0}
|
|
|
|
# Build batch requests (each request = batch_size controls)
|
|
requests = []
|
|
for i in range(0, len(prepared), batch_size):
|
|
batch = prepared[i : i + batch_size]
|
|
if len(batch) > 1:
|
|
prompt = _build_pass0a_batch_prompt(batch)
|
|
else:
|
|
ctrl = batch[0]
|
|
prompt = _build_pass0a_prompt(
|
|
title=ctrl["title"], objective=ctrl["objective"],
|
|
requirements=ctrl["requirements"],
|
|
test_procedure=ctrl["test_procedure"],
|
|
source_ref=ctrl["source_ref"],
|
|
)
|
|
|
|
# Control IDs in custom_id for result mapping
|
|
ids_str = "+".join(c["control_id"] for c in batch)
|
|
requests.append({
|
|
"custom_id": f"p0a_{ids_str}",
|
|
"params": {
|
|
"model": ANTHROPIC_MODEL,
|
|
"max_tokens": max(8192, len(batch) * 2000),
|
|
"system": [
|
|
{
|
|
"type": "text",
|
|
"text": _PASS0A_SYSTEM_PROMPT,
|
|
"cache_control": {"type": "ephemeral"},
|
|
}
|
|
],
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
},
|
|
})
|
|
|
|
batch_result = await create_anthropic_batch(requests)
|
|
batch_id = batch_result.get("id", "")
|
|
|
|
logger.info(
|
|
"Batch API submitted: %s — %d requests (%d controls, batch_size=%d)",
|
|
batch_id, len(requests), len(prepared), batch_size,
|
|
)
|
|
|
|
return {
|
|
"status": "submitted",
|
|
"batch_id": batch_id,
|
|
"total_controls": len(prepared),
|
|
"total_requests": len(requests),
|
|
"batch_size": batch_size,
|
|
"category_filter": category_filter,
|
|
"source_filter": source_filter,
|
|
}
|
|
|
|
async def submit_batch_pass0b(
|
|
self,
|
|
limit: int = 0,
|
|
batch_size: int = 5,
|
|
) -> dict:
|
|
"""Create an Anthropic Batch API request for Pass 0b."""
|
|
query = """
|
|
SELECT oc.id, oc.candidate_id, oc.parent_control_uuid,
|
|
oc.obligation_text, oc.action, oc.object,
|
|
oc.is_test_obligation, oc.is_reporting_obligation,
|
|
cc.title AS parent_title,
|
|
cc.category AS parent_category,
|
|
cc.source_citation AS parent_citation,
|
|
cc.severity AS parent_severity,
|
|
cc.control_id AS parent_control_id
|
|
FROM obligation_candidates oc
|
|
JOIN canonical_controls cc ON cc.id = oc.parent_control_uuid
|
|
WHERE oc.release_state = 'validated'
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM canonical_controls ac
|
|
WHERE ac.parent_control_uuid = oc.parent_control_uuid
|
|
AND ac.decomposition_method = 'pass0b'
|
|
AND ac.title LIKE '%' || LEFT(oc.action, 20) || '%'
|
|
)
|
|
"""
|
|
if limit > 0:
|
|
query += f" LIMIT {limit}"
|
|
|
|
rows = self.db.execute(text(query)).fetchall()
|
|
|
|
prepared = []
|
|
for row in rows:
|
|
prepared.append({
|
|
"oc_id": str(row[0]),
|
|
"candidate_id": row[1] or "",
|
|
"parent_uuid": str(row[2]),
|
|
"obligation_text": row[3] or "",
|
|
"action": row[4] or "",
|
|
"object": row[5] or "",
|
|
"is_test": row[6],
|
|
"is_reporting": row[7],
|
|
"parent_title": row[8] or "",
|
|
"parent_category": row[9] or "",
|
|
"parent_citation": row[10] or "",
|
|
"parent_severity": row[11] or "medium",
|
|
"parent_control_id": row[12] or "",
|
|
"source_ref": _format_citation(row[10] or ""),
|
|
})
|
|
|
|
if not prepared:
|
|
return {"status": "empty", "total_candidates": 0}
|
|
|
|
requests = []
|
|
for i in range(0, len(prepared), batch_size):
|
|
batch = prepared[i : i + batch_size]
|
|
if len(batch) > 1:
|
|
prompt = _build_pass0b_batch_prompt(batch)
|
|
else:
|
|
obl = batch[0]
|
|
prompt = _build_pass0b_prompt(
|
|
obligation_text=obl["obligation_text"],
|
|
action=obl["action"], object_=obl["object"],
|
|
parent_title=obl["parent_title"],
|
|
parent_category=obl["parent_category"],
|
|
source_ref=obl["source_ref"],
|
|
)
|
|
|
|
ids_str = "+".join(o["candidate_id"] for o in batch)
|
|
requests.append({
|
|
"custom_id": f"p0b_{ids_str}",
|
|
"params": {
|
|
"model": ANTHROPIC_MODEL,
|
|
"max_tokens": max(8192, len(batch) * 1500),
|
|
"system": [
|
|
{
|
|
"type": "text",
|
|
"text": _PASS0B_SYSTEM_PROMPT,
|
|
"cache_control": {"type": "ephemeral"},
|
|
}
|
|
],
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
},
|
|
})
|
|
|
|
batch_result = await create_anthropic_batch(requests)
|
|
batch_id = batch_result.get("id", "")
|
|
|
|
logger.info(
|
|
"Batch API Pass 0b submitted: %s — %d requests (%d candidates)",
|
|
batch_id, len(requests), len(prepared),
|
|
)
|
|
|
|
return {
|
|
"status": "submitted",
|
|
"batch_id": batch_id,
|
|
"total_candidates": len(prepared),
|
|
"total_requests": len(requests),
|
|
"batch_size": batch_size,
|
|
}
|
|
|
|
async def process_batch_results(
|
|
self, batch_id: str, pass_type: str = "0a",
|
|
) -> dict:
|
|
"""Fetch and process results from a completed Anthropic batch.
|
|
|
|
Args:
|
|
batch_id: Anthropic batch ID.
|
|
pass_type: "0a" or "0b".
|
|
"""
|
|
# Check status first
|
|
status = await check_batch_status(batch_id)
|
|
if status.get("processing_status") != "ended":
|
|
return {
|
|
"status": "not_ready",
|
|
"processing_status": status.get("processing_status"),
|
|
"request_counts": status.get("request_counts", {}),
|
|
}
|
|
|
|
results = await fetch_batch_results(batch_id)
|
|
stats = {
|
|
"results_processed": 0,
|
|
"results_succeeded": 0,
|
|
"results_failed": 0,
|
|
"errors": 0,
|
|
}
|
|
|
|
if pass_type == "0a":
|
|
stats.update({
|
|
"controls_processed": 0,
|
|
"obligations_extracted": 0,
|
|
"obligations_validated": 0,
|
|
"obligations_rejected": 0,
|
|
})
|
|
else:
|
|
stats.update({
|
|
"candidates_processed": 0,
|
|
"controls_created": 0,
|
|
"llm_failures": 0,
|
|
})
|
|
|
|
for result in results:
|
|
custom_id = result.get("custom_id", "")
|
|
result_data = result.get("result", {})
|
|
stats["results_processed"] += 1
|
|
|
|
if result_data.get("type") != "succeeded":
|
|
stats["results_failed"] += 1
|
|
logger.warning("Batch result failed: %s — %s", custom_id, result_data)
|
|
continue
|
|
|
|
stats["results_succeeded"] += 1
|
|
message = result_data.get("message", {})
|
|
content = message.get("content", [])
|
|
text_content = content[0].get("text", "") if content else ""
|
|
|
|
try:
|
|
if pass_type == "0a":
|
|
self._handle_batch_result_0a(custom_id, text_content, stats)
|
|
else:
|
|
await self._handle_batch_result_0b(custom_id, text_content, stats)
|
|
except Exception as e:
|
|
logger.error("Processing batch result %s: %s", custom_id, e)
|
|
stats["errors"] += 1
|
|
|
|
self.db.commit()
|
|
stats["status"] = "completed"
|
|
return stats
|
|
|
|
def _handle_batch_result_0a(
|
|
self, custom_id: str, text_content: str, stats: dict,
|
|
) -> None:
|
|
"""Process a single Pass 0a batch result."""
|
|
# custom_id format: p0a_CTRL-001+CTRL-002+...
|
|
prefix = "p0a_"
|
|
control_ids = custom_id[len(prefix):].split("+") if custom_id.startswith(prefix) else []
|
|
|
|
if len(control_ids) == 1:
|
|
raw_obls = _parse_json_array(text_content)
|
|
control_id = control_ids[0]
|
|
uuid_row = self.db.execute(
|
|
text("SELECT id FROM canonical_controls WHERE control_id = :cid LIMIT 1"),
|
|
{"cid": control_id},
|
|
).fetchone()
|
|
if not uuid_row:
|
|
return
|
|
control_uuid = str(uuid_row[0])
|
|
if not raw_obls:
|
|
raw_obls = [{"obligation_text": control_id, "action": "sicherstellen",
|
|
"object": control_id}]
|
|
self._process_pass0a_obligations(raw_obls, control_id, control_uuid, stats)
|
|
stats["controls_processed"] += 1
|
|
else:
|
|
results_by_id = _parse_json_object(text_content)
|
|
for control_id in control_ids:
|
|
uuid_row = self.db.execute(
|
|
text("SELECT id FROM canonical_controls WHERE control_id = :cid LIMIT 1"),
|
|
{"cid": control_id},
|
|
).fetchone()
|
|
if not uuid_row:
|
|
continue
|
|
control_uuid = str(uuid_row[0])
|
|
raw_obls = results_by_id.get(control_id, [])
|
|
if not isinstance(raw_obls, list):
|
|
raw_obls = [raw_obls] if raw_obls else []
|
|
if not raw_obls:
|
|
raw_obls = [{"obligation_text": control_id, "action": "sicherstellen",
|
|
"object": control_id}]
|
|
self._process_pass0a_obligations(raw_obls, control_id, control_uuid, stats)
|
|
stats["controls_processed"] += 1
|
|
|
|
async def _handle_batch_result_0b(
|
|
self, custom_id: str, text_content: str, stats: dict,
|
|
) -> None:
|
|
"""Process a single Pass 0b batch result."""
|
|
prefix = "p0b_"
|
|
candidate_ids = custom_id[len(prefix):].split("+") if custom_id.startswith(prefix) else []
|
|
|
|
if len(candidate_ids) == 1:
|
|
parsed = _parse_json_object(text_content)
|
|
obl = self._load_obligation_for_0b(candidate_ids[0])
|
|
if obl:
|
|
await self._process_pass0b_control(obl, parsed, stats)
|
|
else:
|
|
results_by_id = _parse_json_object(text_content)
|
|
for cand_id in candidate_ids:
|
|
parsed = results_by_id.get(cand_id, {})
|
|
obl = self._load_obligation_for_0b(cand_id)
|
|
if obl:
|
|
await self._process_pass0b_control(obl, parsed, stats)
|
|
|
|
def _load_obligation_for_0b(self, candidate_id: str) -> Optional[dict]:
|
|
"""Load obligation data needed for Pass 0b processing."""
|
|
row = self.db.execute(
|
|
text("""
|
|
SELECT oc.id, oc.candidate_id, oc.parent_control_uuid,
|
|
oc.obligation_text, oc.action, oc.object,
|
|
oc.is_test_obligation, oc.is_reporting_obligation,
|
|
cc.title, cc.category, cc.source_citation,
|
|
cc.severity, cc.control_id
|
|
FROM obligation_candidates oc
|
|
JOIN canonical_controls cc ON cc.id = oc.parent_control_uuid
|
|
WHERE oc.candidate_id = :cid
|
|
"""),
|
|
{"cid": candidate_id},
|
|
).fetchone()
|
|
if not row:
|
|
return None
|
|
return {
|
|
"oc_id": str(row[0]),
|
|
"candidate_id": row[1] or "",
|
|
"parent_uuid": str(row[2]),
|
|
"obligation_text": row[3] or "",
|
|
"action": row[4] or "",
|
|
"object": row[5] or "",
|
|
"is_test": row[6],
|
|
"is_reporting": row[7],
|
|
"parent_title": row[8] or "",
|
|
"parent_category": row[9] or "",
|
|
"parent_citation": row[10] or "",
|
|
"parent_severity": row[11] or "medium",
|
|
"parent_control_id": row[12] or "",
|
|
"source_ref": _format_citation(row[10] or ""),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _fallback_obligation(ctrl: dict) -> dict:
|
|
"""Create a single fallback obligation when LLM returns nothing."""
|
|
return {
|
|
"obligation_text": ctrl.get("objective") or ctrl.get("title", ""),
|
|
"action": "sicherstellen",
|
|
"object": ctrl.get("title", ""),
|
|
"condition": None,
|
|
"normative_strength": "must",
|
|
"is_test_obligation": False,
|
|
"is_reporting_obligation": False,
|
|
}
|
|
|
|
|
|
def _format_field(value) -> str:
|
|
"""Format a requirements/test_procedure field for the LLM prompt."""
|
|
if not value:
|
|
return ""
|
|
if isinstance(value, str):
|
|
try:
|
|
parsed = json.loads(value)
|
|
if isinstance(parsed, list):
|
|
return "\n".join(f"- {item}" for item in parsed)
|
|
return value
|
|
except (json.JSONDecodeError, TypeError):
|
|
return value
|
|
if isinstance(value, list):
|
|
return "\n".join(f"- {item}" for item in value)
|
|
return str(value)
|
|
|
|
|
|
def _format_citation(citation) -> str:
|
|
"""Format source_citation for display."""
|
|
if not citation:
|
|
return ""
|
|
if isinstance(citation, str):
|
|
try:
|
|
c = json.loads(citation)
|
|
if isinstance(c, dict):
|
|
parts = []
|
|
if c.get("source"):
|
|
parts.append(c["source"])
|
|
if c.get("article"):
|
|
parts.append(c["article"])
|
|
if c.get("paragraph"):
|
|
parts.append(c["paragraph"])
|
|
return " ".join(parts) if parts else citation
|
|
except (json.JSONDecodeError, TypeError):
|
|
return citation
|
|
return str(citation)
|
|
|
|
|
|
def _parse_citation(citation) -> dict:
|
|
"""Parse source_citation JSONB into a dict with source/article/paragraph."""
|
|
if not citation:
|
|
return {}
|
|
if isinstance(citation, dict):
|
|
return citation
|
|
if isinstance(citation, str):
|
|
try:
|
|
c = json.loads(citation)
|
|
if isinstance(c, dict):
|
|
return c
|
|
except (json.JSONDecodeError, TypeError):
|
|
pass
|
|
return {}
|
|
|
|
|
|
def _compute_extraction_confidence(flags: dict) -> float:
|
|
"""Compute confidence score from quality flags."""
|
|
score = 0.0
|
|
weights = {
|
|
"has_normative_signal": 0.30,
|
|
"single_action": 0.20,
|
|
"not_rationale": 0.20,
|
|
"not_evidence_only": 0.15,
|
|
"min_length": 0.10,
|
|
"has_parent_link": 0.05,
|
|
}
|
|
for flag, weight in weights.items():
|
|
if flags.get(flag, False):
|
|
score += weight
|
|
return round(score, 2)
|
|
|
|
|
|
def _normalize_severity(val: str) -> str:
|
|
"""Normalize severity value."""
|
|
val = (val or "medium").lower().strip()
|
|
if val in ("critical", "high", "medium", "low"):
|
|
return val
|
|
return "medium"
|
|
|
|
|
|
def _template_fallback(
|
|
obligation_text: str, action: str, object_: str,
|
|
parent_title: str, parent_severity: str, parent_category: str,
|
|
is_test: bool, is_reporting: bool,
|
|
) -> AtomicControlCandidate:
|
|
"""Create an atomic control candidate from template when LLM fails."""
|
|
if is_test:
|
|
title = f"Test: {object_[:60]}" if object_ else f"Test: {action[:60]}"
|
|
test_proc = [f"Prüfung der {object_ or action}"]
|
|
evidence = ["Testprotokoll", "Prüfbericht"]
|
|
elif is_reporting:
|
|
title = f"Meldepflicht: {object_[:60]}" if object_ else f"Meldung: {action[:60]}"
|
|
test_proc = ["Prüfung des Meldeprozesses", "Stichprobe gemeldeter Vorfälle"]
|
|
evidence = ["Meldeprozess-Dokumentation", "Meldeformulare"]
|
|
else:
|
|
title = f"{action.capitalize()}: {object_[:60]}" if object_ else parent_title[:80]
|
|
test_proc = [f"Prüfung der {action}"]
|
|
evidence = ["Dokumentation", "Konfigurationsnachweis"]
|
|
|
|
return AtomicControlCandidate(
|
|
title=title[:200],
|
|
objective=obligation_text[:2000],
|
|
requirements=[obligation_text] if obligation_text else [],
|
|
test_procedure=test_proc,
|
|
evidence=evidence,
|
|
severity=_normalize_severity(parent_severity),
|
|
category=parent_category,
|
|
)
|