Files
breakpilot-core/control-pipeline/services/dependency_generator.py
Benjamin Admin 96b8f25747 fix(pipeline): use action_type-derived phase order in ontology generator
LLM merge_key phases (e.g. "submission") don't always match PHASE_ORDER
keys. Derive phase order from action_type via get_phase_order() instead.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-26 20:32:58 +02:00

383 lines
13 KiB
Python

"""
Dependency Generator — automatic discovery of control dependencies.
Three strategies:
1. Ontology-based: same normalized_object + phase sequence -> prerequisite
2. Pattern-based: known patterns (define->implement, implement->monitor, etc.)
3. Domain packs: YAML-defined rules for specific regulatory domains
"""
from __future__ import annotations
import logging
import os
import re
from collections import defaultdict
from typing import Optional
import yaml
from services.dependency_engine import Dependency, DEFAULT_PRIORITIES
logger = logging.getLogger(__name__)
# ============================================================================
# PHASE ORDERING (imported from ontology)
# ============================================================================
from services.control_ontology import PHASE_ORDER, get_phase, get_phase_order
# ============================================================================
# PATTERN RULES
# ============================================================================
# Declarative pattern rules applied by generate_pattern_dependencies().
# Schema per rule:
#   name              - human-readable rule identifier (for debugging/logs)
#   source_filter     - matches prerequisite controls; either
#                       {"action_type": <str>} or {"action_type_in": [<str>, ...]}
#   target_filter     - matches dependent controls; same shape as source_filter
#   match_on          - merge_key component that must be equal on both sides
#                       (currently only "normalized_object" is honored)
#   dependency_type   - copied into the generated Dependency
#   condition/effect  - copied into the generated Dependency
#   priority          - copied into the generated Dependency (lower = earlier)
PATTERN_RULES: list[dict] = [
    {
        "name": "define_before_implement",
        "source_filter": {"action_type": "define"},
        "target_filter": {"action_type": "implement"},
        "match_on": "normalized_object",
        "dependency_type": "prerequisite",
        "condition": {},
        "effect": {"set_status": "review_required"},
        "priority": 50,
    },
    {
        "name": "implement_before_monitor",
        "source_filter": {"action_type_in": ["implement", "configure", "enforce"]},
        "target_filter": {"action_type_in": ["monitor", "review", "test"]},
        "match_on": "normalized_object",
        "dependency_type": "prerequisite",
        "condition": {},
        "effect": {"set_status": "review_required"},
        "priority": 50,
    },
    {
        "name": "define_before_enforce",
        "source_filter": {"action_type": "define"},
        "target_filter": {"action_type": "enforce"},
        "match_on": "normalized_object",
        "dependency_type": "prerequisite",
        "condition": {},
        "effect": {"set_status": "review_required"},
        "priority": 50,
    },
    {
        "name": "implement_before_validate",
        "source_filter": {"action_type_in": ["implement", "configure"]},
        "target_filter": {"action_type_in": ["validate", "verify"]},
        "match_on": "normalized_object",
        "dependency_type": "prerequisite",
        "condition": {},
        "effect": {"set_status": "review_required"},
        "priority": 50,
    },
    {
        # Training is softer evidence than implementation, hence lower priority.
        "name": "train_before_review",
        "source_filter": {"action_type": "train"},
        "target_filter": {"action_type_in": ["review", "assess"]},
        "match_on": "normalized_object",
        "dependency_type": "prerequisite",
        "condition": {},
        "effect": {"set_status": "review_required"},
        "priority": 60,
    },
]
# ============================================================================
# HELPER: Parse merge_key into components
# ============================================================================
def _parse_merge_key(merge_key: str) -> dict:
"""Parse 'action_type:normalized_object:phase[:asset_scope]' into components."""
parts = merge_key.split(":")
result = {
"action_type": parts[0] if len(parts) > 0 else "",
"normalized_object": parts[1] if len(parts) > 1 else "",
"phase": parts[2] if len(parts) > 2 else "",
"asset_scope": parts[3] if len(parts) > 3 else "",
}
return result
def _get_control_merge_key(control: dict) -> str:
"""Extract merge_key from a control dict (from generation_metadata or top-level)."""
mk = control.get("merge_key", "")
if not mk:
meta = control.get("generation_metadata", {})
if isinstance(meta, str):
try:
import json
meta = json.loads(meta)
except (ValueError, TypeError):
meta = {}
mk = meta.get("merge_group_hint", "")
return mk
# ============================================================================
# ONTOLOGY-BASED GENERATOR
# ============================================================================
def generate_ontology_dependencies(controls: list[dict]) -> list[Dependency]:
    """Generate prerequisite dependencies from lifecycle phase ordering.

    Rule: if two controls share the same normalized_object and control A's
    phase precedes control B's phase, then A is a prerequisite for B.

    Groups by normalized_object first (O(n) grouping, O(k^2) per group
    where k is typically 2-8).

    Side effect: caches ``_parsed_mk`` and ``_phase_order`` on each grouped
    control dict so later generators (patterns) can reuse the parse.
    """
    # Group controls by normalized_object.
    groups: dict[str, list[dict]] = defaultdict(list)
    for ctrl in controls:
        mk = _get_control_merge_key(ctrl)
        if not mk:
            continue
        parsed = _parse_merge_key(mk)
        obj = parsed["normalized_object"]
        if obj:
            ctrl["_parsed_mk"] = parsed
            # Use action_type to derive phase order (more reliable than LLM phase name)
            ctrl["_phase_order"] = get_phase_order(parsed["action_type"])
            groups[obj].append(ctrl)
    dependencies: list[Dependency] = []
    for obj, group in groups.items():
        if len(group) < 2:
            continue
        # Sort by phase order so earlier lifecycle phases come first.
        group.sort(key=lambda c: c["_phase_order"])
        # Create prerequisite edges for every ordered pair with strictly
        # increasing phase order (full set, not just adjacent phases —
        # duplicates are removed later in generate_all_dependencies).
        for i in range(len(group)):
            for j in range(i + 1, len(group)):
                a = group[i]
                b = group[j]
                if a["_phase_order"] >= b["_phase_order"]:
                    continue
                src_id = a.get("id", a.get("control_id", ""))
                tgt_id = b.get("id", b.get("control_id", ""))
                # Controls without ids cannot form meaningful edges; emitting
                # them would also make distinct pairs collide on the
                # ("", "", "prerequisite") key during deduplication.
                if not src_id or not tgt_id or src_id == tgt_id:
                    continue
                dep = Dependency(
                    source_control_id=src_id,
                    target_control_id=tgt_id,
                    dependency_type="prerequisite",
                    condition={},
                    effect={"set_status": "review_required"},
                    priority=DEFAULT_PRIORITIES["prerequisite"],
                    generation_method="ontology",
                )
                dependencies.append(dep)
    return dependencies
