Adds a routing layer between Pass 0a and Pass 0b that classifies obligations into atomic / compound / framework_container. Framework-container obligations (e.g. "CCM-Praktiken fuer AIS") are decomposed into concrete sub-obligations via an internal framework registry before Pass 0b composition.
- New: framework_decomposition.py with routing, matching, decomposition
- New: Framework registry (NIST SP 800-53, OWASP ASVS, CSA CCM) as JSON
- New: Composite detection flags on atomic controls (is_composite, atomicity)
- New: gen_meta fields: framework_ref, framework_domain, decomposition_source
- Integration: _route_and_compose() in run_pass0b() deterministic path
- 248 tests (198 decomposition + 50 framework), all passing
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
715 lines
24 KiB
Python
715 lines
24 KiB
Python
"""Framework Decomposition Engine — decomposes framework-container obligations.
|
|
|
|
Sits between Pass 0a (obligation extraction) and Pass 0b (atomic control
|
|
composition). Detects obligations that reference a framework domain (e.g.
|
|
"CCM-Praktiken fuer AIS") and decomposes them into concrete sub-obligations
|
|
using an internal framework registry.
|
|
|
|
Three routing types:
|
|
atomic → pass through to Pass 0b unchanged
|
|
compound → split compound verbs, then Pass 0b
|
|
framework_container → decompose via registry, then Pass 0b
|
|
|
|
The registry is a set of JSON files under compliance/data/frameworks/.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Registry loading
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Directory scanned for framework definitions: "<parent of this module's
# directory>/data/frameworks". Every "*.json" file in it is one framework.
_REGISTRY_DIR = Path(__file__).resolve().parent.parent / "data" / "frameworks"
# Module-level cache filled by get_registry() / reload_registry().
_REGISTRY: dict[str, dict] = {}  # framework_id → framework dict
|
|
|
|
|
|
def _load_registry() -> dict[str, dict]:
    """Read every ``*.json`` file in the registry directory.

    Returns:
        Mapping of framework id to the parsed framework dict. The id is the
        file's ``framework_id`` value, falling back to the file name without
        extension. The mapping is empty when the directory does not exist.

    A file that cannot be read or parsed is logged (with traceback) and
    skipped; it never aborts loading of the remaining files.
    """
    if not _REGISTRY_DIR.is_dir():
        logger.warning("Framework registry dir not found: %s", _REGISTRY_DIR)
        return {}

    loaded: dict[str, dict] = {}
    # Sorted so that the load order (and thus dict order) is deterministic.
    for json_path in sorted(_REGISTRY_DIR.glob("*.json")):
        try:
            with json_path.open(encoding="utf-8") as handle:
                framework = json.load(handle)
            framework_id = framework.get("framework_id", json_path.stem)
            loaded[framework_id] = framework
            domain_count = len(framework.get("domains", []))
            logger.info(
                "Loaded framework: %s (%d domains)", framework_id, domain_count,
            )
        except Exception:
            # Best effort: one broken file must not take the registry down.
            logger.exception("Failed to load framework file: %s", json_path)
    return loaded
|
|
|
|
|
|
def get_registry() -> dict[str, dict]:
    """Return the module-wide framework registry, loading it on first use.

    The cached dict is returned as-is (not copied). As long as the cache is
    empty, every call triggers another load attempt from disk.
    """
    global _REGISTRY
    if _REGISTRY:
        return _REGISTRY
    _REGISTRY = _load_registry()
    return _REGISTRY
|
|
|
|
|
|
def reload_registry() -> dict[str, dict]:
    """Discard the cached registry, re-read it from disk and return it."""
    global _REGISTRY
    refreshed = _load_registry()
    _REGISTRY = refreshed
    return refreshed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Framework alias index (built from registry)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_alias_index(registry: dict[str, dict]) -> dict[str, str]:
|
|
"""Build a lowercase alias → framework_id lookup."""
|
|
idx: dict[str, str] = {}
|
|
for fw_id, fw in registry.items():
|
|
# Framework-level aliases
|
|
idx[fw_id.lower()] = fw_id
|
|
name = fw.get("display_name", "")
|
|
if name:
|
|
idx[name.lower()] = fw_id
|
|
# Common short forms
|
|
for part in fw_id.lower().replace("_", " ").split():
|
|
if len(part) >= 3:
|
|
idx[part] = fw_id
|
|
return idx
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routing — classify obligation type
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Extended patterns for framework detection (beyond the simple _COMPOSITE_RE
# in decomposition_pass.py — here we also capture the framework name)
#
# Shape of a match, case-insensitive:
#   <collective noun>  <preposition>  <reference text>  <terminator>
# e.g. "Praktiken fuer AIS". Group 1 is the lazily captured reference text;
# it ends at the first modal/implementation verb, period, comma or end of
# the string. Both umlaut spellings (ü/ue, ä/ae, ß/ss) are accepted.
_FRAMEWORK_PATTERN = re.compile(
    r"(?:praktiken|kontrollen|ma(?:ss|ß)nahmen|anforderungen|vorgaben|controls|practices|measures|requirements)"
    r"\s+(?:f(?:ue|ü)r|aus|gem(?:ae|ä)(?:ss|ß)|nach|from|of|for|per)\s+"
    r"(.+?)(?:\s+(?:m(?:ue|ü)ssen|sollen|sind|werden|implementieren|umsetzen|einf(?:ue|ü)hren)|\.|,|$)",
    re.IGNORECASE,
)
|
|
|
|
# Direct framework name references
#
# Case-insensitive, whole-word match on a fixed list of well-known framework
# names. The list is broader than what the registry necessarily contains, so
# a hit here does not guarantee that the framework can be resolved.
_DIRECT_FRAMEWORK_RE = re.compile(
    r"\b(?:CSA\s*CCM|NIST\s*(?:SP\s*)?800-53|OWASP\s*(?:ASVS|SAMM|Top\s*10)"
    r"|CIS\s*Controls|BSI\s*(?:IT-)?Grundschutz|ENISA|ISO\s*2700[12]"
    r"|COBIT|SOX|PCI\s*DSS|HITRUST|SOC\s*2|KRITIS)\b",
    re.IGNORECASE,
)
|
|
|
|
# Compound verb patterns (multiple main verbs)
#
# Matches a coordinating conjunction as a whole word (German: und, sowie,
# "als auch"; English: or, and), case-insensitively.
_COMPOUND_VERB_RE = re.compile(
    r"\b(?:und|sowie|als\s+auch|or|and)\b",
    re.IGNORECASE,
)
|
|
|
|
# No-split phrases that look compound but aren't
#
# Verb pairs that describe one activity. They are compared as lowercase
# substrings against the action text, so entries must stay lowercase; the
# German entries use the ASCII transliteration (e.g. "pruefen").
_NO_SPLIT_PHRASES = [
    "pflegen und aufrechterhalten",
    "dokumentieren und pflegen",
    "definieren und dokumentieren",
    "erstellen und freigeben",
    "pruefen und genehmigen",
    "identifizieren und bewerten",
    "erkennen und melden",
    "define and maintain",
    "create and maintain",
    "establish and maintain",
    "monitor and review",
    "detect and respond",
]
|
|
|
|
|
|
@dataclass
class RoutingResult:
    """Result of obligation routing classification."""

    # Routing decision. The code in this module produces "atomic", "compound"
    # and "framework_container"; "unknown_review" is listed as a possible
    # value but is not set anywhere in this module.
    routing_type: str  # atomic | compound | framework_container | unknown_review
    # Registry id of the referenced framework, or None when no framework was
    # resolved against the registry.
    framework_ref: Optional[str] = None
    # Id of the matched domain inside that framework, or None.
    framework_domain: Optional[str] = None
    # Title of the matched domain, or None.
    domain_title: Optional[str] = None
    # Heuristic confidence assigned by the classifier (fixed per strategy).
    confidence: float = 0.0
    # Short machine-readable tag naming the rule that produced the result,
    # e.g. "multiple_main_verbs" or "pattern_match:<reference text>".
    reason: str = ""
|
|
|
|
|
|
def classify_routing(
    obligation_text: str,
    action_raw: str,
    object_raw: str,
    condition_raw: Optional[str] = None,
) -> RoutingResult:
    """Classify an obligation into atomic / compound / framework_container.

    The checks run in priority order; the first one that applies decides:

    1. framework container — the text references a framework or framework
       domain (see ``_detect_framework``),
    2. compound — the action holds several main verbs joined by a
       conjunction (confidence 0.7),
    3. atomic — everything else (confidence 0.9).

    Args:
        obligation_text: Full text of the obligation.
        action_raw: Extracted action (verb phrase) of the obligation.
        object_raw: Extracted object of the obligation.
        condition_raw: Accepted for interface compatibility; it does not
            influence the classification.

    Returns:
        The routing decision with confidence and reason.
    """
    # --- Step 1: Framework container detection ---
    framework_result = _detect_framework(obligation_text, object_raw)
    if framework_result.routing_type == "framework_container":
        return framework_result

    # --- Step 2: Compound verb detection ---
    if _is_compound_obligation(action_raw, obligation_text):
        return RoutingResult(
            routing_type="compound",
            confidence=0.7,
            reason="multiple_main_verbs",
        )

    # --- Step 3: Default = atomic ---
    return RoutingResult(
        routing_type="atomic",
        confidence=0.9,
        reason="single_action_single_object",
    )
|
|
|
|
|
|
def _detect_framework(
    obligation_text: str, object_raw: str,
) -> RoutingResult:
    """Decide whether an obligation refers to a framework (domain).

    The obligation text and the object are joined and examined with three
    strategies, in this order:

    1. A known framework name (``_DIRECT_FRAMEWORK_RE``) occurs in the text.
       This always yields ``framework_container``: confidence 0.95 with a
       matched domain, 0.75 with only the framework, and 0.6 when the name
       cannot be resolved against the registry.
    2. A phrase of the form "<controls> for <reference>"
       (``_FRAMEWORK_PATTERN``) whose reference resolves to a registry
       domain: confidence 0.85 with a domain id, otherwise 0.65.
    3. The object alone carries framework-indicator keywords: confidence
       0.5, no framework reference.

    Returns:
        A ``framework_container`` result, or an ``atomic`` result with
        confidence 0.0 when none of the strategies applies.
    """
    haystack = f"{obligation_text} {object_raw}"
    frameworks = get_registry()
    aliases = _build_alias_index(frameworks)

    # Strategy 1: direct framework name match
    direct_hit = _DIRECT_FRAMEWORK_RE.search(haystack)
    if direct_hit is not None:
        matched_name = direct_hit.group(0).strip()
        resolved_id = _resolve_framework_id(matched_name, aliases, frameworks)
        if not resolved_id:
            # Framework name recognized but not in registry
            return RoutingResult(
                routing_type="framework_container",
                framework_ref=None,
                framework_domain=None,
                confidence=0.6,
                reason=f"direct_framework_match_no_registry:{matched_name}",
            )
        dom_id, dom_title = _match_domain(haystack, frameworks[resolved_id])
        return RoutingResult(
            routing_type="framework_container",
            framework_ref=resolved_id,
            framework_domain=dom_id,
            domain_title=dom_title,
            confidence=0.95 if dom_id else 0.75,
            reason=f"direct_framework_match:{matched_name}",
        )

    # Strategy 2: pattern match ("Praktiken fuer X")
    phrase_hit = _FRAMEWORK_PATTERN.search(haystack)
    if phrase_hit is not None:
        referenced = phrase_hit.group(1).strip()
        resolved_id, dom_id, dom_title = _resolve_from_ref_text(
            referenced, frameworks, aliases,
        )
        if resolved_id:
            return RoutingResult(
                routing_type="framework_container",
                framework_ref=resolved_id,
                framework_domain=dom_id,
                domain_title=dom_title,
                confidence=0.85 if dom_id else 0.65,
                reason=f"pattern_match:{referenced}",
            )

    # Strategy 3: keyword-heavy object
    if _has_framework_keywords(object_raw):
        return RoutingResult(
            routing_type="framework_container",
            framework_ref=None,
            framework_domain=None,
            confidence=0.5,
            reason="framework_keywords_in_object",
        )

    return RoutingResult(routing_type="atomic", confidence=0.0)
|
|
|
|
|
|
def _resolve_framework_id(
|
|
name: str,
|
|
alias_idx: dict[str, str],
|
|
registry: dict[str, dict],
|
|
) -> Optional[str]:
|
|
"""Resolve a framework name to its registry ID."""
|
|
normalized = re.sub(r"\s+", " ", name.strip().lower())
|
|
# Direct alias match
|
|
if normalized in alias_idx:
|
|
return alias_idx[normalized]
|
|
# Try compact form (strip spaces, hyphens, underscores)
|
|
compact = re.sub(r"[\s_\-]+", "", normalized)
|
|
for alias, fw_id in alias_idx.items():
|
|
if re.sub(r"[\s_\-]+", "", alias) == compact:
|
|
return fw_id
|
|
# Substring match in display names
|
|
for fw_id, fw in registry.items():
|
|
display = fw.get("display_name", "").lower()
|
|
if normalized in display or display in normalized:
|
|
return fw_id
|
|
# Partial match: check if normalized contains any alias (for multi-word refs)
|
|
for alias, fw_id in alias_idx.items():
|
|
if len(alias) >= 4 and alias in normalized:
|
|
return fw_id
|
|
return None
|
|
|
|
|
|
def _match_domain(
|
|
text: str, framework: dict,
|
|
) -> tuple[Optional[str], Optional[str]]:
|
|
"""Match a domain within a framework from text references."""
|
|
text_lower = text.lower()
|
|
best_id: Optional[str] = None
|
|
best_title: Optional[str] = None
|
|
best_score = 0
|
|
|
|
for domain in framework.get("domains", []):
|
|
score = 0
|
|
domain_id = domain["domain_id"]
|
|
title = domain.get("title", "")
|
|
|
|
# Exact domain ID match (e.g. "AIS")
|
|
if re.search(rf"\b{re.escape(domain_id)}\b", text, re.IGNORECASE):
|
|
score += 10
|
|
|
|
# Full title match
|
|
if title.lower() in text_lower:
|
|
score += 8
|
|
|
|
# Alias match
|
|
for alias in domain.get("aliases", []):
|
|
if alias.lower() in text_lower:
|
|
score += 6
|
|
break
|
|
|
|
# Keyword overlap
|
|
kw_hits = sum(
|
|
1 for kw in domain.get("keywords", [])
|
|
if kw.lower() in text_lower
|
|
)
|
|
score += kw_hits
|
|
|
|
if score > best_score:
|
|
best_score = score
|
|
best_id = domain_id
|
|
best_title = title
|
|
|
|
if best_score >= 3:
|
|
return best_id, best_title
|
|
return None, None
|
|
|
|
|
|
def _resolve_from_ref_text(
|
|
ref_text: str,
|
|
registry: dict[str, dict],
|
|
alias_idx: dict[str, str],
|
|
) -> tuple[Optional[str], Optional[str], Optional[str]]:
|
|
"""Resolve framework + domain from a reference text like 'AIS' or 'Application Security'."""
|
|
ref_lower = ref_text.lower()
|
|
|
|
for fw_id, fw in registry.items():
|
|
for domain in fw.get("domains", []):
|
|
# Check domain ID
|
|
if domain["domain_id"].lower() in ref_lower:
|
|
return fw_id, domain["domain_id"], domain.get("title")
|
|
# Check title
|
|
if domain.get("title", "").lower() in ref_lower:
|
|
return fw_id, domain["domain_id"], domain.get("title")
|
|
# Check aliases
|
|
for alias in domain.get("aliases", []):
|
|
if alias.lower() in ref_lower or ref_lower in alias.lower():
|
|
return fw_id, domain["domain_id"], domain.get("title")
|
|
|
|
return None, None, None
|
|
|
|
|
|
# Lowercase words (German and English) that hint at a whole catalogue of
# controls rather than a single concrete control.
_FRAMEWORK_KW_SET = {
    "praktiken", "kontrollen", "massnahmen", "maßnahmen",
    "anforderungen", "vorgaben", "framework", "standard",
    "baseline", "katalog", "domain", "family", "category",
    "practices", "controls", "measures", "requirements",
}
|
|
|
|
|
|
def _has_framework_keywords(text: str) -> bool:
    """Tell whether ``text`` mentions at least two distinct indicator words.

    The text is lowercased and split into runs of letters (a-z plus German
    umlauts and ß); a word counts once no matter how often it occurs.
    """
    tokens = re.findall(r"[a-zäöüß]+", text.lower())
    distinct_hits = _FRAMEWORK_KW_SET.intersection(tokens)
    return len(distinct_hits) >= 2
|
|
|
|
|
|
def _is_compound_obligation(action_raw: str, obligation_text: str) -> bool:
    """Report whether the action consists of several competing main verbs.

    Args:
        action_raw: Extracted action text; only this value is inspected.
        obligation_text: Full obligation text (not used by the check).

    Returns:
        True when the action contains a conjunction that separates at least
        two fragments of three or more characters, and the action is not one
        of the known fixed verb pairs in ``_NO_SPLIT_PHRASES``.
    """
    if not action_raw:
        return False

    action = action_raw.lower().strip()

    # Fixed pairs such as "establish and maintain" describe a single activity.
    if any(phrase in action for phrase in _NO_SPLIT_PHRASES):
        return False

    # Without a conjunction there is nothing to split.
    if _COMPOUND_VERB_RE.search(action) is None:
        return False

    # Count the fragments around the conjunctions that are long enough to be
    # a verb of their own.
    fragments = re.split(r"\b(?:und|sowie|als\s+auch|or|and)\b", action)
    substantial = sum(1 for fragment in fragments if len(fragment.strip()) >= 3)
    return substantial >= 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Framework Decomposition
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class DecomposedObligation:
    """A concrete obligation derived from a framework container."""

    # Id of this derived obligation; decompose_framework_container() builds
    # it as "<source obligation id>-<subcontrol id>".
    obligation_candidate_id: str
    # Control the source obligation belongs to (passed through unchanged).
    parent_control_id: str
    # Id of the framework container ("FWC-…") this obligation came from.
    parent_framework_container_id: str
    # Filled with the framework's display name by the decomposition.
    source_ref_law: str
    # Filled with the subcontrol id by the decomposition.
    source_ref_article: str
    # Filled with the subcontrol's "statement" text.
    obligation_text: str
    actor: str
    # Action / object: the registry hint when present, otherwise inferred
    # from the statement text.
    action_raw: str
    object_raw: str
    condition_raw: Optional[str] = None
    trigger_raw: Optional[str] = None
    routing_type: str = "atomic"
    release_state: str = "decomposed"
    subcontrol_id: str = ""
    # Metadata
    # Raw registry values of the subcontrol; empty when the registry entry
    # does not provide them.
    action_hint: str = ""
    object_hint: str = ""
    object_class: str = ""
    keywords: list[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class FrameworkDecompositionResult:
    """Result of framework decomposition."""

    # Generated id of the container, "FWC-" plus eight hex characters.
    framework_container_id: str
    # Id of the obligation that was decomposed.
    source_obligation_candidate_id: str
    # Resolved framework id / domain id / domain title; None when unresolved.
    framework_ref: Optional[str]
    framework_domain: Optional[str]
    domain_title: Optional[str]
    # Ids of the subcontrols that produced a decomposed obligation.
    matched_subcontrols: list[str]
    # Score in [0.0, 1.0], rounded to two decimals.
    decomposition_confidence: float
    # decompose_framework_container() sets "decomposed" or "unmatched";
    # "error" is listed as a possible value but not set in this module.
    release_state: str  # decomposed | unmatched | error
    decomposed_obligations: list[DecomposedObligation]
    # Human-readable findings, each prefixed with "ERROR:" or "WARN:".
    issues: list[str]
|
|
|
|
|
|
def decompose_framework_container(
    obligation_candidate_id: str,
    parent_control_id: str,
    obligation_text: str,
    framework_ref: Optional[str],
    framework_domain: Optional[str],
    actor: str = "organization",
) -> FrameworkDecompositionResult:
    """Decompose a framework-container obligation into concrete sub-obligations.

    Steps:
      1. Resolve framework from registry
      2. Resolve domain within framework
      3. Select relevant subcontrols (keyword filter or full domain)
      4. Generate decomposed obligations

    Args:
        obligation_candidate_id: Id of the container obligation; used as the
            prefix of every derived obligation id.
        parent_control_id: Control the container obligation belongs to.
        obligation_text: Text of the container obligation.
        framework_ref: Registry id of the framework, if already known. When
            it is missing or unknown, the framework is searched in the text.
        framework_domain: Domain id inside the framework, if already known.
            When it is missing or unknown, the domain is matched from the
            text; if that fails too, subcontrols of all domains are
            considered.
        actor: Actor assigned to every derived obligation.

    Returns:
        A result with ``release_state="decomposed"`` and one obligation per
        selected subcontrol, or ``release_state="unmatched"`` (confidence
        0.0, no obligations) when the framework cannot be resolved or, in the
        all-domains fallback, no subcontrol matches the text.

    The registry is treated as read-only: subcontrol dicts are copied before
    they are annotated and keyword lists are copied into the results.
    """
    container_id = f"FWC-{uuid.uuid4().hex[:8]}"
    registry = get_registry()
    issues: list[str] = []

    def _unmatched() -> FrameworkDecompositionResult:
        # Shared shape of both early exits; reads the current values of
        # framework_ref / framework_domain at the time it is called.
        return FrameworkDecompositionResult(
            framework_container_id=container_id,
            source_obligation_candidate_id=obligation_candidate_id,
            framework_ref=framework_ref,
            framework_domain=framework_domain,
            domain_title=None,
            matched_subcontrols=[],
            decomposition_confidence=0.0,
            release_state="unmatched",
            decomposed_obligations=[],
            issues=issues,
        )

    # Step 1: Resolve framework
    if framework_ref and framework_ref in registry:
        fw = registry[framework_ref]
    else:
        # Try to find by name in text
        fw, framework_ref = _find_framework_in_text(obligation_text, registry)

    if not fw:
        issues.append("ERROR: framework_not_matched")
        return _unmatched()

    # Step 2: Resolve domain
    domains = fw.get("domains", [])
    domain_data = None
    domain_title = None
    if framework_domain:
        for d in domains:
            if d["domain_id"].lower() == framework_domain.lower():
                domain_data = d
                domain_title = d.get("title")
                break

    if not domain_data:
        # Try matching from text
        domain_id, domain_title = _match_domain(obligation_text, fw)
        if domain_id:
            for d in domains:
                if d["domain_id"] == domain_id:
                    domain_data = d
                    framework_domain = domain_id
                    break

    if not domain_data:
        issues.append("WARN: domain_not_matched — using all domains")
        # Fall back to all subcontrols across all domains. Each subcontrol is
        # copied before it is tagged with its domain so that the cached
        # registry is not modified.
        all_subcontrols = [
            {**sc, "_domain_id": d["domain_id"]}
            for d in domains
            for sc in d.get("subcontrols", [])
        ]
        subcontrols = _select_subcontrols(obligation_text, all_subcontrols)
        if not subcontrols:
            issues.append("ERROR: no_subcontrols_matched")
            return _unmatched()
    else:
        # Step 3: Select subcontrols from domain
        raw_subcontrols = domain_data.get("subcontrols", [])
        subcontrols = _select_subcontrols(obligation_text, raw_subcontrols)
        if not subcontrols:
            # Full domain decomposition
            subcontrols = raw_subcontrols

    # Quality check: too many subcontrols
    if len(subcontrols) > 25:
        issues.append(f"WARN: {len(subcontrols)} subcontrols — may be too broad")

    # Step 4: Generate decomposed obligations
    display_name = fw.get("display_name", framework_ref or "Unknown")
    decomposed: list[DecomposedObligation] = []
    matched_ids: list[str] = []

    for sc in subcontrols:
        sc_id = sc.get("subcontrol_id", "")
        matched_ids.append(sc_id)

        statement = sc.get("statement", "")
        action_hint = sc.get("action_hint", "")
        object_hint = sc.get("object_hint", "")

        # Quality warnings
        if not action_hint:
            issues.append(f"WARN: {sc_id} missing action_hint")
        if not object_hint:
            issues.append(f"WARN: {sc_id} missing object_hint")

        decomposed.append(DecomposedObligation(
            obligation_candidate_id=f"{obligation_candidate_id}-{sc_id}",
            parent_control_id=parent_control_id,
            parent_framework_container_id=container_id,
            source_ref_law=display_name,
            source_ref_article=sc_id,
            obligation_text=statement,
            actor=actor,
            action_raw=action_hint or _infer_action(statement),
            object_raw=object_hint or _infer_object(statement),
            routing_type="atomic",
            release_state="decomposed",
            subcontrol_id=sc_id,
            action_hint=action_hint,
            object_hint=object_hint,
            object_class=sc.get("object_class", ""),
            # Copy: the result must not share a list with the registry.
            keywords=list(sc.get("keywords", [])),
        ))

    # Check if decomposed are identical to container
    container_text = obligation_text.strip()
    for derived in decomposed:
        if derived.obligation_text.strip() == container_text:
            issues.append(f"WARN: {derived.subcontrol_id} identical to container text")

    confidence = _compute_decomposition_confidence(
        framework_ref, framework_domain, domain_data, len(subcontrols), issues,
    )

    return FrameworkDecompositionResult(
        framework_container_id=container_id,
        source_obligation_candidate_id=obligation_candidate_id,
        framework_ref=framework_ref,
        framework_domain=framework_domain,
        domain_title=domain_title,
        matched_subcontrols=matched_ids,
        decomposition_confidence=confidence,
        release_state="decomposed",
        decomposed_obligations=decomposed,
        issues=issues,
    )
|
|
|
|
|
|
def _find_framework_in_text(
    text: str, registry: dict[str, dict],
) -> tuple[Optional[dict], Optional[str]]:
    """Look for a well-known framework name in ``text`` and resolve it.

    Only the first name found by ``_DIRECT_FRAMEWORK_RE`` is considered.

    Returns:
        ``(framework dict, framework id)`` when that name resolves to a
        registry entry, otherwise ``(None, None)``.
    """
    aliases = _build_alias_index(registry)
    hit = _DIRECT_FRAMEWORK_RE.search(text)
    resolved = (
        _resolve_framework_id(hit.group(0), aliases, registry) if hit else None
    )
    if resolved and resolved in registry:
        return registry[resolved], resolved
    return None, None
|
|
|
|
|
|
def _select_subcontrols(
|
|
obligation_text: str, subcontrols: list[dict],
|
|
) -> list[dict]:
|
|
"""Select relevant subcontrols based on keyword matching.
|
|
|
|
Returns empty list if no targeted match found (caller falls back to
|
|
full domain).
|
|
"""
|
|
text_lower = obligation_text.lower()
|
|
scored: list[tuple[int, dict]] = []
|
|
|
|
for sc in subcontrols:
|
|
score = 0
|
|
for kw in sc.get("keywords", []):
|
|
if kw.lower() in text_lower:
|
|
score += 1
|
|
# Title match
|
|
title = sc.get("title", "").lower()
|
|
if title and title in text_lower:
|
|
score += 3
|
|
# Object hint in text
|
|
obj = sc.get("object_hint", "").lower()
|
|
if obj and obj in text_lower:
|
|
score += 2
|
|
|
|
if score > 0:
|
|
scored.append((score, sc))
|
|
|
|
if not scored:
|
|
return []
|
|
|
|
# Only return those with meaningful overlap (score >= 2)
|
|
scored.sort(key=lambda x: x[0], reverse=True)
|
|
return [sc for score, sc in scored if score >= 2]
|
|
|
|
|
|
def _infer_action(statement: str) -> str:
|
|
"""Infer a basic action verb from a statement."""
|
|
s = statement.lower()
|
|
if any(w in s for w in ["definiert", "definieren", "define"]):
|
|
return "definieren"
|
|
if any(w in s for w in ["implementiert", "implementieren", "implement"]):
|
|
return "implementieren"
|
|
if any(w in s for w in ["dokumentiert", "dokumentieren", "document"]):
|
|
return "dokumentieren"
|
|
if any(w in s for w in ["ueberwacht", "ueberwachen", "monitor"]):
|
|
return "ueberwachen"
|
|
if any(w in s for w in ["getestet", "testen", "test"]):
|
|
return "testen"
|
|
if any(w in s for w in ["geschuetzt", "schuetzen", "protect"]):
|
|
return "implementieren"
|
|
if any(w in s for w in ["verwaltet", "verwalten", "manage"]):
|
|
return "pflegen"
|
|
if any(w in s for w in ["gemeldet", "melden", "report"]):
|
|
return "melden"
|
|
return "implementieren"
|
|
|
|
|
|
def _infer_object(statement: str) -> str:
|
|
"""Infer the primary object from a statement (first noun phrase)."""
|
|
# Simple heuristic: take the text after "muessen"/"muss" up to the verb
|
|
m = re.search(
|
|
r"(?:muessen|muss|m(?:ü|ue)ssen)\s+(.+?)(?:\s+werden|\s+sein|\.|,|$)",
|
|
statement,
|
|
re.IGNORECASE,
|
|
)
|
|
if m:
|
|
return m.group(1).strip()[:80]
|
|
# Fallback: first 80 chars
|
|
return statement[:80] if statement else ""
|
|
|
|
|
|
def _compute_decomposition_confidence(
|
|
framework_ref: Optional[str],
|
|
domain: Optional[str],
|
|
domain_data: Optional[dict],
|
|
num_subcontrols: int,
|
|
issues: list[str],
|
|
) -> float:
|
|
"""Compute confidence score for the decomposition."""
|
|
score = 0.3
|
|
if framework_ref:
|
|
score += 0.25
|
|
if domain:
|
|
score += 0.20
|
|
if domain_data:
|
|
score += 0.10
|
|
if 1 <= num_subcontrols <= 15:
|
|
score += 0.10
|
|
elif num_subcontrols > 15:
|
|
score += 0.05 # less confident with too many
|
|
|
|
# Penalize errors
|
|
errors = sum(1 for i in issues if i.startswith("ERROR:"))
|
|
score -= errors * 0.15
|
|
return round(max(min(score, 1.0), 0.0), 2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Registry statistics (for admin/debugging)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def registry_stats() -> dict:
    """Summarise the loaded registry for admin and debugging purposes.

    Returns:
        A dict with the number of frameworks (``frameworks``), one entry per
        framework (``details``: id, display name, domain count, subcontrol
        count) and the totals over all frameworks (``total_domains``,
        ``total_subcontrols``).
    """
    registry = get_registry()

    details = []
    for fw_id, fw in registry.items():
        domains = fw.get("domains", [])
        details.append({
            "framework_id": fw_id,
            "display_name": fw.get("display_name", ""),
            "domains": len(domains),
            "subcontrols": sum(len(d.get("subcontrols", [])) for d in domains),
        })

    return {
        "frameworks": len(registry),
        "details": details,
        "total_domains": sum(entry["domains"] for entry in details),
        "total_subcontrols": sum(entry["subcontrols"] for entry in details),
    }
|