"""
Dependency Generator — automatic discovery of control dependencies.

Three strategies:
  1. Ontology-based: same normalized_object + phase sequence -> prerequisite
  2. Pattern-based: known patterns (define->implement, implement->monitor, etc.)
  3. Domain packs: YAML-defined rules for specific regulatory domains

None of the generators modify the control dicts they are given.
"""

from __future__ import annotations

import json
import logging
import os
import re
from collections import defaultdict
from itertools import combinations
from typing import Optional

import yaml

from services.dependency_engine import Dependency, DEFAULT_PRIORITIES
from services.control_ontology import PHASE_ORDER

logger = logging.getLogger(__name__)

# ============================================================================
# PHASE ORDERING (imported from ontology)
# ============================================================================

# Rank used for a phase name that is missing from PHASE_ORDER.
# NOTE(review): presumably chosen to sort after every known phase — confirm
# against services.control_ontology.PHASE_ORDER.
_UNKNOWN_PHASE_ORDER = 6

# ============================================================================
# PATTERN RULES
# ============================================================================

PATTERN_RULES: list[dict] = [
    {
        "name": "define_before_implement",
        "source_filter": {"action_type": "define"},
        "target_filter": {"action_type": "implement"},
        "match_on": "normalized_object",
        "dependency_type": "prerequisite",
        "condition": {},
        "effect": {"set_status": "review_required"},
        "priority": 50,
    },
    {
        "name": "implement_before_monitor",
        "source_filter": {"action_type_in": ["implement", "configure", "enforce"]},
        "target_filter": {"action_type_in": ["monitor", "review", "test"]},
        "match_on": "normalized_object",
        "dependency_type": "prerequisite",
        "condition": {},
        "effect": {"set_status": "review_required"},
        "priority": 50,
    },
    {
        "name": "define_before_enforce",
        "source_filter": {"action_type": "define"},
        "target_filter": {"action_type": "enforce"},
        "match_on": "normalized_object",
        "dependency_type": "prerequisite",
        "condition": {},
        "effect": {"set_status": "review_required"},
        "priority": 50,
    },
    {
        "name": "implement_before_validate",
        "source_filter": {"action_type_in": ["implement", "configure"]},
        "target_filter": {"action_type_in": ["validate", "verify"]},
        "match_on": "normalized_object",
        "dependency_type": "prerequisite",
        "condition": {},
        "effect": {"set_status": "review_required"},
        "priority": 50,
    },
    {
        "name": "train_before_review",
        "source_filter": {"action_type": "train"},
        "target_filter": {"action_type_in": ["review", "assess"]},
        "match_on": "normalized_object",
        "dependency_type": "prerequisite",
        "condition": {},
        "effect": {"set_status": "review_required"},
        "priority": 60,
    },
]

# ============================================================================
# HELPERS: merge_key parsing and control identity
# ============================================================================

_MERGE_KEY_FIELDS = ("action_type", "normalized_object", "phase", "asset_scope")


def _parse_merge_key(merge_key: str) -> dict:
    """Parse 'action_type:normalized_object:phase[:asset_scope]' into components.

    Missing trailing components are returned as empty strings; components
    beyond the fourth are ignored.
    """
    parts = merge_key.split(":")
    return {
        field: parts[index] if index < len(parts) else ""
        for index, field in enumerate(_MERGE_KEY_FIELDS)
    }


def _get_control_merge_key(control: dict) -> str:
    """Extract the merge key from a control dict.

    Looks at the top-level ``merge_key`` first and falls back to
    ``generation_metadata["merge_group_hint"]``. The metadata may be a dict
    or a JSON-encoded string. Returns ``""`` when no key can be found,
    including when the metadata is missing, ``None``, undecodable, or does
    not decode to a mapping.
    """
    merge_key = control.get("merge_key")
    if merge_key:
        return merge_key

    meta = control.get("generation_metadata")
    if isinstance(meta, str):
        try:
            meta = json.loads(meta)
        except (ValueError, TypeError):
            meta = None
    if not isinstance(meta, dict):
        # None, or JSON that decoded to a list/scalar: nothing to read.
        return ""
    return meta.get("merge_group_hint") or ""


