"""Control Composer — Pattern + Obligation → Master Control. Takes an obligation (from ObligationExtractor) and a matched control pattern (from PatternMatcher), then uses LLM to compose a structured, actionable Master Control. Replaces the old Stage 3 (STRUCTURE/REFORM) with a pattern-guided approach. Three composition modes based on license rules: Rule 1: Obligation + Pattern + original text → full control Rule 2: Obligation + Pattern + original text + citation → control Rule 3: Obligation + Pattern (NO original text) → reformulated control Fallback: No pattern match → basic generation (tagged needs_pattern_assignment) Part of the Multi-Layer Control Architecture (Phase 6 of 8). """ import json import logging import os from dataclasses import dataclass, field from typing import Optional from services.obligation_extractor import ( ObligationMatch, _llm_ollama, _parse_json, ) from services.pattern_matcher import ( ControlPattern, PatternMatchResult, ) logger = logging.getLogger(__name__) OLLAMA_MODEL = os.getenv("CONTROL_GEN_OLLAMA_MODEL", "qwen3.5:35b-a3b") # Valid values for generated control fields VALID_SEVERITIES = {"low", "medium", "high", "critical"} VALID_EFFORTS = {"s", "m", "l", "xl"} VALID_VERIFICATION = {"code_review", "document", "tool", "hybrid"} @dataclass class ComposedControl: """A Master Control composed from an obligation + pattern.""" # Core fields (match canonical_controls schema) control_id: str = "" title: str = "" objective: str = "" rationale: str = "" scope: dict = field(default_factory=dict) requirements: list = field(default_factory=list) test_procedure: list = field(default_factory=list) evidence: list = field(default_factory=list) severity: str = "medium" risk_score: float = 5.0 implementation_effort: str = "m" open_anchors: list = field(default_factory=list) release_state: str = "draft" tags: list = field(default_factory=list) # 3-Rule License fields license_rule: Optional[int] = None source_original_text: Optional[str] = None 
source_citation: Optional[dict] = None customer_visible: bool = True # Classification verification_method: Optional[str] = None category: Optional[str] = None target_audience: Optional[list] = None # Pattern + Obligation linkage pattern_id: Optional[str] = None obligation_ids: list = field(default_factory=list) # Metadata generation_metadata: dict = field(default_factory=dict) composition_method: str = "pattern_guided" # pattern_guided | fallback def to_dict(self) -> dict: """Serialize for DB storage or API response.""" return { "control_id": self.control_id, "title": self.title, "objective": self.objective, "rationale": self.rationale, "scope": self.scope, "requirements": self.requirements, "test_procedure": self.test_procedure, "evidence": self.evidence, "severity": self.severity, "risk_score": self.risk_score, "implementation_effort": self.implementation_effort, "open_anchors": self.open_anchors, "release_state": self.release_state, "tags": self.tags, "license_rule": self.license_rule, "source_original_text": self.source_original_text, "source_citation": self.source_citation, "customer_visible": self.customer_visible, "verification_method": self.verification_method, "category": self.category, "target_audience": self.target_audience, "pattern_id": self.pattern_id, "obligation_ids": self.obligation_ids, "generation_metadata": self.generation_metadata, "composition_method": self.composition_method, } class ControlComposer: """Composes Master Controls from obligations + patterns. 
Usage:: composer = ControlComposer() control = await composer.compose( obligation=obligation_match, pattern_result=pattern_match_result, chunk_text="...", license_rule=1, source_citation={...}, ) """ async def compose( self, obligation: ObligationMatch, pattern_result: PatternMatchResult, chunk_text: Optional[str] = None, license_rule: int = 3, source_citation: Optional[dict] = None, regulation_code: Optional[str] = None, ) -> ComposedControl: """Compose a Master Control from obligation + pattern. Args: obligation: The extracted obligation (from ObligationExtractor). pattern_result: The matched pattern (from PatternMatcher). chunk_text: Original RAG chunk text (only used for Rules 1-2). license_rule: 1=free, 2=citation, 3=restricted. source_citation: Citation metadata for Rule 2. regulation_code: Source regulation code. Returns: ComposedControl ready for storage. """ pattern = pattern_result.pattern if pattern_result else None if pattern: control = await self._compose_with_pattern( obligation, pattern, chunk_text, license_rule, source_citation, ) else: control = await self._compose_fallback( obligation, chunk_text, license_rule, source_citation, ) # Set linkage fields control.pattern_id = pattern.id if pattern else None if obligation.obligation_id: control.obligation_ids = [obligation.obligation_id] # Set license fields control.license_rule = license_rule if license_rule in (1, 2) and chunk_text: control.source_original_text = chunk_text if license_rule == 2 and source_citation: control.source_citation = source_citation if license_rule == 3: control.customer_visible = False control.source_original_text = None control.source_citation = None # Build metadata control.generation_metadata = { "composition_method": control.composition_method, "pattern_id": control.pattern_id, "pattern_confidence": round(pattern_result.confidence, 3) if pattern_result else 0, "pattern_method": pattern_result.method if pattern_result else "none", "obligation_id": obligation.obligation_id, 
"obligation_method": obligation.method, "obligation_confidence": round(obligation.confidence, 3), "license_rule": license_rule, "regulation_code": regulation_code, } # Validate and fix fields _validate_control(control) return control async def compose_batch( self, items: list[dict], ) -> list[ComposedControl]: """Compose multiple controls. Args: items: List of dicts with keys: obligation, pattern_result, chunk_text, license_rule, source_citation, regulation_code. Returns: List of ComposedControl instances. """ results = [] for item in items: control = await self.compose( obligation=item["obligation"], pattern_result=item.get("pattern_result", PatternMatchResult()), chunk_text=item.get("chunk_text"), license_rule=item.get("license_rule", 3), source_citation=item.get("source_citation"), regulation_code=item.get("regulation_code"), ) results.append(control) return results # ----------------------------------------------------------------------- # Pattern-guided composition # ----------------------------------------------------------------------- async def _compose_with_pattern( self, obligation: ObligationMatch, pattern: ControlPattern, chunk_text: Optional[str], license_rule: int, source_citation: Optional[dict], ) -> ComposedControl: """Use LLM to fill the pattern template with obligation-specific details.""" prompt = _build_compose_prompt(obligation, pattern, chunk_text, license_rule) system_prompt = _compose_system_prompt(license_rule) llm_result = await _llm_ollama(prompt, system_prompt) if not llm_result: return self._compose_from_template(obligation, pattern) parsed = _parse_json(llm_result) if not parsed: return self._compose_from_template(obligation, pattern) control = ComposedControl( title=parsed.get("title", pattern.name_de)[:255], objective=parsed.get("objective", pattern.objective_template), rationale=parsed.get("rationale", pattern.rationale_template), requirements=_ensure_list(parsed.get("requirements", pattern.requirements_template)), 
test_procedure=_ensure_list(parsed.get("test_procedure", pattern.test_procedure_template)), evidence=_ensure_list(parsed.get("evidence", pattern.evidence_template)), severity=parsed.get("severity", pattern.severity_default), implementation_effort=parsed.get("implementation_effort", pattern.implementation_effort_default), category=parsed.get("category", pattern.category), tags=_ensure_list(parsed.get("tags", pattern.tags)), target_audience=_ensure_list(parsed.get("target_audience", [])), verification_method=parsed.get("verification_method"), open_anchors=_anchors_from_pattern(pattern), composition_method="pattern_guided", ) return control def _compose_from_template( self, obligation: ObligationMatch, pattern: ControlPattern, ) -> ComposedControl: """Fallback: fill template directly without LLM (when LLM fails).""" obl_title = obligation.obligation_title or "" obl_text = obligation.obligation_text or "" title = f"{pattern.name_de}" if obl_title: title = f"{pattern.name_de} — {obl_title}" objective = pattern.objective_template if obl_text and len(obl_text) > 20: objective = f"{pattern.objective_template} Bezug: {obl_text[:200]}" return ComposedControl( title=title[:255], objective=objective, rationale=pattern.rationale_template, requirements=list(pattern.requirements_template), test_procedure=list(pattern.test_procedure_template), evidence=list(pattern.evidence_template), severity=pattern.severity_default, implementation_effort=pattern.implementation_effort_default, category=pattern.category, tags=list(pattern.tags), open_anchors=_anchors_from_pattern(pattern), composition_method="template_only", ) # ----------------------------------------------------------------------- # Fallback (no pattern) # ----------------------------------------------------------------------- async def _compose_fallback( self, obligation: ObligationMatch, chunk_text: Optional[str], license_rule: int, source_citation: Optional[dict], ) -> ComposedControl: """Generate a control without a pattern 
template (old-style).""" prompt = _build_fallback_prompt(obligation, chunk_text, license_rule) system_prompt = _compose_system_prompt(license_rule) llm_result = await _llm_ollama(prompt, system_prompt) parsed = _parse_json(llm_result) if llm_result else {} obl_text = obligation.obligation_text or "" control = ComposedControl( title=parsed.get("title", obl_text[:100] if obl_text else "Untitled Control")[:255], objective=parsed.get("objective", obl_text[:500]), rationale=parsed.get("rationale", "Aus gesetzlicher Pflicht abgeleitet."), requirements=_ensure_list(parsed.get("requirements", [])), test_procedure=_ensure_list(parsed.get("test_procedure", [])), evidence=_ensure_list(parsed.get("evidence", [])), severity=parsed.get("severity", "medium"), implementation_effort=parsed.get("implementation_effort", "m"), category=parsed.get("category"), tags=_ensure_list(parsed.get("tags", [])), target_audience=_ensure_list(parsed.get("target_audience", [])), verification_method=parsed.get("verification_method"), composition_method="fallback", release_state="needs_review", ) return control # --------------------------------------------------------------------------- # Prompt builders # --------------------------------------------------------------------------- def _compose_system_prompt(license_rule: int) -> str: """Build the system prompt based on license rule.""" if license_rule == 3: return ( "Du bist ein Security-Compliance-Experte. Deine Aufgabe ist es, " "eigenstaendige Security Controls zu formulieren. " "Du formulierst IMMER in eigenen Worten. " "KOPIERE KEINE Saetze aus dem Quelltext. " "Verwende eigene Begriffe und Struktur. " "NENNE NICHT die Quelle. Keine proprietaeren Bezeichner. " "Antworte NUR mit validem JSON." ) return ( "Du bist ein Security-Compliance-Experte. " "Erstelle ein praxisorientiertes, umsetzbares Security Control. " "Antworte NUR mit validem JSON." 
) def _build_compose_prompt( obligation: ObligationMatch, pattern: ControlPattern, chunk_text: Optional[str], license_rule: int, ) -> str: """Build the LLM prompt for pattern-guided composition.""" obl_section = _obligation_section(obligation) pattern_section = _pattern_section(pattern) if license_rule == 3: context_section = "KONTEXT: Intern analysiert (keine Quellenangabe)." elif chunk_text: context_section = f"KONTEXT (Originaltext):\n{chunk_text[:2000]}" else: context_section = "KONTEXT: Kein Originaltext verfuegbar." return f"""Erstelle ein PRAXISORIENTIERTES Security Control. {obl_section} {pattern_section} {context_section} AUFGABE: Fuelle das Muster mit pflicht-spezifischen Details. Das Ergebnis muss UMSETZBAR sein — keine Gesetzesparaphrase. Formuliere konkret und handlungsorientiert. Antworte als JSON: {{ "title": "Kurzer praegnanter Titel (max 100 Zeichen, deutsch)", "objective": "Was soll erreicht werden? (1-3 Saetze)", "rationale": "Warum ist das wichtig? (1-2 Saetze)", "requirements": ["Konkrete Anforderung 1", "Anforderung 2", ...], "test_procedure": ["Pruefschritt 1", "Pruefschritt 2", ...], "evidence": ["Nachweis 1", "Nachweis 2", ...], "severity": "low|medium|high|critical", "implementation_effort": "s|m|l|xl", "category": "{pattern.category}", "tags": ["tag1", "tag2"], "target_audience": ["unternehmen", "behoerden", "entwickler"], "verification_method": "code_review|document|tool|hybrid" }}""" def _build_fallback_prompt( obligation: ObligationMatch, chunk_text: Optional[str], license_rule: int, ) -> str: """Build the LLM prompt for fallback composition (no pattern).""" obl_section = _obligation_section(obligation) if license_rule == 3: context_section = "KONTEXT: Intern analysiert (keine Quellenangabe)." elif chunk_text: context_section = f"KONTEXT (Originaltext):\n{chunk_text[:2000]}" else: context_section = "KONTEXT: Kein Originaltext verfuegbar." return f"""Erstelle ein Security Control aus der folgenden Pflicht. 
{obl_section} {context_section} AUFGABE: Formuliere ein umsetzbares Security Control. Keine Gesetzesparaphrase — konkrete Massnahmen beschreiben. Antworte als JSON: {{ "title": "Kurzer praegnanter Titel (max 100 Zeichen, deutsch)", "objective": "Was soll erreicht werden? (1-3 Saetze)", "rationale": "Warum ist das wichtig? (1-2 Saetze)", "requirements": ["Konkrete Anforderung 1", "Anforderung 2", ...], "test_procedure": ["Pruefschritt 1", "Pruefschritt 2", ...], "evidence": ["Nachweis 1", "Nachweis 2", ...], "severity": "low|medium|high|critical", "implementation_effort": "s|m|l|xl", "category": "one of: authentication, encryption, data_protection, etc.", "tags": ["tag1", "tag2"], "target_audience": ["unternehmen"], "verification_method": "code_review|document|tool|hybrid" }}""" def _obligation_section(obligation: ObligationMatch) -> str: """Format the obligation for the prompt.""" parts = ["PFLICHT (was das Gesetz verlangt):"] if obligation.obligation_title: parts.append(f" Titel: {obligation.obligation_title}") if obligation.obligation_text: parts.append(f" Beschreibung: {obligation.obligation_text[:500]}") if obligation.obligation_id: parts.append(f" ID: {obligation.obligation_id}") if obligation.regulation_id: parts.append(f" Rechtsgrundlage: {obligation.regulation_id}") if not obligation.obligation_text and not obligation.obligation_title: parts.append(" (Keine spezifische Pflicht extrahiert)") return "\n".join(parts) def _pattern_section(pattern: ControlPattern) -> str: """Format the pattern for the prompt.""" reqs = "\n ".join(f"- {r}" for r in pattern.requirements_template[:5]) tests = "\n ".join(f"- {t}" for t in pattern.test_procedure_template[:3]) return f"""MUSTER (wie man es typischerweise umsetzt): Pattern: {pattern.name_de} ({pattern.id}) Domain: {pattern.domain} Ziel-Template: {pattern.objective_template} Anforderungs-Template: {reqs} Pruefverfahren-Template: {tests}""" # --------------------------------------------------------------------------- # 
# Helpers
# ---------------------------------------------------------------------------


def _ensure_list(value) -> list:
    """Ensure a value is a list of strings.

    Args:
        value: Anything; typically a field taken from parsed LLM JSON or a
            pattern template.

    Returns:
        For a list or tuple: its truthy items, each converted with ``str``.
        For a non-empty string: a single-element list. Otherwise ``[]``.
    """
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value if item]
    if isinstance(value, str):
        # An empty string carries no content; do not turn it into [""] .
        return [value] if value else []
    return []


def _anchors_from_pattern(pattern: "ControlPattern") -> list:
    """Convert pattern's open_anchor_refs to control anchor format.

    Args:
        pattern: Object exposing ``open_anchor_refs``, an iterable of dicts
            read via the keys ``framework`` and ``ref``. ``None`` is treated
            as "no anchors"; entries that are not dicts are skipped.

    Returns:
        One anchor dict per usable reference, with an empty title and a fixed
        alignment_score of 0.8.
    """
    return [
        {
            "framework": ref.get("framework", ""),
            "control_id": ref.get("ref", ""),
            "title": "",
            "alignment_score": 0.8,
        }
        for ref in (pattern.open_anchor_refs or [])
        if isinstance(ref, dict)
    ]


def _normalize_choice(value, valid, default):
    """Return *value* lower-cased and stripped if it is in *valid*, else *default*.

    Non-string values (including unhashable ones such as lists or dicts, which
    a plain ``value in valid`` set test would choke on) yield *default*.
    """
    if isinstance(value, str):
        candidate = value.strip().lower()
        if candidate in valid:
            return candidate
    return default


def _validate_control(control: "ComposedControl") -> None:
    """Validate and fix control field values.

    Mutates *control* in place so that enum-like fields hold an allowed value,
    the risk score is a number in [0, 10], the title is a string of at most
    255 characters, and the content fields are non-empty.

    Args:
        control: The control to normalize.
    """
    # Enum-like fields: accept case/whitespace variants ("High" -> "high"),
    # replace everything else with the default.
    control.severity = _normalize_choice(control.severity, VALID_SEVERITIES, "medium")
    control.implementation_effort = _normalize_choice(
        control.implementation_effort, VALID_EFFORTS, "m",
    )
    control.verification_method = _normalize_choice(
        control.verification_method, VALID_VERIFICATION, None,
    )

    # Risk score: a non-numeric or out-of-range value is re-derived from the
    # (already normalized) severity.
    score = control.risk_score
    if not isinstance(score, (int, float)) or not (0 <= score <= 10):
        control.risk_score = _severity_to_risk(control.severity)

    # Title: must be a string; over-long titles are cut and marked with "...".
    title = control.title
    if not isinstance(title, str):
        title = "" if title is None else str(title)
    if len(title) > 255:
        title = title[:252] + "..."
    control.title = title

    # Ensure minimum content
    if not control.objective:
        control.objective = control.title
    if not control.rationale:
        control.rationale = "Aus regulatorischer Anforderung abgeleitet."
    if not control.requirements:
        control.requirements = ["Anforderung gemaess Pflichtbeschreibung umsetzen"]
    if not control.test_procedure:
        control.test_procedure = ["Umsetzung der Anforderungen pruefen"]
    if not control.evidence:
        control.evidence = ["Dokumentation der Umsetzung"]


def _severity_to_risk(severity: str) -> float:
    """Map severity to a default risk score.

    Returns 9.0 / 7.0 / 5.0 / 3.0 for critical / high / medium / low, and 5.0
    for any other value.
    """
    return {
        "critical": 9.0,
        "high": 7.0,
        "medium": 5.0,
        "low": 3.0,
    }.get(severity, 5.0)