"""Framework Decomposition Engine — decomposes framework-container obligations.

Sits between Pass 0a (obligation extraction) and Pass 0b (atomic control
composition). Detects obligations that reference a framework domain
(e.g. "CCM-Praktiken fuer AIS") and decomposes them into concrete
sub-obligations using an internal framework registry.

Three routing types:
    atomic              → pass through to Pass 0b unchanged
    compound            → split compound verbs, then Pass 0b
    framework_container → decompose via registry, then Pass 0b

The registry is a set of JSON files under compliance/data/frameworks/.
"""

import json
import logging
import os
import re
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Registry loading
# ---------------------------------------------------------------------------

_REGISTRY_DIR = Path(__file__).resolve().parent.parent / "data" / "frameworks"
_REGISTRY: dict[str, dict] = {}  # framework_id → framework dict


def _load_registry() -> dict[str, dict]:
    """Load all framework JSON files from the registry directory.

    Returns:
        Mapping of framework_id → parsed framework dict. Empty when the
        registry directory does not exist. A file that cannot be read or
        parsed is logged and skipped so one bad file does not block the rest.
    """
    registry: dict[str, dict] = {}
    if not _REGISTRY_DIR.is_dir():
        logger.warning("Framework registry dir not found: %s", _REGISTRY_DIR)
        return registry

    # Sorted for a deterministic load order (later files win on duplicate IDs).
    for fpath in sorted(_REGISTRY_DIR.glob("*.json")):
        try:
            with fpath.open(encoding="utf-8") as f:
                fw = json.load(f)
            # Fall back to the file stem when the file declares no ID.
            fw_id = fw.get("framework_id", fpath.stem)
            registry[fw_id] = fw
            logger.info(
                "Loaded framework: %s (%d domains)",
                fw_id,
                len(fw.get("domains", [])),
            )
        except Exception:
            # Best-effort per file: log with traceback and continue.
            logger.exception("Failed to load framework file: %s", fpath)
    return registry


def get_registry() -> dict[str, dict]:
    """Return the global framework registry (lazy-loaded).

    The registry is loaded on first use. While it is empty (e.g. missing
    registry directory) every call retries the load.
    """
    global _REGISTRY
    if not _REGISTRY:
        _REGISTRY = _load_registry()
    return _REGISTRY


def reload_registry() -> dict[str, dict]:
    """Force-reload the framework registry from disk and return it."""
    global _REGISTRY
    _REGISTRY = _load_registry()
    return _REGISTRY


# ---------------------------------------------------------------------------
# Framework alias index (built from registry)
# ---------------------------------------------------------------------------


def _build_alias_index(registry: dict[str, dict]) -> dict[str, str]:
    """Build a lowercase alias → framework_id lookup.

    Aliases are the framework ID itself, its ``display_name`` and every
    underscore-separated part of the ID that is at least 3 characters long.
    When two frameworks share an alias the one iterated last wins.
    """
    idx: dict[str, str] = {}
    for fw_id, fw in registry.items():
        # Framework-level aliases
        idx[fw_id.lower()] = fw_id
        name = fw.get("display_name", "")
        if name:
            idx[name.lower()] = fw_id
        # Common short forms
        for part in fw_id.lower().replace("_", " ").split():
            if len(part) >= 3:
                idx[part] = fw_id
    return idx


def _contains_token(token: str, text: str) -> bool:
    """Return True if *token* occurs in *text* as a standalone token.

    Case-insensitive. "Standalone" means not directly preceded or followed by
    a word character, so ``"LOG"`` matches ``"LOG-Kontrollen"`` but not
    ``"Technologie"``. An empty token never matches.
    """
    if not token:
        return False
    pattern = rf"(?<!\w){re.escape(token)}(?!\w)"
    return re.search(pattern, text, re.IGNORECASE) is not None


# ---------------------------------------------------------------------------
# Routing — classify obligation type
# ---------------------------------------------------------------------------