def _control_id(control: dict):
    """Return the control's ``id``, falling back to ``control_id``, then ``""``."""
    return control.get("id", control.get("control_id", ""))


def _parsed_merge_key_for(control: dict) -> dict:
    """Return the parsed merge key for *control* without modifying it.

    A ``_parsed_mk`` entry already present on the control (e.g. attached by
    a caller) is used as-is; otherwise the merge key is looked up and
    parsed. Controls without a merge key yield an empty dict.
    """
    if "_parsed_mk" in control:
        return control["_parsed_mk"]
    merge_key = _get_control_merge_key(control)
    return _parse_merge_key(merge_key) if merge_key else {}


# ============================================================================
# ONTOLOGY-BASED GENERATOR
# ============================================================================

def generate_ontology_dependencies(controls: list[dict]) -> list[Dependency]:
    """Generate prerequisite dependencies from lifecycle phase ordering.

    Rule: if two controls share the same normalized_object and control A's
    phase ranks strictly before control B's phase in PHASE_ORDER, then A is
    a prerequisite for B. Every such earlier/later pair is linked, not only
    neighbouring phases. Controls in the same phase are not linked, and a
    pair whose source and target ids are equal is skipped.

    Controls are grouped by normalized_object first, so the pairwise step
    is quadratic only within each group.

    Args:
        controls: Control dicts. Controls without a merge key or without a
            normalized_object are ignored. The dicts are not modified.

    Returns:
        Dependencies with ``generation_method="ontology"``.
    """
    # normalized_object -> [(phase rank, control), ...] in input order
    groups: dict[str, list[tuple[int, dict]]] = defaultdict(list)
    for ctrl in controls:
        merge_key = _get_control_merge_key(ctrl)
        if not merge_key:
            continue
        parsed = _parse_merge_key(merge_key)
        obj = parsed["normalized_object"]
        if obj:
            rank = PHASE_ORDER.get(parsed["phase"], _UNKNOWN_PHASE_ORDER)
            groups[obj].append((rank, ctrl))

    dependencies: list[Dependency] = []
    for group in groups.values():
        if len(group) < 2:
            continue

        # Stable sort on the rank only, so same-phase controls keep input order.
        group.sort(key=lambda entry: entry[0])

        for (earlier_rank, earlier), (later_rank, later) in combinations(group, 2):
            if earlier_rank >= later_rank:
                continue  # same phase: no ordering between them
            source_id = _control_id(earlier)
            target_id = _control_id(later)
            if source_id == target_id:
                continue  # never emit a self-dependency
            dependencies.append(
                Dependency(
                    source_control_id=source_id,
                    target_control_id=target_id,
                    dependency_type="prerequisite",
                    condition={},
                    effect={"set_status": "review_required"},
                    priority=DEFAULT_PRIORITIES["prerequisite"],
                    generation_method="ontology",
                )
            )

    return dependencies


# ============================================================================
# PATTERN-BASED GENERATOR
# ============================================================================

def _action_matches(action: str, filter_: dict) -> bool:
    """Check an action type against a filter's ``action_type`` / ``action_type_in``.

    A filter containing neither key matches every action.
    """
    if "action_type" in filter_ and action != filter_["action_type"]:
        return False
    if "action_type_in" in filter_ and action not in filter_["action_type_in"]:
        return False
    return True


def _matches_filter(control: dict, filter_: dict) -> bool:
    """Check if a control matches a pattern filter.

    The control's action type comes from its parsed merge key; a control
    without a merge key has the empty action type.
    """
    parsed = _parsed_merge_key_for(control)
    return _action_matches(parsed.get("action_type", ""), filter_)


