Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 47s
CI/CD / test-python-backend-compliance (push) Successful in 33s
CI/CD / test-python-document-crawler (push) Successful in 24s
CI/CD / test-python-dsms-gateway (push) Successful in 18s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Has been skipped
Implements the full Multi-Layer Control Architecture for migrating ~25,000 Rich Controls into atomic, deduplicated Master Controls with full traceability.

Architecture: Legal Source → Obligation → Control Pattern → Master Control → Customer Instance

New services:
- ObligationExtractor: 3-tier extraction (exact → embedding → LLM)
- PatternMatcher: 2-tier matching (keyword + embedding + domain-bonus)
- ControlComposer: Pattern + Obligation → Master Control
- PipelineAdapter: Pipeline integration + Migration Passes 1-5
- DecompositionPass: Pass 0a/0b — Rich Control → atomic Controls
- CrosswalkRoutes: 15 API endpoints under /v1/canonical/

New DB schema:
- Migration 060: obligation_extractions, control_patterns, crosswalk_matrix
- Migration 061: obligation_candidates, parent_control_uuid tracking

Pattern Library: 50 YAML patterns (30 core + 20 IT-security)
Go SDK: Pattern loader with YAML validation and indexing
Documentation: MkDocs updated with full architecture overview

500 Python tests passing across all components.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
547 lines
20 KiB
Python
547 lines
20 KiB
Python
"""Control Composer — Pattern + Obligation → Master Control.
|
|
|
|
Takes an obligation (from ObligationExtractor) and a matched control pattern
|
|
(from PatternMatcher), then uses LLM to compose a structured, actionable
|
|
Master Control. Replaces the old Stage 3 (STRUCTURE/REFORM) with a
|
|
pattern-guided approach.
|
|
|
|
Three composition modes based on license rules:
|
|
Rule 1: Obligation + Pattern + original text → full control
|
|
Rule 2: Obligation + Pattern + original text + citation → control
|
|
Rule 3: Obligation + Pattern (NO original text) → reformulated control
|
|
|
|
Fallback: No pattern match → basic generation (tagged needs_pattern_assignment)
|
|
|
|
Part of the Multi-Layer Control Architecture (Phase 6 of 8).
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
from compliance.services.obligation_extractor import (
|
|
ObligationMatch,
|
|
_llm_ollama,
|
|
_parse_json,
|
|
)
|
|
from compliance.services.pattern_matcher import (
|
|
ControlPattern,
|
|
PatternMatchResult,
|
|
)
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Model name read from the environment, with a built-in default.
OLLAMA_MODEL = os.environ.get("CONTROL_GEN_OLLAMA_MODEL", "qwen3.5:35b-a3b")

# Allowed values for the enumerated fields of a generated control.
VALID_SEVERITIES = {"critical", "high", "medium", "low"}
VALID_EFFORTS = {"xl", "l", "m", "s"}
VALID_VERIFICATION = {"hybrid", "tool", "document", "code_review"}
|
|
|
|
|
|
@dataclass
class ComposedControl:
    """Master Control built from an obligation and a control pattern.

    Every field has a default, so an instance can be created empty and
    filled in step by step.
    """

    # --- Core content (per the original comment: matches canonical_controls schema) ---
    control_id: str = ""
    title: str = ""
    objective: str = ""
    rationale: str = ""
    scope: dict = field(default_factory=dict)
    requirements: list = field(default_factory=list)
    test_procedure: list = field(default_factory=list)
    evidence: list = field(default_factory=list)
    severity: str = "medium"
    risk_score: float = 5.0
    implementation_effort: str = "m"
    open_anchors: list = field(default_factory=list)
    release_state: str = "draft"
    tags: list = field(default_factory=list)

    # --- 3-rule license handling ---
    license_rule: Optional[int] = None
    source_original_text: Optional[str] = None
    source_citation: Optional[dict] = None
    customer_visible: bool = True

    # --- Classification ---
    verification_method: Optional[str] = None
    category: Optional[str] = None
    target_audience: Optional[list] = None

    # --- Links back to the pattern and the obligation(s) ---
    pattern_id: Optional[str] = None
    obligation_ids: list = field(default_factory=list)

    # --- Generation bookkeeping ---
    generation_metadata: dict = field(default_factory=dict)
    # Values assigned in this module: pattern_guided | template_only | fallback
    composition_method: str = "pattern_guided"

    def to_dict(self) -> dict:
        """Return all fields as a plain dict for DB storage or an API response.

        Values are passed through by reference (no copying); keys appear in
        field declaration order.
        """
        exported = (
            "control_id",
            "title",
            "objective",
            "rationale",
            "scope",
            "requirements",
            "test_procedure",
            "evidence",
            "severity",
            "risk_score",
            "implementation_effort",
            "open_anchors",
            "release_state",
            "tags",
            "license_rule",
            "source_original_text",
            "source_citation",
            "customer_visible",
            "verification_method",
            "category",
            "target_audience",
            "pattern_id",
            "obligation_ids",
            "generation_metadata",
            "composition_method",
        )
        return {key: getattr(self, key) for key in exported}
|
|
|
|
|
|
class ControlComposer:
    """Composes Master Controls from obligations + patterns.

    Usage::

        composer = ControlComposer()

        control = await composer.compose(
            obligation=obligation_match,
            pattern_result=pattern_match_result,
            chunk_text="...",
            license_rule=1,
            source_citation={...},
        )
    """

    async def compose(
        self,
        obligation: ObligationMatch,
        pattern_result: PatternMatchResult,
        chunk_text: Optional[str] = None,
        license_rule: int = 3,
        source_citation: Optional[dict] = None,
        regulation_code: Optional[str] = None,
    ) -> ComposedControl:
        """Compose a Master Control from obligation + pattern.

        Args:
            obligation: The extracted obligation (from ObligationExtractor).
            pattern_result: The matched pattern (from PatternMatcher). A falsy
                value, or a result whose ``pattern`` is falsy, selects the
                pattern-less fallback path.
            chunk_text: Original RAG chunk text (only used for Rules 1-2).
            license_rule: 1=free, 2=citation, 3=restricted.
            source_citation: Citation metadata for Rule 2.
            regulation_code: Source regulation code (recorded in metadata only).

        Returns:
            ComposedControl ready for storage.
        """
        pattern = pattern_result.pattern if pattern_result else None

        if pattern:
            control = await self._compose_with_pattern(
                obligation, pattern, chunk_text, license_rule, source_citation,
            )
        else:
            control = await self._compose_fallback(
                obligation, chunk_text, license_rule, source_citation,
            )

        # Linkage back to the pattern and the obligation.
        control.pattern_id = pattern.id if pattern else None
        if obligation.obligation_id:
            control.obligation_ids = [obligation.obligation_id]

        # License fields: original text is kept only for rules 1 and 2, the
        # citation only for rule 2; rule 3 hides the control from customers
        # and clears both source fields.
        control.license_rule = license_rule
        if license_rule in (1, 2) and chunk_text:
            control.source_original_text = chunk_text
        if license_rule == 2 and source_citation:
            control.source_citation = source_citation
        if license_rule == 3:
            control.customer_visible = False
            control.source_original_text = None
            control.source_citation = None

        control.generation_metadata = {
            "composition_method": control.composition_method,
            "pattern_id": control.pattern_id,
            "pattern_confidence": round(pattern_result.confidence, 3) if pattern_result else 0,
            "pattern_method": pattern_result.method if pattern_result else "none",
            "obligation_id": obligation.obligation_id,
            "obligation_method": obligation.method,
            "obligation_confidence": round(obligation.confidence, 3),
            "license_rule": license_rule,
            "regulation_code": regulation_code,
        }

        # Normalise enumerated fields and fill in minimum content.
        _validate_control(control)

        return control

    async def compose_batch(
        self,
        items: list[dict],
    ) -> list[ComposedControl]:
        """Compose multiple controls, one after another.

        Args:
            items: List of dicts with keys: obligation, pattern_result,
                chunk_text, license_rule, source_citation, regulation_code.
                Only ``obligation`` is required; a missing ``pattern_result``
                defaults to an empty ``PatternMatchResult()`` and a missing
                ``license_rule`` to 3.

        Returns:
            List of ComposedControl instances, in input order.
        """
        results = []
        for item in items:
            control = await self.compose(
                obligation=item["obligation"],
                pattern_result=item.get("pattern_result", PatternMatchResult()),
                chunk_text=item.get("chunk_text"),
                license_rule=item.get("license_rule", 3),
                source_citation=item.get("source_citation"),
                regulation_code=item.get("regulation_code"),
            )
            results.append(control)
        return results

    # -----------------------------------------------------------------------
    # Shared helpers
    # -----------------------------------------------------------------------

    @staticmethod
    def _clean_title(value, default: Optional[str]) -> str:
        """Return a usable title of at most 255 characters.

        ``dict.get(key, default)`` only falls back when the key is missing,
        so an LLM answer containing ``"title": null`` (or a number/list)
        would otherwise reach the ``[:255]`` slice and raise ``TypeError``.
        Non-string and blank values therefore fall back to *default*.
        """
        if isinstance(value, str) and value.strip():
            return value[:255]
        return (default or "")[:255]

    # -----------------------------------------------------------------------
    # Pattern-guided composition
    # -----------------------------------------------------------------------

    async def _compose_with_pattern(
        self,
        obligation: ObligationMatch,
        pattern: ControlPattern,
        chunk_text: Optional[str],
        license_rule: int,
        source_citation: Optional[dict],
    ) -> ComposedControl:
        """Use LLM to fill the pattern template with obligation-specific details.

        Falls back to ``_compose_from_template`` when the LLM returns nothing
        or its answer does not parse into a non-empty JSON object.
        ``source_citation`` is accepted for signature symmetry but not read
        here; ``compose`` attaches it afterwards.
        """
        prompt = _build_compose_prompt(obligation, pattern, chunk_text, license_rule)
        system_prompt = _compose_system_prompt(license_rule)

        llm_result = await _llm_ollama(prompt, system_prompt)
        if not llm_result:
            return self._compose_from_template(obligation, pattern)

        parsed = _parse_json(llm_result)
        # A non-dict payload (e.g. a JSON array) has no ``.get``; treat it
        # like an unparsable answer.
        if not parsed or not isinstance(parsed, dict):
            return self._compose_from_template(obligation, pattern)

        control = ComposedControl(
            title=self._clean_title(parsed.get("title"), pattern.name_de),
            objective=parsed.get("objective", pattern.objective_template),
            rationale=parsed.get("rationale", pattern.rationale_template),
            requirements=_ensure_list(parsed.get("requirements", pattern.requirements_template)),
            test_procedure=_ensure_list(parsed.get("test_procedure", pattern.test_procedure_template)),
            evidence=_ensure_list(parsed.get("evidence", pattern.evidence_template)),
            severity=parsed.get("severity", pattern.severity_default),
            implementation_effort=parsed.get("implementation_effort", pattern.implementation_effort_default),
            category=parsed.get("category", pattern.category),
            tags=_ensure_list(parsed.get("tags", pattern.tags)),
            target_audience=_ensure_list(parsed.get("target_audience", [])),
            verification_method=parsed.get("verification_method"),
            open_anchors=_anchors_from_pattern(pattern),
            composition_method="pattern_guided",
        )

        return control

    def _compose_from_template(
        self,
        obligation: ObligationMatch,
        pattern: ControlPattern,
    ) -> ComposedControl:
        """Fallback: fill template directly without LLM (when LLM fails).

        The title is the pattern name, extended by the obligation title when
        one exists. The objective is the pattern's objective template,
        extended by the first 200 characters of the obligation text when that
        text is longer than 20 characters. The result is marked with
        ``composition_method="template_only"``.
        """
        obl_title = obligation.obligation_title or ""
        obl_text = obligation.obligation_text or ""

        title = f"{pattern.name_de}"
        if obl_title:
            title = f"{pattern.name_de} — {obl_title}"

        objective = pattern.objective_template
        if obl_text and len(obl_text) > 20:
            objective = f"{pattern.objective_template} Bezug: {obl_text[:200]}"

        return ComposedControl(
            title=title[:255],
            objective=objective,
            rationale=pattern.rationale_template,
            # Copies, so later edits to the control do not alter the pattern.
            requirements=list(pattern.requirements_template),
            test_procedure=list(pattern.test_procedure_template),
            evidence=list(pattern.evidence_template),
            severity=pattern.severity_default,
            implementation_effort=pattern.implementation_effort_default,
            category=pattern.category,
            tags=list(pattern.tags),
            open_anchors=_anchors_from_pattern(pattern),
            composition_method="template_only",
        )

    # -----------------------------------------------------------------------
    # Fallback (no pattern)
    # -----------------------------------------------------------------------

    async def _compose_fallback(
        self,
        obligation: ObligationMatch,
        chunk_text: Optional[str],
        license_rule: int,
        source_citation: Optional[dict],
    ) -> ComposedControl:
        """Generate a control without a pattern template (old-style).

        Missing or unusable LLM output degrades to defaults derived from the
        obligation text. The result is marked ``composition_method="fallback"``
        and ``release_state="needs_review"``. ``source_citation`` is not read
        here; ``compose`` attaches it afterwards.
        """
        prompt = _build_fallback_prompt(obligation, chunk_text, license_rule)
        system_prompt = _compose_system_prompt(license_rule)

        llm_result = await _llm_ollama(prompt, system_prompt)
        parsed = _parse_json(llm_result) if llm_result else {}
        # Guard against None or a non-object JSON payload before ``.get``.
        if not isinstance(parsed, dict):
            parsed = {}

        obl_text = obligation.obligation_text or ""
        default_title = obl_text[:100] if obl_text else "Untitled Control"

        control = ComposedControl(
            title=self._clean_title(parsed.get("title"), default_title),
            objective=parsed.get("objective", obl_text[:500]),
            rationale=parsed.get("rationale", "Aus gesetzlicher Pflicht abgeleitet."),
            requirements=_ensure_list(parsed.get("requirements", [])),
            test_procedure=_ensure_list(parsed.get("test_procedure", [])),
            evidence=_ensure_list(parsed.get("evidence", [])),
            severity=parsed.get("severity", "medium"),
            implementation_effort=parsed.get("implementation_effort", "m"),
            category=parsed.get("category"),
            tags=_ensure_list(parsed.get("tags", [])),
            target_audience=_ensure_list(parsed.get("target_audience", [])),
            verification_method=parsed.get("verification_method"),
            composition_method="fallback",
            release_state="needs_review",
        )

        return control
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Prompt builders
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _compose_system_prompt(license_rule: int) -> str:
    """Return the LLM system prompt for the given license rule.

    Rule 3 gets the strict "own words, no source" instructions; every other
    rule gets the short generic prompt.
    """
    if license_rule != 3:
        return (
            "Du bist ein Security-Compliance-Experte. "
            "Erstelle ein praxisorientiertes, umsetzbares Security Control. "
            "Antworte NUR mit validem JSON."
        )

    restricted_prompt = (
        "Du bist ein Security-Compliance-Experte. Deine Aufgabe ist es, "
        "eigenstaendige Security Controls zu formulieren. "
        "Du formulierst IMMER in eigenen Worten. "
        "KOPIERE KEINE Saetze aus dem Quelltext. "
        "Verwende eigene Begriffe und Struktur. "
        "NENNE NICHT die Quelle. Keine proprietaeren Bezeichner. "
        "Antworte NUR mit validem JSON."
    )
    return restricted_prompt
|
|
|
|
|
|
def _build_compose_prompt(
    obligation: ObligationMatch,
    pattern: ControlPattern,
    chunk_text: Optional[str],
    license_rule: int,
) -> str:
    """Assemble the user prompt for pattern-guided composition.

    The prompt holds the obligation section, the pattern section, a context
    section and the JSON answer schema. Under license rule 3 the original
    text is never included; otherwise up to 2000 characters of ``chunk_text``
    are embedded when available.
    """
    if license_rule == 3:
        context_section = "KONTEXT: Intern analysiert (keine Quellenangabe)."
    else:
        context_section = (
            f"KONTEXT (Originaltext):\n{chunk_text[:2000]}"
            if chunk_text
            else "KONTEXT: Kein Originaltext verfuegbar."
        )

    obl_section = _obligation_section(obligation)
    pattern_section = _pattern_section(pattern)

    return f"""Erstelle ein PRAXISORIENTIERTES Security Control.

