All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 34s
CI/CD / test-python-backend-compliance (push) Successful in 32s
CI/CD / test-python-document-crawler (push) Successful in 24s
CI/CD / test-python-dsms-gateway (push) Successful in 19s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Successful in 1s
Many regulation codes (nist_sp800_53r5, eucsa, owasp_top10_2021, EDPB guidelines, EU laws, AT/FR/ES/NL/IT/HU laws) were defaulting to Rule 3 (restricted) because they weren't in REGULATION_LICENSE_MAP. Now all ~100 regulation codes from RAG are properly mapped to Rule 1 or 2. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1085 lines
50 KiB
Python
1085 lines
50 KiB
Python
"""
|
|
Control Generator Pipeline — RAG → License → Structure/Reform → Harmonize → Anchor → Store.
|
|
|
|
7-stage pipeline that generates canonical security controls from RAG chunks:
|
|
1. RAG SCAN — Load unprocessed chunks (or new document versions)
|
|
2. LICENSE CLASSIFY — Determine which of 3 license rules applies
|
|
3a. STRUCTURE — Rule 1+2: Structure original text into control format
|
|
3b. LLM REFORM — Rule 3: Fully reformulate (no original text, no source names)
|
|
4. HARMONIZE — Check against existing controls for duplicates
|
|
5. ANCHOR SEARCH — Find open-source references (OWASP, NIST, ENISA)
|
|
6. STORE — Persist to DB with correct visibility flags
|
|
7. MARK PROCESSED — Mark RAG chunks as processed (with version tracking)
|
|
|
|
Three License Rules:
|
|
Rule 1 (free_use): Laws, Public Domain — original text allowed
|
|
Rule 2 (citation_required): CC-BY, CC-BY-SA — original text with citation
|
|
Rule 3 (restricted): BSI, ISO — full reformulation, no source names
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import uuid
|
|
from dataclasses import dataclass, field, asdict
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, List, Optional, Set
|
|
|
|
import httpx
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import text
|
|
from sqlalchemy.orm import Session
|
|
|
|
from .rag_client import ComplianceRAGClient, RAGSearchResult, get_rag_client
|
|
from .similarity_detector import check_similarity, SimilarityReport
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Service endpoints and model selection — all overridable via environment.
SDK_URL = os.getenv("SDK_URL", "http://ai-compliance-sdk:8090")
EMBEDDING_URL = os.getenv("EMBEDDING_URL", "http://embedding-service:8087")
# Anthropic is the primary LLM; empty key means Ollama-only operation.
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_MODEL = os.getenv("CONTROL_GEN_ANTHROPIC_MODEL", "claude-sonnet-4-6")
# Ollama is the local fallback when Anthropic is unavailable or fails.
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
OLLAMA_MODEL = os.getenv("CONTROL_GEN_OLLAMA_MODEL", "qwen3:30b-a3b")
# Per-request timeout (seconds) for both LLM backends.
LLM_TIMEOUT = float(os.getenv("CONTROL_GEN_LLM_TIMEOUT", "120"))

HARMONIZATION_THRESHOLD = 0.85  # Cosine similarity above this = duplicate

# RAG collections scanned by default when the job config names none.
ALL_COLLECTIONS = [
    "bp_compliance_ce",
    "bp_compliance_recht",
    "bp_compliance_gesetze",
    "bp_compliance_datenschutz",
    "bp_dsfa_corpus",
    "bp_legal_templates",
]
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# License Mapping (3-Rule System)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Maps a RAG regulation_code to its license rule. Codes absent from this map
# (and from the prefix lists below) default to Rule 3 — the safe, restricted
# path — so every source of unknown provenance is fully reformulated.
REGULATION_LICENSE_MAP: dict[str, dict] = {
    # RULE 1: FREE USE — Laws, Public Domain
    # EU Regulations
    "eu_2016_679": {"license": "EU_LAW", "rule": 1, "name": "DSGVO"},
    "eu_2024_1689": {"license": "EU_LAW", "rule": 1, "name": "AI Act (KI-Verordnung)"},
    "eu_2022_2555": {"license": "EU_LAW", "rule": 1, "name": "NIS2"},
    "eu_2024_2847": {"license": "EU_LAW", "rule": 1, "name": "Cyber Resilience Act (CRA)"},
    "eu_2023_1230": {"license": "EU_LAW", "rule": 1, "name": "Maschinenverordnung"},
    "eu_2022_2065": {"license": "EU_LAW", "rule": 1, "name": "Digital Services Act (DSA)"},
    "eu_2022_1925": {"license": "EU_LAW", "rule": 1, "name": "Digital Markets Act (DMA)"},
    "eu_2022_868": {"license": "EU_LAW", "rule": 1, "name": "Data Governance Act (DGA)"},
    "eu_2019_770": {"license": "EU_LAW", "rule": 1, "name": "Digitale-Inhalte-Richtlinie"},
    "eu_2021_914": {"license": "EU_LAW", "rule": 1, "name": "Standardvertragsklauseln (SCC)"},
    "eu_2002_58": {"license": "EU_LAW", "rule": 1, "name": "ePrivacy-Richtlinie"},
    "eu_2000_31": {"license": "EU_LAW", "rule": 1, "name": "E-Commerce-Richtlinie"},
    "eu_2023_1803": {"license": "EU_LAW", "rule": 1, "name": "IFRS-Uebernahmeverordnung"},
    "eucsa": {"license": "EU_LAW", "rule": 1, "name": "EU Cybersecurity Act"},
    "dataact": {"license": "EU_LAW", "rule": 1, "name": "Data Act"},
    "dora": {"license": "EU_LAW", "rule": 1, "name": "Digital Operational Resilience Act"},
    "ehds": {"license": "EU_LAW", "rule": 1, "name": "European Health Data Space"},
    "gpsr": {"license": "EU_LAW", "rule": 1, "name": "Allgemeine Produktsicherheitsverordnung"},
    "mica": {"license": "EU_LAW", "rule": 1, "name": "Markets in Crypto-Assets"},
    "psd2": {"license": "EU_LAW", "rule": 1, "name": "Zahlungsdiensterichtlinie 2"},
    "dpf": {"license": "EU_LAW", "rule": 1, "name": "EU-US Data Privacy Framework"},
    "dsm": {"license": "EU_LAW", "rule": 1, "name": "DSM-Urheberrechtsrichtlinie"},
    "amlr": {"license": "EU_LAW", "rule": 1, "name": "AML-Verordnung"},
    "eu_blue_guide_2022": {"license": "EU_PUBLIC", "rule": 1, "name": "Blue Guide 2022"},
    # NIST (Public Domain — all variants)
    "nist_sp_800_53": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST SP 800-53"},
    "nist_sp800_53r5": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST SP 800-53 Rev.5"},
    "nist_sp_800_63b": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST SP 800-63B"},
    "nist_sp800_63_3": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST SP 800-63-3"},
    "nist_csf_2_0": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST CSF 2.0"},
    "nist_sp_800_218": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST SSDF"},
    "nist_sp800_207": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST SP 800-207 Zero Trust"},
    "nist_ai_rmf": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NIST AI Risk Management Framework"},
    "nistir_8259a": {"license": "NIST_PUBLIC_DOMAIN", "rule": 1, "name": "NISTIR 8259A IoT Security"},
    "cisa_secure_by_design": {"license": "US_GOV_PUBLIC", "rule": 1, "name": "CISA Secure by Design"},
    # German Laws
    "bdsg": {"license": "DE_LAW", "rule": 1, "name": "BDSG"},
    "bdsg_2018_komplett": {"license": "DE_LAW", "rule": 1, "name": "BDSG 2018"},
    "ttdsg": {"license": "DE_LAW", "rule": 1, "name": "TTDSG"},
    "tdddg_25": {"license": "DE_LAW", "rule": 1, "name": "TDDDG"},
    "tkg": {"license": "DE_LAW", "rule": 1, "name": "TKG"},
    "de_tkg": {"license": "DE_LAW", "rule": 1, "name": "TKG"},
    "bgb_komplett": {"license": "DE_LAW", "rule": 1, "name": "BGB"},
    "hgb": {"license": "DE_LAW", "rule": 1, "name": "HGB"},
    "hgb_komplett": {"license": "DE_LAW", "rule": 1, "name": "HGB"},
    "urhg_komplett": {"license": "DE_LAW", "rule": 1, "name": "UrhG"},
    "uwg": {"license": "DE_LAW", "rule": 1, "name": "UWG"},
    "tmg_komplett": {"license": "DE_LAW", "rule": 1, "name": "TMG"},
    "gewo": {"license": "DE_LAW", "rule": 1, "name": "GewO"},
    "ao": {"license": "DE_LAW", "rule": 1, "name": "Abgabenordnung"},
    "ao_komplett": {"license": "DE_LAW", "rule": 1, "name": "Abgabenordnung"},
    "battdg": {"license": "DE_LAW", "rule": 1, "name": "Batteriegesetz"},
    # Austrian Laws
    "at_dsg": {"license": "AT_LAW", "rule": 1, "name": "AT DSG"},
    "at_abgb": {"license": "AT_LAW", "rule": 1, "name": "AT ABGB"},
    "at_abgb_agb": {"license": "AT_LAW", "rule": 1, "name": "AT ABGB AGB-Recht"},
    "at_bao": {"license": "AT_LAW", "rule": 1, "name": "AT BAO"},
    "at_bao_ret": {"license": "AT_LAW", "rule": 1, "name": "AT BAO Retention"},
    "at_ecg": {"license": "AT_LAW", "rule": 1, "name": "AT E-Commerce-Gesetz"},
    "at_kschg": {"license": "AT_LAW", "rule": 1, "name": "AT Konsumentenschutzgesetz"},
    "at_medieng": {"license": "AT_LAW", "rule": 1, "name": "AT Mediengesetz"},
    "at_tkg": {"license": "AT_LAW", "rule": 1, "name": "AT TKG"},
    "at_ugb": {"license": "AT_LAW", "rule": 1, "name": "AT UGB"},
    "at_ugb_ret": {"license": "AT_LAW", "rule": 1, "name": "AT UGB Retention"},
    "at_uwg": {"license": "AT_LAW", "rule": 1, "name": "AT UWG"},
    # Other EU Member State Laws
    "fr_loi_informatique": {"license": "FR_LAW", "rule": 1, "name": "FR Loi Informatique"},
    "es_lopdgdd": {"license": "ES_LAW", "rule": 1, "name": "ES LOPDGDD"},
    "nl_uavg": {"license": "NL_LAW", "rule": 1, "name": "NL UAVG"},
    "it_codice_privacy": {"license": "IT_LAW", "rule": 1, "name": "IT Codice Privacy"},
    "hu_info_tv": {"license": "HU_LAW", "rule": 1, "name": "HU Információs törvény"},
    # EDPB Guidelines (EU Public Authority)
    "edpb_01_2020": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB 01/2020 Ergaenzende Massnahmen"},
    "edpb_02_2023": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB 02/2023 Technischer Anwendungsbereich"},
    "edpb_05_2020": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB 05/2020 Einwilligung"},
    "edpb_09_2022": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB 09/2022 Datenschutzverletzungen"},
    "edpb_bcr_01_2022": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB BCR Leitlinien"},
    "edpb_breach_09_2022": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB Breach Notification"},
    "edpb_connected_vehicles_01_2020": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB Connected Vehicles"},
    "edpb_dpbd_04_2019": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB Data Protection by Design"},
    "edpb_eprivacy_02_2023": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB ePrivacy"},
    "edpb_facial_recognition_05_2022": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB Facial Recognition"},
    "edpb_fines_04_2022": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB Fines Calculation"},
    "edpb_legitimate_interest": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB Legitimate Interest"},
    "edpb_legitimate_interest_01_2024": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB Legitimate Interest 2024"},
    "edpb_social_media_08_2020": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB Social Media"},
    "edpb_transfers_01_2020": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB Transfers 01/2020"},
    "edpb_transfers_07_2020": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB Transfers 07/2020"},
    "edpb_video_03_2019": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPB Video Surveillance"},
    "edps_dpia_list": {"license": "EU_PUBLIC", "rule": 1, "name": "EDPS DPIA Liste"},
    # WP29 (pre-EDPB) Guidelines
    "wp244_profiling": {"license": "EU_PUBLIC", "rule": 1, "name": "WP29 Profiling"},
    "wp251_profiling": {"license": "EU_PUBLIC", "rule": 1, "name": "WP29 Data Portability"},
    "wp260_transparency": {"license": "EU_PUBLIC", "rule": 1, "name": "WP29 Transparency"},

    # RULE 2: CITATION REQUIRED — CC-BY, CC-BY-SA
    "owasp_asvs": {"license": "CC-BY-SA-4.0", "rule": 2, "name": "OWASP ASVS",
                   "attribution": "OWASP Foundation, CC BY-SA 4.0"},
    "owasp_masvs": {"license": "CC-BY-SA-4.0", "rule": 2, "name": "OWASP MASVS",
                    "attribution": "OWASP Foundation, CC BY-SA 4.0"},
    "owasp_top10": {"license": "CC-BY-SA-4.0", "rule": 2, "name": "OWASP Top 10",
                    "attribution": "OWASP Foundation, CC BY-SA 4.0"},
    "owasp_top10_2021": {"license": "CC-BY-SA-4.0", "rule": 2, "name": "OWASP Top 10 2021",
                         "attribution": "OWASP Foundation, CC BY-SA 4.0"},
    "owasp_api_top10_2023": {"license": "CC-BY-SA-4.0", "rule": 2, "name": "OWASP API Top 10 2023",
                             "attribution": "OWASP Foundation, CC BY-SA 4.0"},
    "owasp_samm": {"license": "CC-BY-SA-4.0", "rule": 2, "name": "OWASP SAMM",
                   "attribution": "OWASP Foundation, CC BY-SA 4.0"},
    "oecd_ai_principles": {"license": "OECD_PUBLIC", "rule": 2, "name": "OECD AI Principles",
                           "attribution": "OECD"},

    # RULE 3: RESTRICTED — Full reformulation required
    # Names stored as INTERNAL_ONLY — never exposed to customers
}
|
|
|
|
# Prefix-based matching for wildcard entries
|
|
# Prefix-based matching for wildcard entries:
# codes starting with these prefixes are restricted (Rule 3) sources.
_RULE3_PREFIXES = ["bsi_", "iso_", "etsi_"]
# ENISA publications are CC-BY and only need attribution (Rule 2).
_RULE2_PREFIXES = ["enisa_"]
|
|
|
|
|
|
def _classify_regulation(regulation_code: str) -> dict:
    """Resolve the license rule for a regulation code.

    Resolution order: exact lookup in REGULATION_LICENSE_MAP, then the
    Rule-2 prefixes (ENISA), then the Rule-3 prefixes (BSI/ISO/ETSI).
    Any unrecognized code falls back to Rule 3 (restricted), the safe
    default.
    """
    code = regulation_code.lower().strip()

    exact = REGULATION_LICENSE_MAP.get(code)
    if exact is not None:
        return exact

    if any(code.startswith(p) for p in _RULE2_PREFIXES):
        return {
            "license": "CC-BY-4.0",
            "rule": 2,
            "name": "ENISA",
            "attribution": "ENISA, CC BY 4.0",
        }

    restricted_prefix = next(
        (p for p in _RULE3_PREFIXES if code.startswith(p)), None
    )
    if restricted_prefix is not None:
        return {
            "license": f"{restricted_prefix.rstrip('_').upper()}_RESTRICTED",
            "rule": 3,
            "name": "INTERNAL_ONLY",
        }

    logger.warning("Unknown regulation_code %r — defaulting to Rule 3 (restricted)", code)
    return {"license": "UNKNOWN", "rule": 3, "name": "INTERNAL_ONLY"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Domain detection from content
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Keyword lists (English + German) used by _detect_domain to assign a chunk
# to the domain with the highest keyword-hit count.
DOMAIN_KEYWORDS = {
    "AUTH": ["authentication", "login", "password", "credential", "mfa", "2fa",
             "session", "token", "oauth", "identity", "authentifizierung", "anmeldung"],
    "CRYPT": ["encryption", "cryptography", "tls", "ssl", "certificate", "hashing",
              "aes", "rsa", "verschlüsselung", "kryptographie", "zertifikat"],
    "NET": ["network", "firewall", "dns", "vpn", "proxy", "segmentation",
            "netzwerk", "routing", "port", "intrusion"],
    "DATA": ["data protection", "privacy", "personal data", "datenschutz",
             "personenbezogen", "dsgvo", "gdpr", "löschung", "verarbeitung"],
    "LOG": ["logging", "monitoring", "audit", "siem", "alert", "anomaly",
            "protokollierung", "überwachung"],
    "ACC": ["access control", "authorization", "rbac", "permission", "privilege",
            "zugriffskontrolle", "berechtigung", "autorisierung"],
    "SEC": ["vulnerability", "patch", "update", "hardening", "configuration",
            "schwachstelle", "härtung", "konfiguration"],
    "INC": ["incident", "response", "breach", "recovery", "backup",
            "vorfall", "wiederherstellung", "notfall"],
    "AI": ["artificial intelligence", "machine learning", "model", "bias",
           "ki", "künstliche intelligenz", "algorithmus", "training"],
    "COMP": ["compliance", "audit", "regulation", "standard", "certification",
             "konformität", "prüfung", "zertifizierung"],
}
|
|
|
|
|
|
def _detect_domain(text: str) -> str:
    """Return the domain code whose keywords occur most often in *text*.

    Ties keep the earliest domain in DOMAIN_KEYWORDS order; when no
    keyword matches at all, "SEC" is returned as the default.
    """
    haystack = text.lower()
    best_domain = "SEC"  # default when nothing matches
    best_hits = 0
    for domain, keywords in DOMAIN_KEYWORDS.items():
        hits = sum(kw in haystack for kw in keywords)
        if hits > best_hits:
            best_domain, best_hits = domain, hits
    return best_domain
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data Models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class GeneratorConfig(BaseModel):
    """Job parameters for one pipeline run."""

    # RAG collections to scan; None means ALL_COLLECTIONS.
    collections: Optional[List[str]] = None
    # Restrict scan to one domain code (key of DOMAIN_KEYWORDS); None = all.
    domain: Optional[str] = None
    batch_size: int = 5
    # Upper bound on controls produced in this run.
    max_controls: int = 50
    # Skip chunks whose hash is already in canonical_processed_chunks.
    skip_processed: bool = True
    skip_web_search: bool = False
    # When True, run the pipeline without persisting anything.
    dry_run: bool = False
|
|
|
|
|
|
@dataclass
class GeneratedControl:
    """One canonical security control produced by the pipeline."""

    control_id: str = ""
    title: str = ""
    objective: str = ""
    rationale: str = ""
    scope: dict = field(default_factory=dict)
    requirements: list = field(default_factory=list)
    test_procedure: list = field(default_factory=list)
    evidence: list = field(default_factory=list)
    severity: str = "medium"  # one of: low/medium/high/critical
    risk_score: float = 5.0  # clamped to [0, 10] by _build_control_from_json
    implementation_effort: str = "m"  # one of: s/m/l/xl
    open_anchors: list = field(default_factory=list)  # open-source references (stage 5)
    release_state: str = "draft"
    tags: list = field(default_factory=list)
    # 3-rule fields
    license_rule: Optional[int] = None  # 1 = free use, 2 = citation, 3 = restricted
    source_original_text: Optional[str] = None  # only for Rules 1 and 2; never Rule 3
    source_citation: Optional[dict] = None  # citation info; None for Rule 3
    customer_visible: bool = True  # False for Rule 3 (restricted sources)
    generation_metadata: dict = field(default_factory=dict)  # no source names for Rule 3
|
|
|
|
|
|
@dataclass
class GeneratorResult:
    """Aggregate outcome of one pipeline run."""

    job_id: str = ""
    status: str = "completed"
    total_chunks_scanned: int = 0
    controls_generated: int = 0
    controls_verified: int = 0
    controls_needs_review: int = 0
    # Rejected: too similar to source text (license risk).
    controls_too_close: int = 0
    # Rejected or merged: near-duplicate of an existing control.
    controls_duplicates_found: int = 0
    errors: list = field(default_factory=list)
    controls: list = field(default_factory=list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LLM Client (via Go SDK)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _llm_chat(prompt: str, system_prompt: Optional[str] = None) -> str:
    """Route a chat request: Anthropic when a key is configured, else Ollama.

    An empty Anthropic answer counts as failure and triggers the fallback.
    """
    if not ANTHROPIC_API_KEY:
        return await _llm_ollama(prompt, system_prompt)

    answer = await _llm_anthropic(prompt, system_prompt)
    if answer:
        return answer

    logger.warning("Anthropic failed, falling back to Ollama")
    return await _llm_ollama(prompt, system_prompt)
|
|
|
|
|
|
async def _llm_anthropic(prompt: str, system_prompt: Optional[str] = None) -> str:
    """POST to the Anthropic Messages API; return the first text block or ""."""
    request_headers = {
        "x-api-key": ANTHROPIC_API_KEY,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    body: dict = {
        "model": ANTHROPIC_MODEL,
        "max_tokens": 4096,
        "messages": [{"role": "user", "content": prompt}],
    }
    if system_prompt:
        body["system"] = system_prompt

    try:
        async with httpx.AsyncClient(timeout=LLM_TIMEOUT) as client:
            resp = await client.post(
                "https://api.anthropic.com/v1/messages",
                headers=request_headers,
                json=body,
            )
            if resp.status_code != 200:
                logger.error("Anthropic API %d: %s", resp.status_code, resp.text[:300])
                return ""
            blocks = resp.json().get("content", [])
            # Response body carries a list of content blocks; take the first.
            if blocks and isinstance(blocks, list):
                return blocks[0].get("text", "")
            return ""
    except Exception as e:
        # Network/timeout/JSON errors all degrade to "" so the caller
        # can fall back to Ollama.
        logger.error("Anthropic request failed: %s", e)
        return ""
|
|
|
|
|
|
async def _llm_ollama(prompt: str, system_prompt: Optional[str] = None) -> str:
    """Chat completion via the Ollama /api/chat endpoint (fallback backend)."""
    chat_messages = []
    if system_prompt:
        chat_messages.append({"role": "system", "content": system_prompt})
    chat_messages.append({"role": "user", "content": prompt})

    body = {
        "model": OLLAMA_MODEL,
        "messages": chat_messages,
        "stream": False,  # single JSON response instead of a token stream
    }

    try:
        async with httpx.AsyncClient(timeout=LLM_TIMEOUT) as client:
            resp = await client.post(f"{OLLAMA_URL}/api/chat", json=body)
            if resp.status_code != 200:
                logger.error("Ollama chat failed %d: %s", resp.status_code, resp.text[:300])
                return ""
            data = resp.json()
            reply = data.get("message", {})
            if isinstance(reply, dict):
                return reply.get("content", "")
            # Older /api/generate-shaped payloads carry "response" instead.
            return data.get("response", str(reply))
    except Exception as e:
        logger.error("Ollama request failed: %s", e)
        return ""
|
|
|
|
|
|
async def _get_embedding(text: str) -> list[float]:
    """Fetch the embedding vector for *text* from the embedding service.

    Returns [] on any failure — callers treat an empty vector as
    "similarity check unavailable" rather than an error.
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                f"{EMBEDDING_URL}/embed",
                json={"texts": [text]},
            )
            resp.raise_for_status()
            vectors = resp.json().get("embeddings", [])
            return vectors[0] if vectors else []
    except Exception:
        return []