# Extended patterns for framework detection (beyond the simple _COMPOSITE_RE
# in decomposition_pass.py — here we also capture the framework name)
_FRAMEWORK_PATTERN = re.compile(
    r"(?:praktiken|kontrollen|ma(?:ss|ß)nahmen|anforderungen|vorgaben|controls|practices|measures|requirements)"
    r"\s+(?:f(?:ue|ü)r|aus|gem(?:ae|ä)(?:ss|ß)|nach|from|of|for|per)\s+"
    r"(.+?)(?:\s+(?:m(?:ue|ü)ssen|sollen|sind|werden|implementieren|umsetzen|einf(?:ue|ü)hren)|\.|,|$)",
    re.IGNORECASE,
)

# Direct framework name references
_DIRECT_FRAMEWORK_RE = re.compile(
    r"\b(?:CSA\s*CCM|NIST\s*(?:SP\s*)?800-53|OWASP\s*(?:ASVS|SAMM|Top\s*10)"
    r"|CIS\s*Controls|BSI\s*(?:IT-)?Grundschutz|ENISA|ISO\s*2700[12]"
    r"|COBIT|SOX|PCI\s*DSS|HITRUST|SOC\s*2|KRITIS)\b",
    re.IGNORECASE,
)

# Compound verb patterns (multiple main verbs)
_COMPOUND_VERB_RE = re.compile(
    r"\b(?:und|sowie|als\s+auch|or|and)\b",
    re.IGNORECASE,
)

# No-split phrases that look compound but aren't
_NO_SPLIT_PHRASES = [
    "pflegen und aufrechterhalten",
    "dokumentieren und pflegen",
    "definieren und dokumentieren",
    "erstellen und freigeben",
    "pruefen und genehmigen",
    "identifizieren und bewerten",
    "erkennen und melden",
    "define and maintain",
    "create and maintain",
    "establish and maintain",
    "monitor and review",
    "detect and respond",
]


@dataclass
class RoutingResult:
    """Result of obligation routing classification."""

    routing_type: str  # atomic | compound | framework_container | unknown_review
    framework_ref: Optional[str] = None
    framework_domain: Optional[str] = None
    domain_title: Optional[str] = None
    confidence: float = 0.0
    reason: str = ""


def classify_routing(
    obligation_text: str,
    action_raw: str,
    object_raw: str,
    condition_raw: Optional[str] = None,
) -> RoutingResult:
    """Classify an obligation into atomic / compound / framework_container.

    Args:
        obligation_text: Full text of the obligation.
        action_raw: Extracted action phrase (used for compound detection).
        object_raw: Extracted object phrase (used for framework detection).
        condition_raw: Accepted for interface compatibility; not used here.

    Returns:
        The framework-container result when a framework reference is
        detected, otherwise a ``compound`` result when the action holds
        several verbs, otherwise an ``atomic`` result.
    """
    # --- Step 1: Framework container detection ---
    fw_result = _detect_framework(obligation_text, object_raw)
    if fw_result.routing_type == "framework_container":
        return fw_result

    # --- Step 2: Compound verb detection ---
    if _is_compound_obligation(action_raw, obligation_text):
        return RoutingResult(
            routing_type="compound",
            confidence=0.7,
            reason="multiple_main_verbs",
        )

    # --- Step 3: Default = atomic ---
    return RoutingResult(
        routing_type="atomic",
        confidence=0.9,
        reason="single_action_single_object",
    )