def generate_pattern_dependencies(
    controls: list[dict],
    rules: Optional[list[dict]] = None,
) -> list[Dependency]:
    """Apply pattern rules to generate dependencies between controls.

    For each rule, every control matching ``source_filter`` is paired with
    every control matching ``target_filter``. Pairs with equal ids are
    skipped. When the rule has ``match_on == "normalized_object"``, both
    controls must share the same non-empty normalized_object.

    Args:
        controls: Control dicts. The dicts are not modified.
        rules: Rules shaped like the entries of PATTERN_RULES; defaults to
            PATTERN_RULES when ``None``.

    Returns:
        Dependencies with ``generation_method="pattern"``.
    """
    if rules is None:
        rules = PATTERN_RULES

    # Parse every control once: (id, action_type, normalized_object).
    facts: list[tuple] = []
    for ctrl in controls:
        parsed = _parsed_merge_key_for(ctrl)
        facts.append(
            (
                _control_id(ctrl),
                parsed.get("action_type", ""),
                parsed.get("normalized_object", ""),
            )
        )

    dependencies: list[Dependency] = []
    for rule in rules:
        sources = [f for f in facts if _action_matches(f[1], rule["source_filter"])]
        targets = [f for f in facts if _action_matches(f[1], rule["target_filter"])]
        same_object_required = rule.get("match_on") == "normalized_object"

        for src_id, _, src_obj in sources:
            for tgt_id, _, tgt_obj in targets:
                if src_id == tgt_id:
                    continue
                if same_object_required and (not src_obj or src_obj != tgt_obj):
                    continue
                dependencies.append(
                    Dependency(
                        source_control_id=src_id,
                        target_control_id=tgt_id,
                        dependency_type=rule["dependency_type"],
                        condition=rule.get("condition", {}),
                        effect=rule.get("effect", {"set_status": "review_required"}),
                        priority=rule.get("priority", 100),
                        generation_method="pattern",
                    )
                )

    return dependencies


# ============================================================================
# DOMAIN PACK GENERATOR
# ============================================================================

def load_domain_pack(path: str) -> dict:
    """Load a YAML domain pack.

    Returns the parsed document, or ``{}`` for an empty file. Errors from
    opening the file or parsing the YAML propagate to the caller.
    """
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