# ============================================================================
# PATTERN-BASED GENERATOR
# ============================================================================
def _matches_filter(control: dict, filter_: dict) -> bool:
"""Check if a control matches a pattern filter."""
parsed = control.get("_parsed_mk", {})
action = parsed.get("action_type", "")
if "action_type" in filter_:
if action != filter_["action_type"]:
return False
if "action_type_in" in filter_:
if action not in filter_["action_type_in"]:
return False
return True
def generate_pattern_dependencies(
    controls: list[dict],
    rules: Optional[list[dict]] = None,
) -> list[Dependency]:
    """Apply pattern rules to generate dependencies between controls.

    Falls back to the module-level PATTERN_RULES when no rules are given.
    Reuses the ``_parsed_mk`` cache left by the ontology generator when
    present, parsing merge keys for any controls that lack it.
    """
    active_rules = PATTERN_RULES if rules is None else rules

    # Ensure every control carries a parsed merge key.
    for ctrl in controls:
        if "_parsed_mk" in ctrl:
            continue
        mk = _get_control_merge_key(ctrl)
        ctrl["_parsed_mk"] = _parse_merge_key(mk) if mk else {}

    deps: list[Dependency] = []
    for rule in active_rules:
        src_candidates = [c for c in controls if _matches_filter(c, rule["source_filter"])]
        tgt_candidates = [c for c in controls if _matches_filter(c, rule["target_filter"])]
        require_same_object = rule.get("match_on") == "normalized_object"
        for src in src_candidates:
            src_id = src.get("id", src.get("control_id", ""))
            src_obj = src.get("_parsed_mk", {}).get("normalized_object", "")
            for tgt in tgt_candidates:
                tgt_id = tgt.get("id", tgt.get("control_id", ""))
                if src_id == tgt_id:
                    continue
                if require_same_object:
                    tgt_obj = tgt.get("_parsed_mk", {}).get("normalized_object", "")
                    # Pattern edges only make sense on the same object.
                    if not src_obj or src_obj != tgt_obj:
                        continue
                deps.append(Dependency(
                    source_control_id=src_id,
                    target_control_id=tgt_id,
                    dependency_type=rule["dependency_type"],
                    condition=rule.get("condition", {}),
                    effect=rule.get("effect", {"set_status": "review_required"}),
                    priority=rule.get("priority", 100),
                    generation_method="pattern",
                ))
    return deps