def _detect_framework(
    obligation_text: str,
    object_raw: str,
) -> RoutingResult:
    """Detect if obligation references a framework domain.

    Tries three strategies in order: a direct framework name, a
    "<controls> for <X>" phrase resolved against registry domains, and a
    keyword-heavy object. Returns an ``atomic`` result with confidence 0.0
    when none of them fires.
    """
    combined = f"{obligation_text} {object_raw}"
    registry = get_registry()
    alias_idx = _build_alias_index(registry)

    # Strategy 1: direct framework name match
    m = _DIRECT_FRAMEWORK_RE.search(combined)
    if m:
        fw_name = m.group(0).strip()
        fw_id = _resolve_framework_id(fw_name, alias_idx, registry)
        if fw_id:
            domain_id, domain_title = _match_domain(combined, registry[fw_id])
            return RoutingResult(
                routing_type="framework_container",
                framework_ref=fw_id,
                framework_domain=domain_id,
                domain_title=domain_title,
                confidence=0.95 if domain_id else 0.75,
                reason=f"direct_framework_match:{fw_name}",
            )
        # Framework name recognized but not in registry
        return RoutingResult(
            routing_type="framework_container",
            framework_ref=None,
            framework_domain=None,
            confidence=0.6,
            reason=f"direct_framework_match_no_registry:{fw_name}",
        )

    # Strategy 2: pattern match ("Praktiken fuer X")
    m2 = _FRAMEWORK_PATTERN.search(combined)
    if m2:
        ref_text = m2.group(1).strip()
        fw_id, domain_id, domain_title = _resolve_from_ref_text(
            ref_text, registry, alias_idx,
        )
        if fw_id:
            return RoutingResult(
                routing_type="framework_container",
                framework_ref=fw_id,
                framework_domain=domain_id,
                domain_title=domain_title,
                confidence=0.85 if domain_id else 0.65,
                reason=f"pattern_match:{ref_text}",
            )

    # Strategy 3: keyword-heavy object
    if _has_framework_keywords(object_raw):
        return RoutingResult(
            routing_type="framework_container",
            framework_ref=None,
            framework_domain=None,
            confidence=0.5,
            reason="framework_keywords_in_object",
        )

    return RoutingResult(routing_type="atomic", confidence=0.0)


def _resolve_framework_id(
    name: str,
    alias_idx: dict[str, str],
    registry: dict[str, dict],
) -> Optional[str]:
    """Resolve a framework name to its registry ID.

    Matching gets progressively looser: exact alias, alias ignoring
    spaces/hyphens/underscores, substring of a display name, and finally any
    alias of at least 4 characters contained in the name. Returns ``None``
    when nothing matches or *name* is blank.
    """
    normalized = re.sub(r"\s+", " ", name.strip().lower())
    if not normalized:
        # An empty string is a substring of everything; never resolve it.
        return None

    # Direct alias match
    if normalized in alias_idx:
        return alias_idx[normalized]

    # Try compact form (strip spaces, hyphens, underscores)
    compact = re.sub(r"[\s_\-]+", "", normalized)
    for alias, fw_id in alias_idx.items():
        if re.sub(r"[\s_\-]+", "", alias) == compact:
            return fw_id

    # Substring match in display names
    for fw_id, fw in registry.items():
        display = fw.get("display_name", "").lower()
        # Skip frameworks without a display name: "" is contained in any
        # string and would make every unknown name resolve to them.
        if display and (normalized in display or display in normalized):
            return fw_id

    # Partial match: check if normalized contains any alias (for multi-word refs)
    for alias, fw_id in alias_idx.items():
        if len(alias) >= 4 and alias in normalized:
            return fw_id

    return None


def _match_domain(
    text: str,
    framework: dict,
) -> tuple[Optional[str], Optional[str]]:
    """Match a domain within a framework from text references.

    Each domain is scored: standalone domain ID (+10), full title (+8),
    first matching alias (+6) and +1 per matching keyword. The best-scoring
    domain is returned as ``(domain_id, title)`` when its score is at least
    3, otherwise ``(None, None)``. Empty IDs, titles, aliases and keywords
    never contribute to the score.
    """
    text_lower = text.lower()
    best_id: Optional[str] = None
    best_title: Optional[str] = None
    best_score = 0

    for domain in framework.get("domains", []):
        score = 0
        domain_id = domain["domain_id"]
        title = domain.get("title", "")

        # Exact domain ID match (e.g. "AIS")
        if _contains_token(domain_id, text):
            score += 10

        # Full title match
        if title and title.lower() in text_lower:
            score += 8

        # Alias match (only the first hit counts)
        if any(
            alias and alias.lower() in text_lower
            for alias in domain.get("aliases", [])
        ):
            score += 6

        # Keyword overlap
        score += sum(
            1
            for kw in domain.get("keywords", [])
            if kw and kw.lower() in text_lower
        )

        # Strictly greater: on ties the earlier domain is kept.
        if score > best_score:
            best_score = score
            best_id = domain_id
            best_title = title

    if best_score >= 3:
        return best_id, best_title
    return None, None