{obl_section}

{pattern_section}

{context_section}

AUFGABE:
Fuelle das Muster mit pflicht-spezifischen Details.
Das Ergebnis muss UMSETZBAR sein — keine Gesetzesparaphrase.
Formuliere konkret und handlungsorientiert.

Antworte als JSON:
{{
"title": "Kurzer praegnanter Titel (max 100 Zeichen, deutsch)",
"objective": "Was soll erreicht werden? (1-3 Saetze)",
"rationale": "Warum ist das wichtig? (1-2 Saetze)",
"requirements": ["Konkrete Anforderung 1", "Anforderung 2", ...],
"test_procedure": ["Pruefschritt 1", "Pruefschritt 2", ...],
"evidence": ["Nachweis 1", "Nachweis 2", ...],
"severity": "low|medium|high|critical",
"implementation_effort": "s|m|l|xl",
"category": "{pattern.category}",
"tags": ["tag1", "tag2"],
"target_audience": ["unternehmen", "behoerden", "entwickler"],
"verification_method": "code_review|document|tool|hybrid"
}}"""
|
|
|
|
|
|
def _build_fallback_prompt(
    obligation: ObligationMatch,
    chunk_text: Optional[str],
    license_rule: int,
) -> str:
    """Assemble the user prompt for composition without a pattern.

    The prompt holds the obligation section, a context section and the JSON
    answer schema. Under license rule 3 the original text is never included;
    otherwise up to 2000 characters of ``chunk_text`` are embedded when
    available.
    """
    if license_rule == 3:
        context_section = "KONTEXT: Intern analysiert (keine Quellenangabe)."
    else:
        context_section = (
            f"KONTEXT (Originaltext):\n{chunk_text[:2000]}"
            if chunk_text
            else "KONTEXT: Kein Originaltext verfuegbar."
        )

    obl_section = _obligation_section(obligation)

    return f"""Erstelle ein Security Control aus der folgenden Pflicht.