# ============================================================================
# DOMAIN PACK GENERATOR
# ============================================================================
def load_domain_pack(path: str) -> dict:
    """Read and parse a YAML domain pack file.

    Returns {} when the file is empty or parses to a falsy value.
    """
    with open(path, "r", encoding="utf-8") as handle:
        parsed = yaml.safe_load(handle)
    return parsed if parsed else {}
def _title_matches(title: str, patterns: list[str]) -> bool:
"""Check if a title contains any of the given patterns (case-insensitive)."""
title_lower = title.lower()
return any(p.lower() in title_lower for p in patterns)
def generate_domain_dependencies(
    controls: list[dict],
    domain_pack_dir: str = "",
) -> list[Dependency]:
    """Apply all domain packs to generate domain-specific dependencies.

    Packs are YAML files under ``domain_pack_dir`` (defaults to
    ``<project>/data/domain_packs``); each pack's rules match controls by
    title substrings. Returns [] when the directory does not exist.
    """
    if not domain_pack_dir:
        project_root = os.path.dirname(os.path.dirname(__file__))
        domain_pack_dir = os.path.join(project_root, "data", "domain_packs")
    if not os.path.isdir(domain_pack_dir):
        return []

    pack_files = [
        name
        for name in sorted(os.listdir(domain_pack_dir))
        if name.endswith((".yaml", ".yml"))
    ]

    deps: list[Dependency] = []
    for filename in pack_files:
        pack = load_domain_pack(os.path.join(domain_pack_dir, filename))
        for rule in pack.get("rules", []):
            src_patterns = rule.get("source_match", {}).get("title_contains", [])
            tgt_patterns = rule.get("target_match", {}).get("title_contains", [])
            # Empty pattern lists match nothing.
            sources = (
                [c for c in controls if _title_matches(c.get("title", ""), src_patterns)]
                if src_patterns else []
            )
            targets = (
                [c for c in controls if _title_matches(c.get("title", ""), tgt_patterns)]
                if tgt_patterns else []
            )
            dep_type = rule.get("dependency_type", "prerequisite")
            for src in sources:
                src_id = src.get("id", src.get("control_id", ""))
                for tgt in targets:
                    tgt_id = tgt.get("id", tgt.get("control_id", ""))
                    if src_id == tgt_id:
                        continue
                    deps.append(Dependency(
                        source_control_id=src_id,
                        target_control_id=tgt_id,
                        dependency_type=dep_type,
                        condition=rule.get("condition", {
                            "field": "source.status", "op": "==", "value": "pass",
                        }),
                        effect=rule.get("effect", {"set_status": "not_applicable"}),
                        priority=rule.get(
                            "priority", DEFAULT_PRIORITIES.get(dep_type, 100)
                        ),
                        generation_method="domain_pack",
                    ))
    return deps
# ============================================================================
# TOP-LEVEL GENERATOR
# ============================================================================
def generate_all_dependencies(
    controls: list[dict],
    enable_ontology: bool = True,
    enable_patterns: bool = True,
    enable_domain_packs: bool = True,
    domain_pack_dir: str = "",
) -> tuple[list[Dependency], dict]:
    """Run all enabled generators and return deduplicated dependencies + stats.

    Generators run in order: ontology, patterns, domain packs. Duplicates
    (same source, target, and type) keep their first occurrence, so earlier
    generators win.
    """
    stats = {
        "ontology_generated": 0,
        "pattern_generated": 0,
        "domain_generated": 0,
        "total_before_dedup": 0,
        "total_unique": 0,
        "duplicates_removed": 0,
    }

    combined: list[Dependency] = []
    if enable_ontology:
        generated = generate_ontology_dependencies(controls)
        stats["ontology_generated"] = len(generated)
        combined += generated
    if enable_patterns:
        generated = generate_pattern_dependencies(controls)
        stats["pattern_generated"] = len(generated)
        combined += generated
    if enable_domain_packs:
        generated = generate_domain_dependencies(controls, domain_pack_dir)
        stats["domain_generated"] = len(generated)
        combined += generated
    stats["total_before_dedup"] = len(combined)

    # Deduplicate on (source, target, type), keeping first occurrence.
    seen: set[tuple[str, str, str]] = set()
    unique: list[Dependency] = []
    for dep in combined:
        key = (dep.source_control_id, dep.target_control_id, dep.dependency_type)
        if key in seen:
            continue
        seen.add(key)
        unique.append(dep)

    stats["total_unique"] = len(unique)
    stats["duplicates_removed"] = stats["total_before_dedup"] - len(unique)
    return unique, stats