def _resolve_from_ref_text(
    ref_text: str,
    registry: dict[str, dict],
    alias_idx: dict[str, str],
) -> tuple[Optional[str], Optional[str], Optional[str]]:
    """Resolve framework + domain from a reference text like 'AIS' or 'Application Security'.

    Returns ``(framework_id, domain_id, domain_title)`` for the first domain
    whose ID appears as a standalone token in *ref_text*, whose title is
    contained in it, or one of whose aliases contains / is contained in it.
    Returns ``(None, None, None)`` when nothing matches. *alias_idx* is
    accepted for interface compatibility and is not consulted.
    """
    ref_lower = ref_text.strip().lower()
    if not ref_lower:
        return None, None, None

    for fw_id, fw in registry.items():
        for domain in fw.get("domains", []):
            domain_id = domain["domain_id"]
            hit = (domain_id, domain.get("title"))

            # Check domain ID — as a token, so short IDs such as "LOG" do
            # not match inside unrelated words ("Technologie").
            if _contains_token(domain_id, ref_lower):
                return fw_id, *hit

            # Check title
            title_lower = domain.get("title", "").lower()
            if title_lower and title_lower in ref_lower:
                return fw_id, *hit

            # Check aliases
            for alias in domain.get("aliases", []):
                alias_lower = alias.lower()
                if alias_lower and (
                    alias_lower in ref_lower or ref_lower in alias_lower
                ):
                    return fw_id, *hit

    return None, None, None


_FRAMEWORK_KW_SET = {
    "praktiken", "kontrollen", "massnahmen", "maßnahmen",
    "anforderungen", "vorgaben", "framework", "standard",
    "baseline", "katalog", "domain", "family", "category",
    "practices", "controls", "measures", "requirements",
}


def _has_framework_keywords(text: str) -> bool:
    """Check if text contains at least two distinct framework-indicator keywords."""
    words = set(re.findall(r"[a-zäöüß]+", text.lower()))
    return len(words & _FRAMEWORK_KW_SET) >= 2


def _is_compound_obligation(action_raw: str, obligation_text: str) -> bool:
    """Detect if the obligation has multiple competing main verbs.

    Only *action_raw* is inspected; *obligation_text* is accepted for
    interface compatibility. Known fixed pairs (``_NO_SPLIT_PHRASES``) are
    never treated as compound.
    """
    if not action_raw:
        return False

    action_lower = action_raw.lower().strip()

    # Check no-split phrases first
    if any(phrase in action_lower for phrase in _NO_SPLIT_PHRASES):
        return False

    # Must have a conjunction
    if not _COMPOUND_VERB_RE.search(action_lower):
        return False

    # Split by conjunctions and check if we get 2+ meaningful verbs
    parts = _COMPOUND_VERB_RE.split(action_lower)
    meaningful = [p.strip() for p in parts if len(p.strip()) >= 3]
    return len(meaningful) >= 2


# ---------------------------------------------------------------------------
# Framework Decomposition
# ---------------------------------------------------------------------------


@dataclass
class DecomposedObligation:
    """A concrete obligation derived from a framework container."""

    obligation_candidate_id: str
    parent_control_id: str
    parent_framework_container_id: str
    source_ref_law: str
    source_ref_article: str
    obligation_text: str
    actor: str
    action_raw: str
    object_raw: str
    condition_raw: Optional[str] = None
    trigger_raw: Optional[str] = None
    routing_type: str = "atomic"
    release_state: str = "decomposed"
    subcontrol_id: str = ""
    # Metadata
    action_hint: str = ""
    object_hint: str = ""
    object_class: str = ""
    keywords: list[str] = field(default_factory=list)


