Files
Benjamin Admin 825e070ed9
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 47s
CI/CD / test-python-backend-compliance (push) Successful in 33s
CI/CD / test-python-document-crawler (push) Successful in 24s
CI/CD / test-python-dsms-gateway (push) Successful in 18s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Has been skipped
feat(multi-layer): complete Multi-Layer Control Architecture (Phases 1-8 + Pass 0)
Implements the full Multi-Layer Control Architecture for migrating ~25,000
Rich Controls into atomic, deduplicated Master Controls with full traceability.

Architecture: Legal Source → Obligation → Control Pattern → Master Control → Customer Instance

New services:
- ObligationExtractor: 3-tier extraction (exact → embedding → LLM)
- PatternMatcher: 2-tier matching (keyword + embedding + domain-bonus)
- ControlComposer: Pattern + Obligation → Master Control
- PipelineAdapter: Pipeline integration + Migration Passes 1-5
- DecompositionPass: Pass 0a/0b — Rich Control → atomic Controls
- CrosswalkRoutes: 15 API endpoints under /v1/canonical/

New DB schema:
- Migration 060: obligation_extractions, control_patterns, crosswalk_matrix
- Migration 061: obligation_candidates, parent_control_uuid tracking

Pattern Library: 50 YAML patterns (30 core + 20 IT-security)
Go SDK: Pattern loader with YAML validation and indexing
Documentation: MkDocs updated with full architecture overview

500 Python tests passing across all components.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 09:00:37 +01:00

547 lines
20 KiB
Python