{obl_section}

{context_section}

AUFGABE:
Formuliere ein umsetzbares Security Control.
Keine Gesetzesparaphrase — konkrete Massnahmen beschreiben.

Antworte als JSON:
{{
"title": "Kurzer praegnanter Titel (max 100 Zeichen, deutsch)",
"objective": "Was soll erreicht werden? (1-3 Saetze)",
"rationale": "Warum ist das wichtig? (1-2 Saetze)",
"requirements": ["Konkrete Anforderung 1", "Anforderung 2", ...],
"test_procedure": ["Pruefschritt 1", "Pruefschritt 2", ...],
"evidence": ["Nachweis 1", "Nachweis 2", ...],
"severity": "low|medium|high|critical",
"implementation_effort": "s|m|l|xl",
"category": "one of: authentication, encryption, data_protection, etc.",
"tags": ["tag1", "tag2"],
"target_audience": ["unternehmen"],
"verification_method": "code_review|document|tool|hybrid"
}}"""
|
|
|
|
|
|
def _obligation_section(obligation: ObligationMatch) -> str:
    """Render the obligation as the "PFLICHT" block of an LLM prompt.

    Only attributes with a truthy value produce a line; the description is
    cut to 500 characters. When neither title nor text is present, a
    placeholder line is appended instead.
    """
    title = obligation.obligation_title
    text = obligation.obligation_text

    lines = ["PFLICHT (was das Gesetz verlangt):"]
    if title:
        lines.append(f" Titel: {title}")
    if text:
        lines.append(f" Beschreibung: {text[:500]}")
    if obligation.obligation_id:
        lines.append(f" ID: {obligation.obligation_id}")
    if obligation.regulation_id:
        lines.append(f" Rechtsgrundlage: {obligation.regulation_id}")
    if not (text or title):
        lines.append(" (Keine spezifische Pflicht extrahiert)")

    return "\n".join(lines)
|
|
|
|
|
|
def _pattern_section(pattern: ControlPattern) -> str:
    """Render the pattern as the "MUSTER" block of an LLM prompt.

    Includes the pattern name, id, domain and objective template, plus at
    most the first five requirement templates and the first three test
    procedure templates as bullet lines.
    """
    requirement_bullets = [f"- {item}" for item in pattern.requirements_template[:5]]
    test_bullets = [f"- {item}" for item in pattern.test_procedure_template[:3]]
    reqs = "\n ".join(requirement_bullets)
    tests = "\n ".join(test_bullets)

    return f"""MUSTER (wie man es typischerweise umsetzt):