@dataclass
class FrameworkDecompositionResult:
    """Result of framework decomposition."""

    framework_container_id: str
    source_obligation_candidate_id: str
    framework_ref: Optional[str]
    framework_domain: Optional[str]
    domain_title: Optional[str]
    matched_subcontrols: list[str]
    decomposition_confidence: float
    release_state: str  # decomposed | unmatched | error
    decomposed_obligations: list[DecomposedObligation]
    issues: list[str]


def _unmatched_result(
    container_id: str,
    obligation_candidate_id: str,
    framework_ref: Optional[str],
    framework_domain: Optional[str],
    issues: list[str],
) -> FrameworkDecompositionResult:
    """Build the result returned when decomposition cannot proceed."""
    return FrameworkDecompositionResult(
        framework_container_id=container_id,
        source_obligation_candidate_id=obligation_candidate_id,
        framework_ref=framework_ref,
        framework_domain=framework_domain,
        domain_title=None,
        matched_subcontrols=[],
        decomposition_confidence=0.0,
        release_state="unmatched",
        decomposed_obligations=[],
        issues=issues,
    )


def decompose_framework_container(
    obligation_candidate_id: str,
    parent_control_id: str,
    obligation_text: str,
    framework_ref: Optional[str],
    framework_domain: Optional[str],
    actor: str = "organization",
) -> FrameworkDecompositionResult:
    """Decompose a framework-container obligation into concrete sub-obligations.

    Steps:
      1. Resolve framework from registry
      2. Resolve domain within framework
      3. Select relevant subcontrols (keyword filter or full domain)
      4. Generate decomposed obligations

    Args:
        obligation_candidate_id: ID of the container obligation; used as the
            prefix of every generated obligation ID.
        parent_control_id: Control the container belongs to.
        obligation_text: Text of the container obligation.
        framework_ref: Registry ID of the framework, if already known.
        framework_domain: Domain ID within the framework, if already known
            (compared case-insensitively).
        actor: Actor assigned to every generated obligation.

    Returns:
        A result with ``release_state="decomposed"`` and one obligation per
        selected subcontrol, or ``release_state="unmatched"`` when the
        framework cannot be resolved or no subcontrol matches. The registry
        is never modified.
    """
    container_id = f"FWC-{uuid.uuid4().hex[:8]}"
    registry = get_registry()
    issues: list[str] = []

    # Step 1: Resolve framework
    fw = registry.get(framework_ref) if framework_ref else None
    if fw is None:
        # Try to find by name in text. Keep the caller's reference when that
        # fails too, so the unmatched result still says what was asked for.
        fw, found_ref = _find_framework_in_text(obligation_text, registry)
        if fw is not None:
            framework_ref = found_ref

    if not fw:
        issues.append("ERROR: framework_not_matched")
        return _unmatched_result(
            container_id, obligation_candidate_id,
            framework_ref, framework_domain, issues,
        )

    domains = fw.get("domains", [])

    # Step 2: Resolve domain
    domain_data = None
    domain_title = None
    if framework_domain:
        wanted = framework_domain.lower()
        for d in domains:
            if d["domain_id"].lower() == wanted:
                domain_data = d
                domain_title = d.get("title")
                break

    if not domain_data:
        # Try matching from text
        domain_id, domain_title = _match_domain(obligation_text, fw)
        if domain_id:
            for d in domains:
                if d["domain_id"] == domain_id:
                    domain_data = d
                    framework_domain = domain_id
                    break

    if not domain_data:
        issues.append("WARN: domain_not_matched — using all domains")
        # Fall back to all subcontrols across all domains. Work on copies:
        # tagging the registry's own dicts would leak "_domain_id" into the
        # shared global registry.
        all_subcontrols: list[dict] = []
        for d in domains:
            for sc in d.get("subcontrols", []):
                all_subcontrols.append({**sc, "_domain_id": d["domain_id"]})

        subcontrols = _select_subcontrols(obligation_text, all_subcontrols)
        if not subcontrols:
            issues.append("ERROR: no_subcontrols_matched")
            return _unmatched_result(
                container_id, obligation_candidate_id,
                framework_ref, framework_domain, issues,
            )
    else:
        # Step 3: Select subcontrols from domain
        raw_subcontrols = domain_data.get("subcontrols", [])
        subcontrols = _select_subcontrols(obligation_text, raw_subcontrols)
        if not subcontrols:
            # Full domain decomposition
            subcontrols = raw_subcontrols

    # Quality check: too many subcontrols
    if len(subcontrols) > 25:
        issues.append(f"WARN: {len(subcontrols)} subcontrols — may be too broad")

    # Step 4: Generate decomposed obligations
    display_name = fw.get("display_name", framework_ref or "Unknown")
    decomposed: list[DecomposedObligation] = []
    matched_ids: list[str] = []

    for sc in subcontrols:
        sc_id = sc.get("subcontrol_id", "")
        statement = sc.get("statement", "")
        action_hint = sc.get("action_hint", "")
        object_hint = sc.get("object_hint", "")
        matched_ids.append(sc_id)

        # Quality warnings
        if not action_hint:
            issues.append(f"WARN: {sc_id} missing action_hint")
        if not object_hint:
            issues.append(f"WARN: {sc_id} missing object_hint")

        decomposed.append(DecomposedObligation(
            obligation_candidate_id=f"{obligation_candidate_id}-{sc_id}",
            parent_control_id=parent_control_id,
            parent_framework_container_id=container_id,
            source_ref_law=display_name,
            source_ref_article=sc_id,
            obligation_text=statement,
            actor=actor,
            # Missing hints are inferred from the statement text.
            action_raw=action_hint or _infer_action(statement),
            object_raw=object_hint or _infer_object(statement),
            routing_type="atomic",
            release_state="decomposed",
            subcontrol_id=sc_id,
            action_hint=action_hint,
            object_hint=object_hint,
            object_class=sc.get("object_class", ""),
            # Copy so consumers cannot mutate the registry's keyword list.
            keywords=list(sc.get("keywords", [])),
        ))

    # Check if decomposed are identical to container
    container_text = obligation_text.strip()
    for d in decomposed:
        if d.obligation_text.strip() == container_text:
            issues.append(f"WARN: {d.subcontrol_id} identical to container text")

    confidence = _compute_decomposition_confidence(
        framework_ref, framework_domain, domain_data,
        len(subcontrols), issues,
    )

    return FrameworkDecompositionResult(
        framework_container_id=container_id,
        source_obligation_candidate_id=obligation_candidate_id,
        framework_ref=framework_ref,
        framework_domain=framework_domain,
        domain_title=domain_title,
        matched_subcontrols=matched_ids,
        decomposition_confidence=confidence,
        release_state="decomposed",
        decomposed_obligations=decomposed,
        issues=issues,
    )