"""Control Composer — Pattern + Obligation → Master Control.
Takes an obligation (from ObligationExtractor) and a matched control pattern
(from PatternMatcher), then uses LLM to compose a structured, actionable
Master Control. Replaces the old Stage 3 (STRUCTURE/REFORM) with a
pattern-guided approach.
Three composition modes based on license rules:
Rule 1: Obligation + Pattern + original text → full control
Rule 2: Obligation + Pattern + original text + citation → control
Rule 3: Obligation + Pattern (NO original text) → reformulated control
Fallback: No pattern match → basic generation (tagged needs_pattern_assignment)
Part of the Multi-Layer Control Architecture (Phase 6 of 8).
"""
import json
import logging
import os
from dataclasses import dataclass, field
from typing import Optional
from compliance.services.obligation_extractor import (
ObligationMatch,
_llm_ollama,
_parse_json,
)
from compliance.services.pattern_matcher import (
ControlPattern,
PatternMatchResult,
)
logger = logging.getLogger(__name__)
OLLAMA_MODEL = os.getenv("CONTROL_GEN_OLLAMA_MODEL", "qwen3.5:35b-a3b")
# Valid values for generated control fields
VALID_SEVERITIES = {"low", "medium", "high", "critical"}
VALID_EFFORTS = {"s", "m", "l", "xl"}
VALID_VERIFICATION = {"code_review", "document", "tool", "hybrid"}
@dataclass
class ComposedControl:
"""A Master Control composed from an obligation + pattern."""
# Core fields (match canonical_controls schema)
control_id: str = ""
title: str = ""
objective: str = ""
rationale: str = ""
scope: dict = field(default_factory=dict)
requirements: list = field(default_factory=list)
test_procedure: list = field(default_factory=list)
evidence: list = field(default_factory=list)
severity: str = "medium"
risk_score: float = 5.0
implementation_effort: str = "m"
open_anchors: list = field(default_factory=list)
release_state: str = "draft"
tags: list = field(default_factory=list)
# 3-Rule License fields
license_rule: Optional[int] = None
source_original_text: Optional[str] = None
source_citation: Optional[dict] = None
customer_visible: bool = True
# Classification
verification_method: Optional[str] = None
category: Optional[str] = None
target_audience: Optional[list] = None
# Pattern + Obligation linkage
pattern_id: Optional[str] = None
obligation_ids: list = field(default_factory=list)
# Metadata
generation_metadata: dict = field(default_factory=dict)
composition_method: str = "pattern_guided" # pattern_guided | fallback
def to_dict(self) -> dict:
"""Serialize for DB storage or API response."""
return {
"control_id": self.control_id,
"title": self.title,
"objective": self.objective,
"rationale": self.rationale,
"scope": self.scope,
"requirements": self.requirements,
"test_procedure": self.test_procedure,
"evidence": self.evidence,
"severity": self.severity,
"risk_score": self.risk_score,
"implementation_effort": self.implementation_effort,
"open_anchors": self.open_anchors,
"release_state": self.release_state,
"tags": self.tags,
"license_rule": self.license_rule,
"source_original_text": self.source_original_text,
"source_citation": self.source_citation,
"customer_visible": self.customer_visible,
"verification_method": self.verification_method,
"category": self.category,
"target_audience": self.target_audience,
"pattern_id": self.pattern_id,
"obligation_ids": self.obligation_ids,
"generation_metadata": self.generation_metadata,
"composition_method": self.composition_method,
}
class ControlComposer:
"""Composes Master Controls from obligations + patterns.
Usage::
composer = ControlComposer()
control = await composer.compose(
obligation=obligation_match,
pattern_result=pattern_match_result,
chunk_text="...",
license_rule=1,
source_citation={...},
)
"""
async def compose(
self,
obligation: ObligationMatch,
pattern_result: PatternMatchResult,
chunk_text: Optional[str] = None,
license_rule: int = 3,
source_citation: Optional[dict] = None,
regulation_code: Optional[str] = None,
) -> ComposedControl:
"""Compose a Master Control from obligation + pattern.
Args:
obligation: The extracted obligation (from ObligationExtractor).
pattern_result: The matched pattern (from PatternMatcher).
chunk_text: Original RAG chunk text (only used for Rules 1-2).
license_rule: 1=free, 2=citation, 3=restricted.
source_citation: Citation metadata for Rule 2.
regulation_code: Source regulation code.
Returns:
ComposedControl ready for storage.
"""
pattern = pattern_result.pattern if pattern_result else None
if pattern:
control = await self._compose_with_pattern(
obligation, pattern, chunk_text, license_rule, source_citation,
)
else:
control = await self._compose_fallback(
obligation, chunk_text, license_rule, source_citation,
)
# Set linkage fields
control.pattern_id = pattern.id if pattern else None
if obligation.obligation_id:
control.obligation_ids = [obligation.obligation_id]
# Set license fields
control.license_rule = license_rule
if license_rule in (1, 2) and chunk_text:
control.source_original_text = chunk_text
if license_rule == 2 and source_citation:
control.source_citation = source_citation
if license_rule == 3:
control.customer_visible = False
control.source_original_text = None
control.source_citation = None
# Build metadata
control.generation_metadata = {
"composition_method": control.composition_method,
"pattern_id": control.pattern_id,
"pattern_confidence": round(pattern_result.confidence, 3) if pattern_result else 0,
"pattern_method": pattern_result.method if pattern_result else "none",
"obligation_id": obligation.obligation_id,
"obligation_method": obligation.method,
"obligation_confidence": round(obligation.confidence, 3),
"license_rule": license_rule,
"regulation_code": regulation_code,
}
# Validate and fix fields
_validate_control(control)
return control
async def compose_batch(
self,
items: list[dict],
) -> list[ComposedControl]:
"""Compose multiple controls.
Args:
items: List of dicts with keys: obligation, pattern_result,
chunk_text, license_rule, source_citation, regulation_code.
Returns:
List of ComposedControl instances.
"""
results = []
for item in items:
control = await self.compose(
obligation=item["obligation"],
pattern_result=item.get("pattern_result", PatternMatchResult()),
chunk_text=item.get("chunk_text"),
license_rule=item.get("license_rule", 3),
source_citation=item.get("source_citation"),
regulation_code=item.get("regulation_code"),
)
results.append(control)
return results
# -----------------------------------------------------------------------
# Pattern-guided composition
# -----------------------------------------------------------------------
async def _compose_with_pattern(
self,
obligation: ObligationMatch,
pattern: ControlPattern,
chunk_text: Optional[str],
license_rule: int,
source_citation: Optional[dict],
) -> ComposedControl:
"""Use LLM to fill the pattern template with obligation-specific details."""
prompt = _build_compose_prompt(obligation, pattern, chunk_text, license_rule)
system_prompt = _compose_system_prompt(license_rule)
llm_result = await _llm_ollama(prompt, system_prompt)
if not llm_result:
return self._compose_from_template(obligation, pattern)
parsed = _parse_json(llm_result)
if not parsed:
return self._compose_from_template(obligation, pattern)
control = ComposedControl(
title=parsed.get("title", pattern.name_de)[:255],
objective=parsed.get("objective", pattern.objective_template),
rationale=parsed.get("rationale", pattern.rationale_template),
requirements=_ensure_list(parsed.get("requirements", pattern.requirements_template)),
test_procedure=_ensure_list(parsed.get("test_procedure", pattern.test_procedure_template)),
evidence=_ensure_list(parsed.get("evidence", pattern.evidence_template)),
severity=parsed.get("severity", pattern.severity_default),
implementation_effort=parsed.get("implementation_effort", pattern.implementation_effort_default),
category=parsed.get("category", pattern.category),
tags=_ensure_list(parsed.get("tags", pattern.tags)),
target_audience=_ensure_list(parsed.get("target_audience", [])),
verification_method=parsed.get("verification_method"),
open_anchors=_anchors_from_pattern(pattern),
composition_method="pattern_guided",
)
return control
def _compose_from_template(
self,
obligation: ObligationMatch,
pattern: ControlPattern,
) -> ComposedControl:
"""Fallback: fill template directly without LLM (when LLM fails)."""
obl_title = obligation.obligation_title or ""
obl_text = obligation.obligation_text or ""
title = f"{pattern.name_de}"
if obl_title:
title = f"{pattern.name_de}{obl_title}"
objective = pattern.objective_template
if obl_text and len(obl_text) > 20:
objective = f"{pattern.objective_template} Bezug: {obl_text[:200]}"
return ComposedControl(
title=title[:255],
objective=objective,
rationale=pattern.rationale_template,
requirements=list(pattern.requirements_template),
test_procedure=list(pattern.test_procedure_template),
evidence=list(pattern.evidence_template),
severity=pattern.severity_default,
implementation_effort=pattern.implementation_effort_default,
category=pattern.category,
tags=list(pattern.tags),
open_anchors=_anchors_from_pattern(pattern),
composition_method="template_only",
)
# -----------------------------------------------------------------------
# Fallback (no pattern)
# -----------------------------------------------------------------------
async def _compose_fallback(
self,
obligation: ObligationMatch,
chunk_text: Optional[str],
license_rule: int,
source_citation: Optional[dict],
) -> ComposedControl:
"""Generate a control without a pattern template (old-style)."""
prompt = _build_fallback_prompt(obligation, chunk_text, license_rule)
system_prompt = _compose_system_prompt(license_rule)
llm_result = await _llm_ollama(prompt, system_prompt)
parsed = _parse_json(llm_result) if llm_result else {}
obl_text = obligation.obligation_text or ""
control = ComposedControl(
title=parsed.get("title", obl_text[:100] if obl_text else "Untitled Control")[:255],
objective=parsed.get("objective", obl_text[:500]),
rationale=parsed.get("rationale", "Aus gesetzlicher Pflicht abgeleitet."),
requirements=_ensure_list(parsed.get("requirements", [])),
test_procedure=_ensure_list(parsed.get("test_procedure", [])),
evidence=_ensure_list(parsed.get("evidence", [])),
severity=parsed.get("severity", "medium"),
implementation_effort=parsed.get("implementation_effort", "m"),
category=parsed.get("category"),
tags=_ensure_list(parsed.get("tags", [])),
target_audience=_ensure_list(parsed.get("target_audience", [])),
verification_method=parsed.get("verification_method"),
composition_method="fallback",
release_state="needs_review",
)
return control
# ---------------------------------------------------------------------------
# Prompt builders
# ---------------------------------------------------------------------------
def _compose_system_prompt(license_rule: int) -> str:
"""Build the system prompt based on license rule."""
if license_rule == 3:
return (
"Du bist ein Security-Compliance-Experte. Deine Aufgabe ist es, "
"eigenstaendige Security Controls zu formulieren. "
"Du formulierst IMMER in eigenen Worten. "
"KOPIERE KEINE Saetze aus dem Quelltext. "
"Verwende eigene Begriffe und Struktur. "
"NENNE NICHT die Quelle. Keine proprietaeren Bezeichner. "
"Antworte NUR mit validem JSON."
)
return (
"Du bist ein Security-Compliance-Experte. "
"Erstelle ein praxisorientiertes, umsetzbares Security Control. "
"Antworte NUR mit validem JSON."
)
def _build_compose_prompt(
obligation: ObligationMatch,
pattern: ControlPattern,
chunk_text: Optional[str],
license_rule: int,
) -> str:
"""Build the LLM prompt for pattern-guided composition."""
obl_section = _obligation_section(obligation)
pattern_section = _pattern_section(pattern)
if license_rule == 3:
context_section = "KONTEXT: Intern analysiert (keine Quellenangabe)."
elif chunk_text:
context_section = f"KONTEXT (Originaltext):\n{chunk_text[:2000]}"
else:
context_section = "KONTEXT: Kein Originaltext verfuegbar."
return f"""Erstelle ein PRAXISORIENTIERTES Security Control.
{obl_section}
{pattern_section}
{context_section}
AUFGABE:
Fuelle das Muster mit pflicht-spezifischen Details.
Das Ergebnis muss UMSETZBAR sein — keine Gesetzesparaphrase.
Formuliere konkret und handlungsorientiert.
Antworte als JSON:
{{
"title": "Kurzer praegnanter Titel (max 100 Zeichen, deutsch)",
"objective": "Was soll erreicht werden? (1-3 Saetze)",
"rationale": "Warum ist das wichtig? (1-2 Saetze)",
"requirements": ["Konkrete Anforderung 1", "Anforderung 2", ...],
"test_procedure": ["Pruefschritt 1", "Pruefschritt 2", ...],
"evidence": ["Nachweis 1", "Nachweis 2", ...],
"severity": "low|medium|high|critical",
"implementation_effort": "s|m|l|xl",
"category": "{pattern.category}",
"tags": ["tag1", "tag2"],
"target_audience": ["unternehmen", "behoerden", "entwickler"],
"verification_method": "code_review|document|tool|hybrid"
}}"""
def _build_fallback_prompt(
obligation: ObligationMatch,
chunk_text: Optional[str],
license_rule: int,
) -> str:
"""Build the LLM prompt for fallback composition (no pattern)."""
obl_section = _obligation_section(obligation)
if license_rule == 3:
context_section = "KONTEXT: Intern analysiert (keine Quellenangabe)."
elif chunk_text:
context_section = f"KONTEXT (Originaltext):\n{chunk_text[:2000]}"
else:
context_section = "KONTEXT: Kein Originaltext verfuegbar."
return f"""Erstelle ein Security Control aus der folgenden Pflicht.
{obl_section}
{context_section}
AUFGABE:
Formuliere ein umsetzbares Security Control.
Keine Gesetzesparaphrase — konkrete Massnahmen beschreiben.
Antworte als JSON:
{{
"title": "Kurzer praegnanter Titel (max 100 Zeichen, deutsch)",
"objective": "Was soll erreicht werden? (1-3 Saetze)",
"rationale": "Warum ist das wichtig? (1-2 Saetze)",
"requirements": ["Konkrete Anforderung 1", "Anforderung 2", ...],
"test_procedure": ["Pruefschritt 1", "Pruefschritt 2", ...],
"evidence": ["Nachweis 1", "Nachweis 2", ...],
"severity": "low|medium|high|critical",
"implementation_effort": "s|m|l|xl",
"category": "one of: authentication, encryption, data_protection, etc.",
"tags": ["tag1", "tag2"],
"target_audience": ["unternehmen"],
"verification_method": "code_review|document|tool|hybrid"
}}"""
def _obligation_section(obligation: ObligationMatch) -> str:
"""Format the obligation for the prompt."""
parts = ["PFLICHT (was das Gesetz verlangt):"]
if obligation.obligation_title:
parts.append(f" Titel: {obligation.obligation_title}")
if obligation.obligation_text:
parts.append(f" Beschreibung: {obligation.obligation_text[:500]}")
if obligation.obligation_id:
parts.append(f" ID: {obligation.obligation_id}")
if obligation.regulation_id:
parts.append(f" Rechtsgrundlage: {obligation.regulation_id}")
if not obligation.obligation_text and not obligation.obligation_title:
parts.append(" (Keine spezifische Pflicht extrahiert)")
return "\n".join(parts)
def _pattern_section(pattern: ControlPattern) -> str:
"""Format the pattern for the prompt."""
reqs = "\n ".join(f"- {r}" for r in pattern.requirements_template[:5])
tests = "\n ".join(f"- {t}" for t in pattern.test_procedure_template[:3])
return f"""MUSTER (wie man es typischerweise umsetzt):
Pattern: {pattern.name_de} ({pattern.id})
Domain: {pattern.domain}
Ziel-Template: {pattern.objective_template}
Anforderungs-Template:
{reqs}
Pruefverfahren-Template:
{tests}"""
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _ensure_list(value) -> list:
"""Ensure a value is a list of strings."""
if isinstance(value, list):
return [str(v) for v in value if v]
if isinstance(value, str):
return [value]
return []
def _anchors_from_pattern(pattern: ControlPattern) -> list:
"""Convert pattern's open_anchor_refs to control anchor format."""
anchors = []
for ref in pattern.open_anchor_refs:
anchors.append({
"framework": ref.get("framework", ""),
"control_id": ref.get("ref", ""),
"title": "",
"alignment_score": 0.8,
})
return anchors
def _validate_control(control: ComposedControl) -> None:
"""Validate and fix control field values."""
# Severity
if control.severity not in VALID_SEVERITIES:
control.severity = "medium"
# Implementation effort
if control.implementation_effort not in VALID_EFFORTS:
control.implementation_effort = "m"
# Verification method
if control.verification_method and control.verification_method not in VALID_VERIFICATION:
control.verification_method = None
# Risk score
if not (0 <= control.risk_score <= 10):
control.risk_score = _severity_to_risk(control.severity)
# Title length
if len(control.title) > 255:
control.title = control.title[:252] + "..."
# Ensure minimum content
if not control.objective:
control.objective = control.title
if not control.rationale:
control.rationale = "Aus regulatorischer Anforderung abgeleitet."
if not control.requirements:
control.requirements = ["Anforderung gemaess Pflichtbeschreibung umsetzen"]
if not control.test_procedure:
control.test_procedure = ["Umsetzung der Anforderungen pruefen"]
if not control.evidence:
control.evidence = ["Dokumentation der Umsetzung"]
def _severity_to_risk(severity: str) -> float:
"""Map severity to a default risk score."""
return {
"critical": 9.0,
"high": 7.0,
"medium": 5.0,
"low": 3.0,
}.get(severity, 5.0)