Pattern: {pattern.name_de} ({pattern.id})
Domain: {pattern.domain}
Ziel-Template: {pattern.objective_template}
Anforderungs-Template:
{reqs}
Pruefverfahren-Template:
{tests}"""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _ensure_list(value) -> list:
    """Coerce *value* into a list of non-blank strings.

    Args:
        value: Typically a field taken from parsed LLM JSON. May be a list,
            a single string, or anything else.

    Returns:
        * list: every truthy, non-blank item converted with ``str()``
          (falsy items such as ``None``, ``""`` and ``0`` are dropped).
        * str: a one-element list, or ``[]`` when the string is empty or
          only whitespace. Returning ``[""]`` here would look like real
          content to callers that test the list for truthiness.
        * anything else: ``[]``.
    """
    if isinstance(value, str):
        return [value] if value.strip() else []
    if isinstance(value, list):
        return [str(item) for item in value if item and str(item).strip()]
    return []
|
|
|
|
|
|
def _anchors_from_pattern(pattern: ControlPattern) -> list:
    """Translate the pattern's ``open_anchor_refs`` into control anchors.

    Each reference becomes a dict with its ``framework``, its ``ref`` stored
    as ``control_id`` (both default to ""), an empty ``title`` and a fixed
    ``alignment_score`` of 0.8.
    """
    return [
        {
            "framework": anchor_ref.get("framework", ""),
            "control_id": anchor_ref.get("ref", ""),
            "title": "",
            "alignment_score": 0.8,
        }
        for anchor_ref in pattern.open_anchor_refs
    ]
|
|
|
|
|
|
def _normalize_choice(value, allowed, default):
    """Return *value* lower-cased and stripped if it is in *allowed*, else *default*.

    Non-string values (an LLM may emit a list, dict or null) always yield
    *default*; testing them for set membership directly would raise
    ``TypeError`` for unhashable types.
    """
    if isinstance(value, str):
        candidate = value.strip().lower()
        if candidate in allowed:
            return candidate
    return default


def _validate_control(control: ComposedControl) -> None:
    """Validate and repair the fields of *control* in place.

    * ``severity`` / ``implementation_effort``: normalised to one of the
      allowed lower-case values, else reset to ``"medium"`` / ``"m"``.
    * ``verification_method``: normalised to an allowed value, else ``None``.
    * ``risk_score``: replaced by the severity-based default when it is not
      a number in the range 0..10.
    * ``title``: shortened to 255 characters (252 plus ``"..."``).
    * Empty objective, rationale, requirements, test procedure and evidence
      receive placeholder content.
    """
    # Enumerated fields: accept variants such as "High" or " high ".
    control.severity = _normalize_choice(control.severity, VALID_SEVERITIES, "medium")
    control.implementation_effort = _normalize_choice(
        control.implementation_effort, VALID_EFFORTS, "m",
    )
    control.verification_method = _normalize_choice(
        control.verification_method, VALID_VERIFICATION, None,
    )

    # Risk score: must be numeric and within 0..10 (NaN fails the range test).
    risk = control.risk_score
    if not isinstance(risk, (int, float)) or not (0 <= risk <= 10):
        control.risk_score = _severity_to_risk(control.severity)

    # Title length
    if len(control.title) > 255:
        control.title = control.title[:252] + "..."

    # Ensure minimum content
    if not control.objective:
        control.objective = control.title
    if not control.rationale:
        control.rationale = "Aus regulatorischer Anforderung abgeleitet."
    if not control.requirements:
        control.requirements = ["Anforderung gemaess Pflichtbeschreibung umsetzen"]
    if not control.test_procedure:
        control.test_procedure = ["Umsetzung der Anforderungen pruefen"]
    if not control.evidence:
        control.evidence = ["Dokumentation der Umsetzung"]
|
|
|
|
|
|
def _severity_to_risk(severity: str) -> float:
    """Return the default risk score for a severity label.

    Unknown labels map to 5.0, the same score as "medium".
    """
    score_by_severity = dict(critical=9.0, high=7.0, medium=5.0, low=3.0)
    try:
        return score_by_severity[severity]
    except KeyError:
        return 5.0
|