def _find_framework_in_text(
    text: str,
    registry: dict[str, dict],
) -> tuple[Optional[dict], Optional[str]]:
    """Try to find a framework by searching text for known names.

    Returns ``(framework_dict, framework_id)`` or ``(None, None)``.
    """
    m = _DIRECT_FRAMEWORK_RE.search(text)
    if m:
        alias_idx = _build_alias_index(registry)
        fw_id = _resolve_framework_id(m.group(0), alias_idx, registry)
        if fw_id and fw_id in registry:
            return registry[fw_id], fw_id
    return None, None


def _select_subcontrols(
    obligation_text: str,
    subcontrols: list[dict],
) -> list[dict]:
    """Select relevant subcontrols based on keyword matching.

    Each subcontrol scores +1 per keyword, +3 for its title and +2 for its
    object hint found in the obligation text. Subcontrols scoring at least 2
    are returned, best first (ties keep their input order).

    Returns empty list if no targeted match found (caller falls back to
    full domain).
    """
    text_lower = obligation_text.lower()
    scored: list[tuple[int, dict]] = []

    for sc in subcontrols:
        # Empty keywords are skipped: "" is contained in every text.
        score = sum(
            1
            for kw in sc.get("keywords", [])
            if kw and kw.lower() in text_lower
        )
        # Title match
        title = sc.get("title", "").lower()
        if title and title in text_lower:
            score += 3
        # Object hint in text
        obj = sc.get("object_hint", "").lower()
        if obj and obj in text_lower:
            score += 2

        # Only keep those with meaningful overlap (score >= 2)
        if score >= 2:
            scored.append((score, sc))

    scored.sort(key=lambda item: item[0], reverse=True)
    return [sc for _, sc in scored]