|
|
|
|
|
|
def _cosine_sim(a: list[float], b: list[float]) -> float:
|
|
"""Compute cosine similarity between two vectors."""
|
|
if not a or not b or len(a) != len(b):
|
|
return 0.0
|
|
dot = sum(x * y for x, y in zip(a, b))
|
|
norm_a = sum(x * x for x in a) ** 0.5
|
|
norm_b = sum(x * x for x in b) ** 0.5
|
|
if norm_a == 0 or norm_b == 0:
|
|
return 0.0
|
|
return dot / (norm_a * norm_b)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JSON Parsing Helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _parse_llm_json(raw: str) -> dict:
|
|
"""Extract JSON from LLM response (handles markdown fences)."""
|
|
# Try extracting from ```json ... ``` blocks
|
|
match = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", raw, re.DOTALL)
|
|
text = match.group(1) if match else raw
|
|
|
|
# Try parsing directly
|
|
try:
|
|
return json.loads(text)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Try finding first { ... } block
|
|
brace_match = re.search(r"\{.*\}", text, re.DOTALL)
|
|
if brace_match:
|
|
try:
|
|
return json.loads(brace_match.group(0))
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
logger.warning("Failed to parse LLM JSON response")
|
|
return {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pipeline
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# System prompt for Rule-3 (restricted) sources: the model must reformulate
# entirely in its own words and must not copy sentences or name the source.
REFORM_SYSTEM_PROMPT = """Du bist ein Security-Compliance-Experte. Deine Aufgabe ist es, eigenständige
Security Controls zu formulieren. Du formulierst IMMER in eigenen Worten.
KOPIERE KEINE Sätze aus dem Quelltext. Verwende eigene Begriffe und Struktur.
NENNE NICHT die Quelle. Keine proprietären Bezeichner.
Antworte NUR mit validem JSON."""

# System prompt for Rule-1/2 sources: structuring the original text is allowed.
STRUCTURE_SYSTEM_PROMPT = """Du bist ein Security-Compliance-Experte. Strukturiere den gegebenen Text
als praxisorientiertes Security Control. Erstelle eine verständliche, umsetzbare Formulierung.
Antworte NUR mit validem JSON."""
|
|
|
|
|
|
class ControlGeneratorPipeline:
|
|
"""Orchestrates the 7-stage control generation pipeline."""
|
|
|
|
def __init__(self, db: Session, rag_client: Optional[ComplianceRAGClient] = None):
    """Bind a DB session and RAG client; initialize per-run caches."""
    self.db = db
    self.rag = rag_client or get_rag_client()
    # Lazy cache of existing controls (None = not yet loaded from DB).
    self._existing_controls: Optional[List[dict]] = None
    # control_id -> embedding vector, filled on demand during harmonization.
    self._existing_embeddings: Dict[str, List[float]] = {}
|
|
|
|
# ── Stage 1: RAG Scan ──────────────────────────────────────────────
|
|
|
|
async def _scan_rag(self, config: GeneratorConfig) -> list[RAGSearchResult]:
    """Stage 1: pull candidate chunks from RAG, dedupe, drop processed ones."""
    target_collections = config.collections or ALL_COLLECTIONS

    search_queries = [
        "security requirement control measure",
        "Sicherheitsanforderung Maßnahme Prüfaspekt",
        "compliance requirement audit criterion",
        "data protection privacy obligation",
        "access control authentication authorization",
    ]
    # Bias the scan toward the requested domain with its top keywords.
    if config.domain:
        keywords = DOMAIN_KEYWORDS.get(config.domain, [])
        if keywords:
            search_queries.append(" ".join(keywords[:5]))

    gathered: list[RAGSearchResult] = []
    for collection in target_collections:
        for query in search_queries:
            hits = await self.rag.search(
                query=query,
                collection=collection,
                top_k=20,
            )
            gathered.extend(hits)

    # Deduplicate by SHA-256 of chunk text.
    seen: set[str] = set()
    unique: list[RAGSearchResult] = []
    for result in gathered:
        digest = hashlib.sha256(result.text.encode()).hexdigest()
        if digest in seen:
            continue
        seen.add(digest)
        unique.append(result)

    # Drop chunks already recorded as processed in the DB.
    if config.skip_processed and unique:
        digests = [hashlib.sha256(r.text.encode()).hexdigest() for r in unique]
        already_done = self._get_processed_hashes(digests)
        unique = [r for r, d in zip(unique, digests) if d not in already_done]

    logger.info("RAG scan: %d unique chunks (%d after filtering processed)",
                len(seen), len(unique))
    return unique[:config.max_controls * 3]  # Over-fetch to account for duplicates
|
|
|
|
def _get_processed_hashes(self, hashes: list[str]) -> set[str]:
    """Return the subset of *hashes* already in canonical_processed_chunks.

    DB errors are logged and treated as "nothing processed", so a
    transient failure degrades to reprocessing rather than aborting.
    """
    if not hashes:
        return set()
    try:
        rows = self.db.execute(
            text("SELECT chunk_hash FROM canonical_processed_chunks WHERE chunk_hash = ANY(:hashes)"),
            {"hashes": hashes},
        )
        return {row[0] for row in rows}
    except Exception as e:
        logger.warning("Error checking processed chunks: %s", e)
        return set()
|
|
|
|
# ── Stage 2: License Classification ────────────────────────────────
|
|
|
|
def _classify_license(self, chunk: RAGSearchResult) -> dict:
    """Stage 2: map the chunk's regulation code onto one of the 3 license rules."""
    code = chunk.regulation_code
    return _classify_regulation(code)
|
|
|
|
# ── Stage 3a: Structure (Rule 1 — Free Use) ───────────────────────
|
|
|
|
async def _structure_free_use(self, chunk: RAGSearchResult, license_info: dict) -> GeneratedControl:
    """Stage 3a (Rule 1 — free use): structure legal text into a control.

    The original text may be used verbatim; it is stored alongside the
    structured formulation and the resulting control is customer-visible.

    Args:
        chunk: RAG chunk carrying the source text and regulation metadata.
        license_info: Entry from REGULATION_LICENSE_MAP (rule 1).
    """
    source_name = license_info.get("name", chunk.regulation_name)
    # German prompt: tells the model original text MAY be used (Rule 1)
    # and asks for a structured, actionable control as JSON.
    prompt = f"""Strukturiere den folgenden Gesetzestext als Security/Compliance Control.
Du DARFST den Originaltext verwenden (Quelle: {source_name}, {license_info.get('license', '')}).

WICHTIG: Erstelle eine verständliche, praxisorientierte Formulierung.
Der Originaltext wird separat gespeichert — deine Formulierung soll klar und umsetzbar sein.

Gib JSON zurück mit diesen Feldern:
- title: Kurzer prägnanter Titel (max 100 Zeichen)
- objective: Was soll erreicht werden? (1-3 Sätze)
- rationale: Warum ist das wichtig? (1-2 Sätze)
- requirements: Liste von konkreten Anforderungen (Strings)
- test_procedure: Liste von Prüfschritten (Strings)
- evidence: Liste von Nachweisdokumenten (Strings)
- severity: low/medium/high/critical
- tags: Liste von Tags

Text: {chunk.text[:2000]}
Quelle: {chunk.regulation_name} ({chunk.regulation_code}), {chunk.article}"""

    raw = await _llm_chat(prompt, STRUCTURE_SYSTEM_PROMPT)
    data = _parse_llm_json(raw)
    if not data:
        # Unparseable LLM output — emit a minimal draft instead of failing.
        return self._fallback_control(chunk)

    domain = _detect_domain(chunk.text)
    control = self._build_control_from_json(data, domain)
    control.license_rule = 1
    # Rule 1 permits storing the original text verbatim.
    control.source_original_text = chunk.text
    control.source_citation = {
        "source": f"{chunk.regulation_name} {chunk.article or ''}".strip(),
        "license": license_info.get("license", ""),
        "url": chunk.source_url or "",
    }
    control.customer_visible = True
    control.generation_metadata = {
        "processing_path": "structured",
        "license_rule": 1,
        "source_regulation": chunk.regulation_code,
        "source_article": chunk.article,
    }
    return control
|
|
|
|
# ── Stage 3b: Structure with Citation (Rule 2) ────────────────────
|
|
|
|
async def _structure_with_citation(self, chunk: RAGSearchResult, license_info: dict) -> GeneratedControl:
    """Stage 3b (Rule 2 — citation required): structure CC-BY/CC-BY-SA text.

    Like Rule 1, the original text may be kept, but the stored citation
    additionally carries the license attribution notice.

    Args:
        chunk: RAG chunk carrying the source text and regulation metadata.
        license_info: Entry from REGULATION_LICENSE_MAP (rule 2), whose
            "attribution" string becomes the license notice.
    """
    source_name = license_info.get("name", chunk.regulation_name)
    attribution = license_info.get("attribution", "")
    # German prompt: text may be reused or clarified; citation is added
    # automatically by this pipeline, not by the model.
    prompt = f"""Strukturiere den folgenden Text als Security Control.
Quelle: {source_name} ({license_info.get('license', '')}) — Zitation erforderlich.

Du darfst den Text übernehmen oder verständlicher umformulieren.
Die Quelle wird automatisch zitiert — fokussiere dich auf Klarheit.

Gib JSON zurück mit diesen Feldern:
- title: Kurzer prägnanter Titel (max 100 Zeichen)
- objective: Was soll erreicht werden? (1-3 Sätze)
- rationale: Warum ist das wichtig? (1-2 Sätze)
- requirements: Liste von konkreten Anforderungen (Strings)
- test_procedure: Liste von Prüfschritten (Strings)
- evidence: Liste von Nachweisdokumenten (Strings)
- severity: low/medium/high/critical
- tags: Liste von Tags

Text: {chunk.text[:2000]}
Quelle: {chunk.regulation_name}, {chunk.article}"""

    raw = await _llm_chat(prompt, STRUCTURE_SYSTEM_PROMPT)
    data = _parse_llm_json(raw)
    if not data:
        # Unparseable LLM output — emit a minimal draft instead of failing.
        return self._fallback_control(chunk)

    domain = _detect_domain(chunk.text)
    control = self._build_control_from_json(data, domain)
    control.license_rule = 2
    control.source_original_text = chunk.text
    control.source_citation = {
        "source": f"{chunk.regulation_name} {chunk.article or ''}".strip(),
        "license": license_info.get("license", ""),
        # Required attribution (e.g. "OWASP Foundation, CC BY-SA 4.0").
        "license_notice": attribution,
        "url": chunk.source_url or "",
    }
    control.customer_visible = True
    control.generation_metadata = {
        "processing_path": "structured",
        "license_rule": 2,
        "source_regulation": chunk.regulation_code,
        "source_article": chunk.article,
    }
    return control
|
|
|
|
# ── Stage 3c: LLM Reformulation (Rule 3 — Restricted) ─────────────
|
|
|
|
async def _llm_reformulate(self, chunk: RAGSearchResult, config: GeneratorConfig) -> GeneratedControl:
    """Stage 3c (Rule 3 — restricted): fully reformulate the chunk.

    Hard constraints: NO original text and NO source names may be stored
    or exposed. The chunk is given to the model for analysis only, and
    the resulting control carries no citation and is not customer-visible.

    Args:
        chunk: RAG chunk from a restricted source (BSI, ISO, ...).
        config: Run config; its domain (if set) overrides auto-detection.
    """
    domain = config.domain or _detect_domain(chunk.text)
    # German prompt: forbids copying sentences, naming the source, or
    # using proprietary identifiers; the chunk is analysis input only.
    prompt = f"""Analysiere den folgenden Prüfaspekt und formuliere ein EIGENSTÄNDIGES Security Control.
KOPIERE KEINE Sätze. Verwende eigene Begriffe und Struktur.
NENNE NICHT die Quelle. Keine proprietären Bezeichner (kein O.Auth_*, TR-03161, BSI-TR etc.).

Aspekt (nur zur Analyse, NICHT kopieren, NICHT referenzieren):
---
{chunk.text[:1500]}
---

Domain: {domain}

Gib JSON zurück mit diesen Feldern:
- title: Kurzer eigenständiger Titel (max 100 Zeichen)
- objective: Eigenständige Formulierung des Ziels (1-3 Sätze)
- rationale: Eigenständige Begründung (1-2 Sätze)
- requirements: Liste von konkreten Anforderungen (Strings, eigene Worte)
- test_procedure: Liste von Prüfschritten (Strings)
- evidence: Liste von Nachweisdokumenten (Strings)
- severity: low/medium/high/critical
- tags: Liste von Tags (eigene Begriffe)"""

    raw = await _llm_chat(prompt, REFORM_SYSTEM_PROMPT)
    data = _parse_llm_json(raw)
    if not data:
        # Unparseable LLM output — emit a minimal draft instead of failing.
        return self._fallback_control(chunk)

    control = self._build_control_from_json(data, domain)
    control.license_rule = 3
    control.source_original_text = None  # NEVER store original
    control.source_citation = None  # NEVER cite source
    control.customer_visible = False  # Only our formulation
    # generation_metadata: NO source names, NO original texts
    control.generation_metadata = {
        "processing_path": "llm_reform",
        "license_rule": 3,
    }
    return control
|
|
|
|
# ── Stage 4: Harmonization ─────────────────────────────────────────
|
|
|
|
async def _check_harmonization(self, new_control: GeneratedControl) -> Optional[list]:
    """Stage 4: list existing controls whose embedding is too similar.

    Returns None when there is nothing to compare against, the new
    control could not be embedded, or no match exceeds
    HARMONIZATION_THRESHOLD.
    """
    catalog = self._load_existing_controls()
    if not catalog:
        return None

    candidate_vec = await _get_embedding(
        f"{new_control.title} {new_control.objective}"
    )
    if not candidate_vec:
        return None

    matches = []
    for entry in catalog:
        key = entry.get("control_id", "")

        # Compute and memoize this existing control's embedding once per run.
        if key not in self._existing_embeddings:
            self._existing_embeddings[key] = await _get_embedding(
                f"{entry.get('title', '')} {entry.get('objective', '')}"
            )
        existing_vec = self._existing_embeddings.get(key, [])
        if not existing_vec:
            continue

        score = _cosine_sim(candidate_vec, existing_vec)
        if score > HARMONIZATION_THRESHOLD:
            matches.append({
                "control_id": key,
                "title": entry.get("title", ""),
                "similarity": round(score, 3),
            })

    return matches or None
|
|
|
|
def _load_existing_controls(self) -> list[dict]:
    """Return all non-deprecated canonical controls, cached for this pipeline run."""
    if self._existing_controls is not None:
        return self._existing_controls

    loaded: list[dict] = []
    try:
        query = text(
            "SELECT control_id, title, objective FROM canonical_controls WHERE release_state != 'deprecated'"
        )
        for control_id, title, objective in self.db.execute(query):
            loaded.append({
                "control_id": control_id,
                "title": title,
                "objective": objective,
            })
    except Exception as e:
        # Best-effort: an unreadable table just disables harmonization.
        logger.warning("Error loading existing controls: %s", e)
        loaded = []

    self._existing_controls = loaded
    return self._existing_controls
|
|
|
|
# ── Helpers ────────────────────────────────────────────────────────
|
|
|
|
def _build_control_from_json(self, data: dict, domain: str) -> GeneratedControl:
    """Build a GeneratedControl from parsed (untrusted) LLM JSON.

    Every field is sanitized defensively: invalid enum values fall back to
    defaults, mis-shaped collections are coerced or dropped, and numeric
    fields that fail to parse fall back instead of raising.

    Args:
        data: Parsed JSON dict produced by the LLM (shape not guaranteed).
        domain: Target domain (kept for interface compatibility; unused here).

    Returns:
        A populated GeneratedControl; never raises on malformed field values.
    """
    severity = data.get("severity", "medium")
    if severity not in ("low", "medium", "high", "critical"):
        severity = "medium"

    tags = data.get("tags", [])
    if isinstance(tags, str):
        # LLMs sometimes emit a comma-separated string instead of a list.
        tags = [t.strip() for t in tags.split(",")]
    elif not isinstance(tags, list):
        # e.g. explicit null or an object — previously crashed at tags[:20].
        tags = []

    # float() on LLM output can raise on junk like "high" or None.
    try:
        risk_score = float(data.get("risk_score", 5.0))
    except (TypeError, ValueError):
        risk_score = 5.0

    effort = data.get("implementation_effort", "m")
    if effort not in ("s", "m", "l", "xl"):
        effort = "m"

    scope = data.get("scope", {})
    if not isinstance(scope, dict):
        scope = {}

    def _list_field(key: str) -> list:
        # Accept only proper lists; anything else becomes empty.
        value = data.get(key)
        return value if isinstance(value, list) else []

    return GeneratedControl(
        title=str(data.get("title", "Untitled Control"))[:255],
        objective=str(data.get("objective", "")),
        rationale=str(data.get("rationale", "")),
        scope=scope,
        requirements=_list_field("requirements"),
        test_procedure=_list_field("test_procedure"),
        evidence=_list_field("evidence"),
        severity=severity,
        risk_score=min(10.0, max(0.0, risk_score)),
        implementation_effort=effort,
        tags=tags[:20],
    )
|
|
|
|
def _fallback_control(self, chunk: RAGSearchResult) -> GeneratedControl:
    """Build a minimal needs-review control when LLM output could not be parsed."""
    domain = _detect_domain(chunk.text)
    article = chunk.article or ''
    title = f"Control from {chunk.regulation_code} {article}".strip()[:255]
    objective = chunk.text[:500] if chunk.text else "Needs manual review"
    return GeneratedControl(
        title=title,
        objective=objective,
        rationale="Auto-generated — LLM parsing failed, manual review required.",
        severity="medium",
        release_state="needs_review",
        tags=[domain.lower()],
    )
|
|
|
|
def _generate_control_id(self, domain: str, db: Session) -> str:
    """Generate the next sequential control ID for *domain*, e.g. "AUTH-011"."""
    prefix = domain.upper()[:4]
    try:
        row = db.execute(
            text("SELECT control_id FROM canonical_controls WHERE control_id LIKE :prefix ORDER BY control_id DESC LIMIT 1"),
            {"prefix": f"{prefix}-%"},
        ).fetchone()
        if row is not None:
            next_num = int(row[0].split("-")[-1]) + 1
            return f"{prefix}-{next_num:03d}"
    except Exception:
        # Any DB or parse failure falls through to the first ID for this prefix.
        pass
    return f"{prefix}-001"
|
|
|
|
# ── Pipeline Orchestration ─────────────────────────────────────────
|
|
|
|
def _create_job(self, config: GeneratorConfig) -> str:
    """Create a 'running' generation-job record and return its ID.

    Falls back to a locally generated UUID string when the insert fails or
    returns no row, so the pipeline can proceed without a persisted job.
    """
    insert_sql = text("""
                INSERT INTO canonical_generation_jobs (status, config)
                VALUES ('running', :config)
                RETURNING id
            """)
    try:
        result = self.db.execute(
            insert_sql,
            {"config": json.dumps(config.model_dump())},
        )
        self.db.commit()
        row = result.fetchone()
        if row:
            return str(row[0])
        return str(uuid.uuid4())
    except Exception as e:
        logger.error("Failed to create job: %s", e)
        return str(uuid.uuid4())
|
|
|
|
def _update_job(self, job_id: str, result: GeneratorResult) -> None:
    """Write the final pipeline stats for *job_id* back to canonical_generation_jobs.

    Best-effort: failures are logged and swallowed so a stats-write problem
    cannot fail an otherwise successful pipeline run.

    Args:
        job_id: UUID string of the job row to update.
        result: Aggregated pipeline counters and errors.
    """
    try:
        self.db.execute(
            text("""
                UPDATE canonical_generation_jobs
                SET status = :status,
                    total_chunks_scanned = :scanned,
                    controls_generated = :generated,
                    controls_verified = :verified,
                    controls_needs_review = :needs_review,
                    controls_too_close = :too_close,
                    controls_duplicates_found = :duplicates,
                    errors = :errors,
                    completed_at = NOW()
                WHERE id = CAST(:job_id AS uuid)
            """),
            {
                "job_id": job_id,
                "status": result.status,
                "scanned": result.total_chunks_scanned,
                "generated": result.controls_generated,
                "verified": result.controls_verified,
                "needs_review": result.controls_needs_review,
                "too_close": result.controls_too_close,
                "duplicates": result.controls_duplicates_found,
                # Only the last 50 errors are persisted, bounding row size.
                "errors": json.dumps(result.errors[-50:]),
            },
        )
        self.db.commit()
    except Exception as e:
        logger.error("Failed to update job: %s", e)
|
|
|
|
def _store_control(self, control: GeneratedControl, job_id: str) -> Optional[str]:
    """Persist a generated control to the DB (Stage 6).

    Looks up the 'bp_security_v1' framework row, assigns a sequential
    control_id when missing, then inserts the control with all license /
    visibility fields. JSON-typed columns are serialized here.

    Args:
        control: The fully populated control to store (may be mutated:
            ``control.control_id`` is assigned when empty).
        job_id: Generation-job ID (currently unused in the insert itself).

    Returns:
        The new control's UUID as a string, or None when the framework row is
        missing, the insert fails, or the row already exists — ON CONFLICT
        DO NOTHING yields no RETURNING row, so a duplicate is indistinguishable
        from a failure here (NOTE(review): callers should be aware).
    """
    try:
        # Resolve the internal UUID of the target framework; without it the
        # control cannot be attached anywhere.
        fw_result = self.db.execute(
            text("SELECT id FROM canonical_control_frameworks WHERE framework_id = 'bp_security_v1' LIMIT 1")
        )
        fw_row = fw_result.fetchone()
        if not fw_row:
            logger.error("Framework bp_security_v1 not found")
            return None
        framework_uuid = fw_row[0]

        # Assign a sequential ID (e.g. "AUTH-011") if the pipeline did not.
        if not control.control_id:
            domain = _detect_domain(control.objective) if control.objective else "SEC"
            control.control_id = self._generate_control_id(domain, self.db)

        result = self.db.execute(
            text("""
                INSERT INTO canonical_controls (
                    framework_id, control_id, title, objective, rationale,
                    scope, requirements, test_procedure, evidence,
                    severity, risk_score, implementation_effort,
                    open_anchors, release_state, tags,
                    license_rule, source_original_text, source_citation,
                    customer_visible, generation_metadata
                ) VALUES (
                    :framework_id, :control_id, :title, :objective, :rationale,
                    :scope, :requirements, :test_procedure, :evidence,
                    :severity, :risk_score, :implementation_effort,
                    :open_anchors, :release_state, :tags,
                    :license_rule, :source_original_text, :source_citation,
                    :customer_visible, :generation_metadata
                )
                ON CONFLICT (framework_id, control_id) DO NOTHING
                RETURNING id
            """),
            {
                "framework_id": framework_uuid,
                "control_id": control.control_id,
                "title": control.title,
                "objective": control.objective,
                "rationale": control.rationale,
                # JSON columns: serialized explicitly rather than relying on
                # driver-side adaptation.
                "scope": json.dumps(control.scope),
                "requirements": json.dumps(control.requirements),
                "test_procedure": json.dumps(control.test_procedure),
                "evidence": json.dumps(control.evidence),
                "severity": control.severity,
                "risk_score": control.risk_score,
                "implementation_effort": control.implementation_effort,
                "open_anchors": json.dumps(control.open_anchors),
                "release_state": control.release_state,
                "tags": json.dumps(control.tags),
                # License fields: for Rule 3, source_original_text and
                # source_citation are None by construction (never stored).
                "license_rule": control.license_rule,
                "source_original_text": control.source_original_text,
                "source_citation": json.dumps(control.source_citation) if control.source_citation else None,
                "customer_visible": control.customer_visible,
                "generation_metadata": json.dumps(control.generation_metadata) if control.generation_metadata else None,
            },
        )
        self.db.commit()
        row = result.fetchone()
        return str(row[0]) if row else None
    except Exception as e:
        logger.error("Failed to store control %s: %s", control.control_id, e)
        # Roll back so the session stays usable for the next control.
        self.db.rollback()
        return None
|
|
|
|
def _mark_chunk_processed(
    self,
    chunk: RAGSearchResult,
    license_info: dict,
    processing_path: str,
    control_ids: list[str],
    job_id: str,
) -> None:
    """Mark a RAG chunk as processed (Stage 7).

    The chunk is identified by a SHA-256 of its text plus collection and
    document version; re-processing the same (hash, collection, version)
    triple is a no-op via ON CONFLICT DO NOTHING.

    Args:
        chunk: The source RAG chunk.
        license_info: Classification dict; ``license`` and ``rule`` are read.
        processing_path: "llm_reform" or "structured".
        control_ids: UUIDs of controls generated from this chunk.
        job_id: Owning generation-job UUID string.

    Best-effort: insert failures are logged, never raised.
    """
    chunk_hash = hashlib.sha256(chunk.text.encode()).hexdigest()
    try:
        self.db.execute(
            text("""
                INSERT INTO canonical_processed_chunks (
                    chunk_hash, collection, regulation_code,
                    document_version, source_license, license_rule,
                    processing_path, generated_control_ids, job_id
                ) VALUES (
                    :hash, :collection, :regulation_code,
                    :doc_version, :license, :rule,
                    :path, :control_ids, CAST(:job_id AS uuid)
                )
                ON CONFLICT (chunk_hash, collection, document_version) DO NOTHING
            """),
            {
                "hash": chunk_hash,
                "collection": "bp_compliance_ce",  # Default, we don't track collection per result
                "regulation_code": chunk.regulation_code,
                # NOTE(review): document_version is hard-coded; version
                # tracking presumably happens upstream — confirm.
                "doc_version": "1.0",
                "license": license_info.get("license", ""),
                # Missing rule defaults to 3 (most restrictive).
                "rule": license_info.get("rule", 3),
                "path": processing_path,
                "control_ids": json.dumps(control_ids),
                "job_id": job_id,
            },
        )
        self.db.commit()
    except Exception as e:
        logger.warning("Failed to mark chunk processed: %s", e)
|
|
|
|
# ── Main Pipeline ──────────────────────────────────────────────────
|
|
|
|
async def run(self, config: GeneratorConfig) -> GeneratorResult:
    """Execute the full 7-stage pipeline.

    Creates a job record, scans RAG for chunks (Stage 1), processes each
    chunk through stages 2–5 via ``_process_single_chunk``, stores results
    (Stage 6) and marks chunks processed (Stage 7), then writes final stats
    back to the job row.

    Args:
        config: Run configuration (max_controls, dry_run, domain, …).

    Returns:
        A GeneratorResult with status, per-state counters, generated controls
        (as dicts) and collected error messages. Per-chunk errors are recorded
        and skipped; only a failure outside the chunk loop marks the whole
        run "failed".
    """
    result = GeneratorResult()

    # Create job
    job_id = self._create_job(config)
    result.job_id = job_id

    try:
        # Stage 1: RAG Scan
        chunks = await self._scan_rag(config)
        result.total_chunks_scanned = len(chunks)

        if not chunks:
            # Nothing to do — still close the job cleanly.
            result.status = "completed"
            self._update_job(job_id, result)
            return result

        # Process chunks
        controls_count = 0
        for chunk in chunks:
            if controls_count >= config.max_controls:
                break

            try:
                control = await self._process_single_chunk(chunk, config, job_id)
                if control is None:
                    # Chunk yielded no usable control (e.g. missing title/objective).
                    continue

                # Count by state (too_close/duplicate are still stored below,
                # just flagged for review rather than verified).
                if control.release_state == "too_close":
                    result.controls_too_close += 1
                elif control.release_state == "duplicate":
                    result.controls_duplicates_found += 1
                elif control.release_state == "needs_review":
                    result.controls_needs_review += 1
                else:
                    result.controls_verified += 1

                # Store (unless dry run)
                if not config.dry_run:
                    ctrl_uuid = self._store_control(control, job_id)
                    if ctrl_uuid:
                        # Stage 7: Mark chunk processed — only after a
                        # successful store, so unstored chunks are retried
                        # on the next run.
                        license_info = self._classify_license(chunk)
                        path = "llm_reform" if license_info["rule"] == 3 else "structured"
                        self._mark_chunk_processed(chunk, license_info, path, [ctrl_uuid], job_id)

                result.controls_generated += 1
                result.controls.append(asdict(control))
                controls_count += 1

                # Add to existing controls for harmonization of next chunks
                if self._existing_controls is not None:
                    self._existing_controls.append({
                        "control_id": control.control_id,
                        "title": control.title,
                        "objective": control.objective,
                    })

            except Exception as e:
                # Per-chunk failure: record and continue with the next chunk.
                error_msg = f"Error processing chunk {chunk.regulation_code}/{chunk.article}: {e}"
                logger.error(error_msg)
                result.errors.append(error_msg)

        result.status = "completed"

    except Exception as e:
        result.status = "failed"
        result.errors.append(str(e))
        logger.error("Pipeline failed: %s", e)

    self._update_job(job_id, result)
    return result
|
|
|
|
async def _process_single_chunk(
    self,
    chunk: RAGSearchResult,
    config: GeneratorConfig,
    job_id: str,
) -> Optional[GeneratedControl]:
    """Process a single chunk through stages 2-5.

    Pipeline per chunk: license classification → structure/reform →
    too-close check → harmonization → anchor search → release-state and
    control-id assignment.

    Args:
        chunk: The RAG chunk to turn into a control.
        config: Run configuration (domain override, skip_web_search, …).
        job_id: Owning job ID, recorded in generation_metadata.

    Returns:
        The generated control (possibly flagged "too_close" or "duplicate"),
        or None when the result lacks a title or objective.
    """
    # Stage 2: License classification
    license_info = self._classify_license(chunk)

    # Stage 3: Structure or Reform based on rule
    if license_info["rule"] == 1:
        control = await self._structure_free_use(chunk, license_info)
    elif license_info["rule"] == 2:
        control = await self._structure_with_citation(chunk, license_info)
    else:
        control = await self._llm_reformulate(chunk, config)

    # Too-Close-Check for Rule 3
    # NOTE(review): despite the comment, this runs for ALL rules —
    # presumably intentional as a safety net; confirm.
    # Also assumes control.generation_metadata is always a dict
    # (default factory) — TODO confirm for the Rule 1/2 paths.
    similarity = await check_similarity(chunk.text, f"{control.objective} {control.rationale}")
    if similarity.status == "FAIL":
        control.release_state = "too_close"
        control.generation_metadata["similarity_status"] = "FAIL"
        control.generation_metadata["similarity_scores"] = {
            "token_overlap": similarity.token_overlap,
            "ngram_jaccard": similarity.ngram_jaccard,
            "lcs_ratio": similarity.lcs_ratio,
        }
        # Returned flagged rather than dropped, so it still gets stored
        # for manual review.
        return control

    if not control.title or not control.objective:
        return None

    # Stage 4: Harmonization
    duplicates = await self._check_harmonization(control)
    if duplicates:
        control.release_state = "duplicate"
        control.generation_metadata["similar_controls"] = duplicates
        return control

    # Stage 5: Anchor Search (imported from anchor_finder)
    try:
        from .anchor_finder import AnchorFinder
        finder = AnchorFinder(self.rag)
        anchors = await finder.find_anchors(control, skip_web=config.skip_web_search)
        # Anchors may be dataclasses or plain dicts; normalize to dicts.
        control.open_anchors = [asdict(a) if hasattr(a, '__dataclass_fields__') else a for a in anchors]
    except Exception as e:
        # Anchor search is best-effort; failure only affects release state.
        logger.warning("Anchor search failed: %s", e)

    # Determine release state: Rule 1/2 (original text allowed) go straight
    # to draft; Rule 3 needs at least one open anchor, else manual review.
    if control.license_rule in (1, 2):
        control.release_state = "draft"
    elif control.open_anchors:
        control.release_state = "draft"
    else:
        control.release_state = "needs_review"

    # Generate control_id (config.domain overrides keyword detection)
    domain = config.domain or _detect_domain(control.objective)
    control.control_id = self._generate_control_id(domain, self.db)

    # Store job_id in metadata
    control.generation_metadata["job_id"] = job_id

    return control