def _as_pattern_list(value) -> list[str]:
    """Normalize a ``title_contains`` value to a list of strings.

    YAML authors may write a single scalar instead of a list; iterating a
    bare string would yield single characters, which match almost any
    title. ``None`` (key missing or left empty) becomes an empty list.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    return [str(item) for item in value]


def _title_matches(title: str, patterns: list[str]) -> bool:
    """Check if a title contains any of the given patterns (case-insensitive).

    A ``None`` title is treated as empty.
    """
    title_lower = (title or "").lower()
    return any(p.lower() in title_lower for p in patterns)


def generate_domain_dependencies(
    controls: list[dict],
    domain_pack_dir: str = "",
) -> list[Dependency]:
    """Apply all domain packs to generate domain-specific dependencies.

    Every ``*.yaml`` / ``*.yml`` file in the directory is loaded in sorted
    filename order. For each rule, controls whose title contains one of
    ``source_match.title_contains`` are paired with controls whose title
    contains one of ``target_match.title_contains``; pairs with equal ids
    are skipped. A rule lacking patterns on either side yields nothing.

    Args:
        controls: Control dicts; only ``title`` and the id fields are read.
        domain_pack_dir: Directory holding the packs. When empty, defaults
            to ``<parent of this file's directory>/data/domain_packs``.

    Returns:
        Dependencies with ``generation_method="domain_pack"``; an empty
        list when the directory does not exist.
    """
    if not domain_pack_dir:
        domain_pack_dir = os.path.join(
            os.path.dirname(os.path.dirname(__file__)), "data", "domain_packs"
        )

    if not os.path.isdir(domain_pack_dir):
        return []

    dependencies: list[Dependency] = []

    for filename in sorted(os.listdir(domain_pack_dir)):
        if not filename.endswith((".yaml", ".yml")):
            continue

        pack = load_domain_pack(os.path.join(domain_pack_dir, filename))
        if not isinstance(pack, dict):
            logger.warning(
                "Skipping domain pack %s: top-level YAML value is not a mapping",
                filename,
            )
            continue

        # `rules:` left empty in YAML parses to None, not a missing key.
        for rule in pack.get("rules") or []:
            if not isinstance(rule, dict):
                logger.warning(
                    "Skipping non-mapping rule in domain pack %s", filename
                )
                continue

            src_patterns = _as_pattern_list(
                (rule.get("source_match") or {}).get("title_contains")
            )
            tgt_patterns = _as_pattern_list(
                (rule.get("target_match") or {}).get("title_contains")
            )
            if not src_patterns or not tgt_patterns:
                continue  # one side can never match, so no pairs

            sources = [
                c for c in controls
                if _title_matches(c.get("title", ""), src_patterns)
            ]
            targets = [
                c for c in controls
                if _title_matches(c.get("title", ""), tgt_patterns)
            ]

            dep_type = rule.get("dependency_type", "prerequisite")
            for src in sources:
                src_id = _control_id(src)
                for tgt in targets:
                    tgt_id = _control_id(tgt)
                    if src_id == tgt_id:
                        continue

                    dependencies.append(
                        Dependency(
                            source_control_id=src_id,
                            target_control_id=tgt_id,
                            dependency_type=dep_type,
                            condition=rule.get("condition", {
                                "field": "source.status",
                                "op": "==",
                                "value": "pass",
                            }),
                            effect=rule.get("effect", {"set_status": "not_applicable"}),
                            priority=rule.get(
                                "priority", DEFAULT_PRIORITIES.get(dep_type, 100)
                            ),
                            generation_method="domain_pack",
                        )
                    )

    return dependencies


# ============================================================================
# TOP-LEVEL GENERATOR
# ============================================================================

def generate_all_dependencies(
    controls: list[dict],
    enable_ontology: bool = True,
    enable_patterns: bool = True,
    enable_domain_packs: bool = True,
    domain_pack_dir: str = "",
) -> tuple[list[Dependency], dict]:
    """Run all enabled generators and return deduplicated dependencies + stats.

    Generators run in the order ontology, pattern, domain pack. Results are
    deduplicated on ``(source_control_id, target_control_id,
    dependency_type)`` keeping the first occurrence, so an earlier
    generator's dependency wins over a later duplicate.

    Args:
        controls: Control dicts passed to each generator.
        enable_ontology: Run the ontology-based generator.
        enable_patterns: Run the pattern-based generator with PATTERN_RULES.
        enable_domain_packs: Run the domain-pack generator.
        domain_pack_dir: Forwarded to generate_domain_dependencies.

    Returns:
        ``(unique_dependencies, stats)`` where stats holds the counts
        ``ontology_generated``, ``pattern_generated``, ``domain_generated``,
        ``total_before_dedup``, ``total_unique`` and ``duplicates_removed``.
    """
    stats = {
        "ontology_generated": 0,
        "pattern_generated": 0,
        "domain_generated": 0,
        "total_before_dedup": 0,
        "total_unique": 0,
        "duplicates_removed": 0,
    }

    all_deps: list[Dependency] = []

    if enable_ontology:
        onto_deps = generate_ontology_dependencies(controls)
        stats["ontology_generated"] = len(onto_deps)
        all_deps.extend(onto_deps)

    if enable_patterns:
        pat_deps = generate_pattern_dependencies(controls)
        stats["pattern_generated"] = len(pat_deps)
        all_deps.extend(pat_deps)

    if enable_domain_packs:
        dom_deps = generate_domain_dependencies(controls, domain_pack_dir)
        stats["domain_generated"] = len(dom_deps)
        all_deps.extend(dom_deps)

    stats["total_before_dedup"] = len(all_deps)

    # Deduplicate by (source, target, type), keeping the first occurrence.
    seen: set[tuple[str, str, str]] = set()
    unique: list[Dependency] = []
    for dep in all_deps:
        key = (dep.source_control_id, dep.target_control_id, dep.dependency_type)
        if key not in seen:
            seen.add(key)
            unique.append(dep)

    stats["total_unique"] = len(unique)
    stats["duplicates_removed"] = stats["total_before_dedup"] - stats["total_unique"]

    return unique, stats