# Ordered (needles, inferred verb) table for _infer_action; first hit wins.
_ACTION_INFERENCE_RULES: tuple[tuple[tuple[str, ...], str], ...] = (
    (("definiert", "definieren", "define"), "definieren"),
    (("implementiert", "implementieren", "implement"), "implementieren"),
    (("dokumentiert", "dokumentieren", "document"), "dokumentieren"),
    (("ueberwacht", "ueberwachen", "monitor"), "ueberwachen"),
    (("getestet", "testen", "test"), "testen"),
    (("geschuetzt", "schuetzen", "protect"), "implementieren"),
    (("verwaltet", "verwalten", "manage"), "pflegen"),
    (("gemeldet", "melden", "report"), "melden"),
)


def _infer_action(statement: str) -> str:
    """Infer a basic action verb from a statement.

    Checks the statement (case-insensitively, by substring) against an
    ordered rule table and returns the verb of the first rule that hits;
    defaults to ``"implementieren"``.
    """
    s = statement.lower()
    for needles, verb in _ACTION_INFERENCE_RULES:
        if any(needle in s for needle in needles):
            return verb
    return "implementieren"


def _infer_object(statement: str) -> str:
    """Infer the primary object from a statement (first noun phrase).

    Returns at most 80 characters; empty string for an empty statement.
    """
    # Simple heuristic: take the text after "muessen"/"muss" up to the verb
    m = re.search(
        r"(?:muessen|muss|m(?:ü|ue)ssen)\s+(.+?)(?:\s+werden|\s+sein|\.|,|$)",
        statement,
        re.IGNORECASE,
    )
    if m:
        return m.group(1).strip()[:80]
    # Fallback: first 80 chars
    return statement[:80] if statement else ""


def _compute_decomposition_confidence(
    framework_ref: Optional[str],
    domain: Optional[str],
    domain_data: Optional[dict],
    num_subcontrols: int,
    issues: list[str],
) -> float:
    """Compute confidence score for the decomposition.

    Starts at 0.3 and adds 0.25 for a framework reference, 0.20 for a domain
    ID, 0.10 for resolved domain data and 0.10 (1–15 subcontrols) or 0.05
    (more than 15). Each ``ERROR:`` issue subtracts 0.15. The result is
    clamped to [0.0, 1.0] and rounded to two decimals.
    """
    score = 0.3
    if framework_ref:
        score += 0.25
    if domain:
        score += 0.20
    if domain_data:
        score += 0.10
    if 1 <= num_subcontrols <= 15:
        score += 0.10
    elif num_subcontrols > 15:
        score += 0.05  # less confident with too many

    # Penalize errors
    errors = sum(1 for issue in issues if issue.startswith("ERROR:"))
    score -= errors * 0.15

    return round(max(min(score, 1.0), 0.0), 2)


# ---------------------------------------------------------------------------
# Registry statistics (for admin/debugging)
# ---------------------------------------------------------------------------


def registry_stats() -> dict:
    """Return summary statistics about the loaded registry.

    The result holds the framework count, per-framework details (ID, display
    name, domain count, subcontrol count) and the totals across frameworks.
    """
    reg = get_registry()
    stats = {
        "frameworks": len(reg),
        "details": [],
    }
    total_domains = 0
    total_subcontrols = 0
    for fw_id, fw in reg.items():
        domains = fw.get("domains", [])
        n_sc = sum(len(d.get("subcontrols", [])) for d in domains)
        total_domains += len(domains)
        total_subcontrols += n_sc
        stats["details"].append({
            "framework_id": fw_id,
            "display_name": fw.get("display_name", ""),
            "domains": len(domains),
            "subcontrols": n_sc,
        })
    stats["total_domains"] = total_domains
    stats["total_subcontrols"] = total_subcontrols
    return stats