diff --git a/control-pipeline/api/__init__.py b/control-pipeline/api/__init__.py index 4a41927..5d23aae 100644 --- a/control-pipeline/api/__init__.py +++ b/control-pipeline/api/__init__.py @@ -3,8 +3,10 @@ from fastapi import APIRouter from api.control_generator_routes import router as generator_router from api.canonical_control_routes import router as canonical_router from api.document_compliance_routes import router as document_router +from api.dependency_routes import router as dependency_router router = APIRouter() router.include_router(generator_router) router.include_router(canonical_router) router.include_router(document_router) +router.include_router(dependency_router) diff --git a/control-pipeline/api/dependency_routes.py b/control-pipeline/api/dependency_routes.py new file mode 100644 index 0000000..7772685 --- /dev/null +++ b/control-pipeline/api/dependency_routes.py @@ -0,0 +1,448 @@ +""" +FastAPI routes for the Control Dependency Engine. + +Endpoints: + GET /v1/dependencies — List dependencies + POST /v1/dependencies — Create a dependency + DELETE /v1/dependencies/{dep_id} — Deactivate a dependency + POST /v1/dependencies/generate — Auto-generate dependencies + POST /v1/dependencies/evaluate — Evaluate controls with dependencies + GET /v1/dependencies/evaluate/{run_id} — Get evaluation results + POST /v1/dependencies/validate — Validate graph (cycle check) + GET /v1/dependencies/graph — Dependency graph for visualization +""" + +import json +import logging +import uuid +from typing import Optional + +from fastapi import APIRouter, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy import text + +from db.session import SessionLocal +from services.dependency_engine import ( + Dependency, + ControlState, + EvaluationResult, + evaluate_controls, + detect_cycles, + load_all_active_dependencies, + load_dependencies_for_controls, + store_dependency, + store_evaluation_results, +) + +logger = logging.getLogger(__name__) +router = 
APIRouter(prefix="/v1/dependencies", tags=["dependencies"]) + + +# ============================================================================= +# REQUEST / RESPONSE MODELS +# ============================================================================= + +class DependencyCreateRequest(BaseModel): + source_control_id: str + target_control_id: str + dependency_type: str + condition: dict = {} + effect: dict = {} + priority: int = 100 + + +class EvaluateRequest(BaseModel): + control_ids: Optional[list] = None + company_profile: dict = {} + control_statuses: dict = {} + + +class GenerateRequest(BaseModel): + enable_ontology: bool = True + enable_patterns: bool = True + enable_domain_packs: bool = True + dry_run: bool = True + limit: int = 0 + + +# ============================================================================= +# LIST DEPENDENCIES +# ============================================================================= + +@router.get("/") +async def list_dependencies( + dependency_type: Optional[str] = Query(default=None), + source_id: Optional[str] = Query(default=None), + target_id: Optional[str] = Query(default=None), + active_only: bool = Query(default=True), + limit: int = Query(default=100, le=1000), + offset: int = Query(default=0), +): + """List dependencies with optional filters.""" + db = SessionLocal() + try: + conditions = [] + params: dict = {"lim": limit, "off": offset} + + if active_only: + conditions.append("is_active = TRUE") + if dependency_type: + conditions.append("dependency_type = :dtype") + params["dtype"] = dependency_type + if source_id: + conditions.append("source_control_id = CAST(:src AS uuid)") + params["src"] = source_id + if target_id: + conditions.append("target_control_id = CAST(:tgt AS uuid)") + params["tgt"] = target_id + + where = "WHERE " + " AND ".join(conditions) if conditions else "" + + rows = db.execute(text(f""" + SELECT d.id, d.source_control_id, d.target_control_id, + d.dependency_type, d.condition, d.effect, 
d.priority, + d.generation_method, d.is_active, d.created_at, + s.control_id AS source_cid, s.title AS source_title, + t.control_id AS target_cid, t.title AS target_title + FROM control_dependencies d + LEFT JOIN canonical_controls s ON s.id = d.source_control_id + LEFT JOIN canonical_controls t ON t.id = d.target_control_id + {where} + ORDER BY d.priority, d.created_at + LIMIT :lim OFFSET :off + """), params).fetchall() + + total = db.execute(text(f""" + SELECT COUNT(*) FROM control_dependencies d {where} + """), params).scalar() + + return { + "total": total, + "dependencies": [ + { + "id": str(r[0]), + "source_control_id": str(r[1]), + "target_control_id": str(r[2]), + "dependency_type": r[3], + "condition": r[4], + "effect": r[5], + "priority": r[6], + "generation_method": r[7], + "is_active": r[8], + "created_at": str(r[9]) if r[9] else None, + "source_control_id_short": r[10] or "", + "source_title": r[11] or "", + "target_control_id_short": r[12] or "", + "target_title": r[13] or "", + } + for r in rows + ], + } + finally: + db.close() + + +# ============================================================================= +# CREATE DEPENDENCY +# ============================================================================= + +@router.post("/") +async def create_dependency(req: DependencyCreateRequest): + """Create a manual dependency.""" + valid_types = {"prerequisite", "conditional_requirement", "supersedes", + "compensating_control", "scope_exclusion"} + if req.dependency_type not in valid_types: + raise HTTPException(400, f"Invalid type. 
Must be one of: {valid_types}") + + if req.source_control_id == req.target_control_id: + raise HTTPException(400, "Source and target must be different") + + db = SessionLocal() + try: + dep = Dependency( + source_control_id=req.source_control_id, + target_control_id=req.target_control_id, + dependency_type=req.dependency_type, + condition=req.condition, + effect=req.effect, + priority=req.priority, + generation_method="manual", + ) + dep_id = store_dependency(db, dep) + db.commit() + return {"id": dep_id, "status": "created"} + except Exception as e: + db.rollback() + raise HTTPException(500, str(e)) + finally: + db.close() + + +# ============================================================================= +# DELETE (DEACTIVATE) DEPENDENCY +# ============================================================================= + +@router.delete("/{dep_id}") +async def deactivate_dependency(dep_id: str): + """Deactivate a dependency (soft delete).""" + db = SessionLocal() + try: + result = db.execute( + text("UPDATE control_dependencies SET is_active = FALSE, updated_at = NOW() WHERE id = CAST(:did AS uuid)"), + {"did": dep_id}, + ) + db.commit() + if result.rowcount == 0: + raise HTTPException(404, "Dependency not found") + return {"status": "deactivated"} + finally: + db.close() + + +# ============================================================================= +# AUTO-GENERATE DEPENDENCIES +# ============================================================================= + +@router.post("/generate") +async def generate_dependencies(req: GenerateRequest): + """Auto-generate dependencies from ontology, patterns, and domain packs.""" + from services.dependency_generator import generate_all_dependencies + + db = SessionLocal() + try: + query = """ + SELECT id::text, control_id, title, generation_metadata + FROM canonical_controls + WHERE release_state = 'draft' + AND generation_metadata->>'decomposition_method' = 'pass0b' + """ + if req.limit > 0: + query += f" LIMIT 
{req.limit}" + + rows = db.execute(text(query)).fetchall() + controls = [] + for r in rows: + meta = r[3] if isinstance(r[3], dict) else {} + controls.append({ + "id": r[0], + "control_id": r[1], + "title": r[2], + "generation_metadata": meta, + }) + + deps, stats = generate_all_dependencies( + controls, + enable_ontology=req.enable_ontology, + enable_patterns=req.enable_patterns, + enable_domain_packs=req.enable_domain_packs, + ) + + if not req.dry_run: + stored = 0 + for dep in deps: + store_dependency(db, dep) + stored += 1 + db.commit() + stats["stored"] = stored + + return { + "dry_run": req.dry_run, + "controls_analyzed": len(controls), + "stats": stats, + "sample_dependencies": [ + { + "source": d.source_control_id[:8], + "target": d.target_control_id[:8], + "type": d.dependency_type, + "method": d.generation_method, + "priority": d.priority, + } + for d in deps[:20] + ], + } + finally: + db.close() + + +# ============================================================================= +# EVALUATE CONTROLS +# ============================================================================= + +@router.post("/evaluate") +async def evaluate(req: EvaluateRequest): + """Evaluate controls with dependency resolution.""" + db = SessionLocal() + try: + # Load control statuses + if req.control_statuses: + control_ids = list(req.control_statuses.keys()) + states = { + cid: ControlState(control_id=cid, raw_status=status) + for cid, status in req.control_statuses.items() + } + elif req.control_ids: + control_ids = req.control_ids + states = { + cid: ControlState(control_id=cid, raw_status="fail") + for cid in control_ids + } + else: + raise HTTPException(400, "Provide control_ids or control_statuses") + + # Load dependencies + deps = load_dependencies_for_controls(db, control_ids) + + # Evaluate + results = evaluate_controls(states, deps, req.company_profile) + + # Store results + store_evaluation_results(db, results, req.company_profile) + db.commit() + + # Format response + 
run_id = next(iter(results.values())).evaluation_run_id if results else "" + + return { + "evaluation_run_id": run_id, + "total_controls": len(results), + "dependencies_evaluated": len(deps), + "results": [ + { + "control_id": r.control_id, + "raw_status": r.raw_status, + "resolved_status": r.resolved_status, + "dependency_resolution": r.dependency_resolution, + "confidence": r.confidence, + } + for r in results.values() + ], + "summary": { + "pass": sum(1 for r in results.values() if r.resolved_status == "pass"), + "fail": sum(1 for r in results.values() if r.resolved_status == "fail"), + "not_applicable": sum(1 for r in results.values() if r.resolved_status == "not_applicable"), + "compensated_fail": sum(1 for r in results.values() if r.resolved_status == "compensated_fail"), + "review_required": sum(1 for r in results.values() if r.resolved_status == "review_required"), + }, + } + finally: + db.close() + + +# ============================================================================= +# GET EVALUATION RESULTS +# ============================================================================= + +@router.get("/evaluate/{run_id}") +async def get_evaluation_results(run_id: str): + """Get stored evaluation results for a run.""" + db = SessionLocal() + try: + rows = db.execute(text(""" + SELECT er.control_id::text, cc.control_id, cc.title, + er.raw_status, er.resolved_status, + er.dependency_resolution, er.confidence, er.reasoning + FROM control_evaluation_results er + JOIN canonical_controls cc ON cc.id = er.control_id + WHERE er.evaluation_run_id = CAST(:rid AS uuid) + ORDER BY er.resolved_status, cc.control_id + """), {"rid": run_id}).fetchall() + + if not rows: + raise HTTPException(404, "Evaluation run not found") + + return { + "evaluation_run_id": run_id, + "total": len(rows), + "results": [ + { + "control_uuid": r[0], + "control_id": r[1], + "title": r[2], + "raw_status": r[3], + "resolved_status": r[4], + "dependency_resolution": r[5], + "confidence": r[6], + 
"reasoning": r[7], + } + for r in rows + ], + } + finally: + db.close() + + +# ============================================================================= +# VALIDATE GRAPH (CYCLE CHECK) +# ============================================================================= + +@router.post("/validate") +async def validate_graph(): + """Validate the dependency graph for cycles.""" + db = SessionLocal() + try: + deps = load_all_active_dependencies(db) + cycles = detect_cycles(deps) + + return { + "total_dependencies": len(deps), + "cycles_found": len(cycles), + "cycles": cycles[:20], + "is_valid": len(cycles) == 0, + } + finally: + db.close() + + +# ============================================================================= +# DEPENDENCY GRAPH (FOR VISUALIZATION) +# ============================================================================= + +@router.get("/graph") +async def get_graph(limit: int = Query(default=200, le=1000)): + """Get dependency graph as nodes + edges.""" + db = SessionLocal() + try: + deps = load_all_active_dependencies(db)[:limit] + + node_ids = set() + for d in deps: + node_ids.add(d.source_control_id) + node_ids.add(d.target_control_id) + + nodes = [] + if node_ids: + id_list = list(node_ids) + rows = db.execute(text(""" + SELECT id::text, control_id, title, release_state, category + FROM canonical_controls + WHERE id = ANY(CAST(:ids AS uuid[])) + """), {"ids": id_list}).fetchall() + + for r in rows: + nodes.append({ + "id": r[0], + "control_id": r[1], + "title": r[2], + "release_state": r[3], + "category": r[4], + }) + + edges = [ + { + "source": d.source_control_id, + "target": d.target_control_id, + "type": d.dependency_type, + "priority": d.priority, + "generation_method": d.generation_method, + } + for d in deps + ] + + return { + "nodes": nodes, + "edges": edges, + "total_nodes": len(nodes), + "total_edges": len(edges), + } + finally: + db.close() diff --git a/control-pipeline/data/domain_packs/ai_act.yaml 
b/control-pipeline/data/domain_packs/ai_act.yaml new file mode 100644 index 0000000..e45717d --- /dev/null +++ b/control-pipeline/data/domain_packs/ai_act.yaml @@ -0,0 +1,22 @@ +domain: ai_act +version: "1.0" +description: "AI Act spezifische Abhaengigkeiten" + +rules: + - name: risk_classification_before_requirements + description: "Risikoklassifizierung muss vor High-Risk-Anforderungen stehen" + source_match: + title_contains: ["Risikoklassifizierung", "KI-System klassifiziert"] + target_match: + title_contains: ["Hochrisiko-Anforderung", "High-Risk"] + dependency_type: prerequisite + priority: 30 + + - name: fria_before_deployment + description: "Grundrechte-Folgenabschaetzung vor KI-Einsatz" + source_match: + title_contains: ["Grundrechte-Folgenabschaetzung", "FRIA"] + target_match: + title_contains: ["KI-System eingesetzt", "KI-System betrieben"] + dependency_type: prerequisite + priority: 30 diff --git a/control-pipeline/data/domain_packs/cra.yaml b/control-pipeline/data/domain_packs/cra.yaml new file mode 100644 index 0000000..fc44acc --- /dev/null +++ b/control-pipeline/data/domain_packs/cra.yaml @@ -0,0 +1,34 @@ +domain: cra +version: "1.0" +description: "Cyber Resilience Act spezifische Abhaengigkeiten" + +rules: + - name: sbom_triggers_vuln_monitoring + description: "SBOM fuehrt zu Schwachstellenmonitoring-Pflicht" + source_match: + title_contains: ["SBOM", "Komponentenverzeichnis"] + target_match: + title_contains: ["Schwachstellenmonitoring", "Vulnerability Monitoring"] + dependency_type: prerequisite + condition: + field: source.status + op: "==" + value: pass + effect: + set_status: review_required + priority: 40 + + - name: ce_partially_satisfies_evidence + description: "CE-Zertifizierung ersetzt Teile der Einzelnachweise" + source_match: + title_contains: ["CE-Konformitaet", "CE-Zertifizierung", "Konformitaetserklaerung"] + target_match: + title_contains: ["Einzelnachweis", "Konformitaetsnachweis"] + dependency_type: compensating_control + 
condition: + field: source.status + op: "==" + value: pass + effect: + set_status: compensated_fail + priority: 80 diff --git a/control-pipeline/data/domain_packs/gdpr.yaml b/control-pipeline/data/domain_packs/gdpr.yaml new file mode 100644 index 0000000..5937c7f --- /dev/null +++ b/control-pipeline/data/domain_packs/gdpr.yaml @@ -0,0 +1,31 @@ +domain: gdpr +version: "1.0" +description: "DSGVO-spezifische Abhaengigkeiten" + +rules: + - name: vvt_before_dsfa + description: "Verarbeitungsverzeichnis muss vor DSFA existieren" + source_match: + title_contains: ["Verarbeitungsverzeichnis", "VVT"] + target_match: + title_contains: ["Datenschutz-Folgenabschaetzung", "DSFA"] + dependency_type: prerequisite + priority: 40 + + - name: rechtsgrundlage_before_verarbeitung + description: "Rechtsgrundlage muss vor Datenverarbeitung definiert sein" + source_match: + title_contains: ["Rechtsgrundlage", "Einwilligung definiert"] + target_match: + title_contains: ["Datenverarbeitung implementiert", "personenbezogene Daten verarbeitet"] + dependency_type: prerequisite + priority: 30 + + - name: tom_before_documentation + description: "TOMs muessen implementiert sein bevor sie dokumentiert werden" + source_match: + title_contains: ["TOM implementiert", "Technische Massnahmen umgesetzt"] + target_match: + title_contains: ["TOM dokumentiert", "Massnahmen dokumentiert"] + dependency_type: prerequisite + priority: 50 diff --git a/control-pipeline/data/domain_packs/labor_contracts.yaml b/control-pipeline/data/domain_packs/labor_contracts.yaml new file mode 100644 index 0000000..46e8faa --- /dev/null +++ b/control-pipeline/data/domain_packs/labor_contracts.yaml @@ -0,0 +1,31 @@ +domain: labor_contracts +version: "1.0" +description: "Arbeitsrechtliche Abhaengigkeiten (GHV, Schulung, Nachschulung)" + +rules: + - name: ghv_supersedes_training + description: "GHV-Klausel im Vertrag macht Vertraulichkeitsschulung nicht notwendig" + source_match: + title_contains: ["GHV-Klausel", 
"Vertraulichkeitsklausel", "Geheimhaltungsvereinbarung", "Vertraulichkeit im Vertrag"] + target_match: + title_contains: ["Vertraulichkeitsschulung", "Vertraulichkeit geschult"] + dependency_type: supersedes + condition: + field: source.status + op: "==" + value: pass + effect: + set_status: not_applicable + priority: 10 + + - name: training_prerequisite_for_refresher + description: "Erstschulung muss vor Nachschulung existieren" + source_match: + title_contains: ["Vertraulichkeitsschulung", "Erstschulung"] + target_match: + title_contains: ["Nachschulung", "jaehrliche Schulung"] + dependency_type: prerequisite + condition: {} + effect: + set_status: review_required + priority: 50 diff --git a/control-pipeline/data/domain_packs/security.yaml b/control-pipeline/data/domain_packs/security.yaml new file mode 100644 index 0000000..35d0566 --- /dev/null +++ b/control-pipeline/data/domain_packs/security.yaml @@ -0,0 +1,34 @@ +domain: security +version: "1.0" +description: "Security-spezifische Abhaengigkeiten" + +rules: + - name: mfa_compensates_password + description: "MFA kompensiert teilweise schwache Passwortanforderungen" + source_match: + title_contains: ["MFA aktiviert", "Multi-Faktor-Authentifizierung"] + target_match: + title_contains: ["Passwortlaenge", "Passwortkomplexitaet", "Passwortrichtlinie"] + dependency_type: compensating_control + condition: + field: source.status + op: "==" + value: pass + effect: + set_status: compensated_fail + priority: 80 + + - name: cert_compensates_individual + description: "ISO 27001 Zertifizierung kompensiert einzelne Security-Controls" + source_match: + title_contains: ["ISO 27001 Zertifizierung", "ISMS Zertifizierung"] + target_match: + title_contains: ["Zugriffskontrolle", "Protokollierung", "Verschluesselung"] + dependency_type: compensating_control + condition: + field: source.status + op: "==" + value: pass + effect: + set_status: compensated_fail + priority: 80 diff --git 
a/control-pipeline/migrations/001_dependency_engine.sql b/control-pipeline/migrations/001_dependency_engine.sql new file mode 100644 index 0000000..80df3ef --- /dev/null +++ b/control-pipeline/migrations/001_dependency_engine.sql @@ -0,0 +1,79 @@ +-- Migration 001: Control Dependency Engine (Block 9) +-- Schema: compliance (search_path already set) +-- Run: psql -U breakpilot -d breakpilot_db -f 001_dependency_engine.sql + +SET search_path TO compliance, public; + +-- ======================================== +-- control_dependencies +-- ======================================== + +CREATE TABLE IF NOT EXISTS control_dependencies ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + + source_control_id UUID NOT NULL REFERENCES canonical_controls(id) ON DELETE CASCADE, + target_control_id UUID NOT NULL REFERENCES canonical_controls(id) ON DELETE CASCADE, + + dependency_type VARCHAR(30) NOT NULL + CHECK (dependency_type IN ( + 'prerequisite', + 'conditional_requirement', + 'supersedes', + 'compensating_control', + 'scope_exclusion' + )), + + condition JSONB DEFAULT '{}', + effect JSONB NOT NULL DEFAULT '{}', + + priority INTEGER NOT NULL DEFAULT 100, + + generation_method VARCHAR(30) NOT NULL DEFAULT 'manual' + CHECK (generation_method IN ( + 'manual', 'ontology', 'pattern', 'domain_pack', 'llm_hint' + )), + generation_metadata JSONB DEFAULT '{}', + + is_active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + + CONSTRAINT uq_dependency_edge UNIQUE (source_control_id, target_control_id, dependency_type), + CONSTRAINT no_self_dependency CHECK (source_control_id != target_control_id) +); + +CREATE INDEX IF NOT EXISTS idx_dep_target ON control_dependencies(target_control_id) WHERE is_active = TRUE; +CREATE INDEX IF NOT EXISTS idx_dep_source ON control_dependencies(source_control_id) WHERE is_active = TRUE; +CREATE INDEX IF NOT EXISTS idx_dep_type ON control_dependencies(dependency_type); + +-- 
======================================== +-- control_evaluation_results +-- ======================================== + +CREATE TABLE IF NOT EXISTS control_evaluation_results ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + + control_id UUID NOT NULL REFERENCES canonical_controls(id) ON DELETE CASCADE, + evaluation_run_id UUID NOT NULL, + company_profile JSONB DEFAULT '{}', + + raw_status VARCHAR(30) NOT NULL, + resolved_status VARCHAR(30) NOT NULL + CHECK (resolved_status IN ( + 'pass', 'fail', 'not_applicable', + 'partially_satisfied', 'compensated_fail', + 'review_required' + )), + + dependency_resolution JSONB DEFAULT '[]', + confidence FLOAT DEFAULT 1.0, + reasoning TEXT DEFAULT '', + + evaluated_at TIMESTAMPTZ DEFAULT NOW(), + + CONSTRAINT uq_eval_per_run UNIQUE (control_id, evaluation_run_id) +); + +CREATE INDEX IF NOT EXISTS idx_eval_run ON control_evaluation_results(evaluation_run_id); +CREATE INDEX IF NOT EXISTS idx_eval_control ON control_evaluation_results(control_id); +CREATE INDEX IF NOT EXISTS idx_eval_status ON control_evaluation_results(resolved_status); diff --git a/control-pipeline/services/control_ontology.py b/control-pipeline/services/control_ontology.py index ef5ce1a..281c9e5 100644 --- a/control-pipeline/services/control_ontology.py +++ b/control-pipeline/services/control_ontology.py @@ -351,3 +351,33 @@ def build_canonical_key( if asset_scope: parts.append(asset_scope) return ":".join(parts) + + +# ============================================================================ +# PHASE ORDERING (for dependency engine — lifecycle sequence) +# ============================================================================ + +PHASE_ORDER: dict[str, int] = { + "scope": 1, + "definition": 2, + "governance": 2, + "design": 3, + "implementation": 4, + "configuration": 5, + "operation": 6, + "training": 6, + "monitoring": 7, + "testing": 8, + "review": 9, + "assessment": 10, + "remediation": 10, + "validation": 11, + "reporting": 12, + "evidence": 13, 
+} + + +def get_phase_order(action_type: str) -> int: + """Get the lifecycle phase order for an action_type (1-13).""" + phase = get_phase(action_type) + return PHASE_ORDER.get(phase, 6) # default: operation (middle) diff --git a/control-pipeline/services/decomposition_pass.py b/control-pipeline/services/decomposition_pass.py index 869b1d4..3dede6a 100644 --- a/control-pipeline/services/decomposition_pass.py +++ b/control-pipeline/services/decomposition_pass.py @@ -220,6 +220,9 @@ class AtomicControlCandidate: pass_criteria: list = field(default_factory=list) fail_criteria: list = field(default_factory=list) check_type: str = "" + # Dependency Engine Felder + dependency_hints: list = field(default_factory=list) + lifecycle_phase_order: int = 0 def to_dict(self) -> dict: return { @@ -238,6 +241,8 @@ class AtomicControlCandidate: "pass_criteria": self.pass_criteria, "fail_criteria": self.fail_criteria, "check_type": self.check_type, + "dependency_hints": self.dependency_hints, + "lifecycle_phase_order": self.lifecycle_phase_order, } @@ -2133,7 +2138,9 @@ Antworte als JSON: "severity": "critical|high|medium|low", "category": "security|privacy|governance|operations|finance|reporting", "check_type": "technical_config_check|document_clause_check|code_pattern_check|evidence_check|interview_required", - "merge_key": "action_type:normalized_object:control_phase" + "merge_key": "action_type:normalized_object:control_phase", + "dependency_hints": ["dependency_type:action_type:normalized_object (Voraussetzungen, Ersetzungen, Kompensationen)"], + "lifecycle_phase_order": "1-13 (1=scope, 2=definition, 4=implementation, 7=monitoring, 8=testing, 12=reporting)" }}""" @@ -2229,7 +2236,9 @@ Jedes Control hat dieses Format: "severity": "critical|high|medium|low", "category": "security|privacy|governance|operations|finance|reporting", "check_type": "technical_config_check|document_clause_check|code_pattern_check|evidence_check|interview_required", - "merge_key": 
"action_type:normalized_object:control_phase" + "merge_key": "action_type:normalized_object:control_phase", + "dependency_hints": ["dependency_type:action_type:normalized_object (Voraussetzungen, Ersetzungen, Kompensationen)"], + "lifecycle_phase_order": "1-13 (1=scope, 2=definition, 4=implementation, 7=monitoring, 8=testing, 12=reporting)" }}""" @@ -2971,6 +2980,8 @@ class DecompositionPass: pass_criteria=_ensure_list(parsed.get("pass_criteria", [])), fail_criteria=_ensure_list(parsed.get("fail_criteria", [])), check_type=parsed.get("check_type", ""), + dependency_hints=_ensure_list(parsed.get("dependency_hints", [])), + lifecycle_phase_order=int(parsed.get("lifecycle_phase_order", 0) or 0), ) # Store merge_key from LLM output in metadata llm_merge_key = parsed.get("merge_key", "") @@ -2980,6 +2991,12 @@ class DecompositionPass: atomic.parent_control_uuid = obl["parent_uuid"] atomic.obligation_candidate_id = obl["candidate_id"] + # Set lifecycle_phase_order deterministically if not set by LLM + if not atomic.lifecycle_phase_order: + from services.control_ontology import classify_action, get_phase_order + action_type = classify_action(obl.get("action", "") or atomic.title) + atomic.lifecycle_phase_order = get_phase_order(action_type) + # Cap severity for implementation-specific obligations if obl.get("is_implementation_specific") and atomic.severity in ( "critical", "high" @@ -3438,6 +3455,9 @@ class DecompositionPass: "pass_criteria": atomic.pass_criteria or [], "fail_criteria": atomic.fail_criteria or [], "check_type": atomic.check_type or "", + # Dependency Engine Felder + "dependency_hints": atomic.dependency_hints or [], + "lifecycle_phase_order": atomic.lifecycle_phase_order or 0, }), "framework_id": "14b1bdd2-abc7-4a43-adae-14471ee5c7cf", }, diff --git a/control-pipeline/services/dependency_engine.py b/control-pipeline/services/dependency_engine.py new file mode 100644 index 0000000..a61d1f0 --- /dev/null +++ b/control-pipeline/services/dependency_engine.py 
"""
Control Dependency Engine — evaluates control statuses considering
inter-control dependencies.

Pure functions (no DB coupling) for:
  - Generic condition evaluation (JSONB rules -> bool)
  - Effect application (modifies target status)
  - Cycle detection (DFS-based)
  - Topological sort (evaluation order)
  - Full evaluation resolution with priority-based conflict handling

DB interaction is in separate load/store functions at the bottom.
"""

from __future__ import annotations

import heapq
import json
import logging
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional

# SQLAlchemy is only needed by the DB load/store helpers further down in this
# module. Guarding the import keeps the pure evaluation functions importable
# (and unit-testable) in environments without SQLAlchemy installed.
try:
    from sqlalchemy import text
except ImportError:  # pragma: no cover - optional at import time
    text = None  # type: ignore[assignment]

logger = logging.getLogger(__name__)


# ============================================================================
# ENUMS
# ============================================================================

class DependencyType(str, Enum):
    """Kinds of inter-control relationships stored in control_dependencies."""
    PREREQUISITE = "prerequisite"
    CONDITIONAL_REQUIREMENT = "conditional_requirement"
    SUPERSEDES = "supersedes"
    COMPENSATING_CONTROL = "compensating_control"
    SCOPE_EXCLUSION = "scope_exclusion"


class EvaluationStatus(str, Enum):
    """Statuses a control can carry before/after dependency resolution."""
    PASS = "pass"
    FAIL = "fail"
    NOT_APPLICABLE = "not_applicable"
    PARTIALLY_SATISFIED = "partially_satisfied"
    COMPENSATED_FAIL = "compensated_fail"
    REVIEW_REQUIRED = "review_required"


# Default priority per dependency type (lower = higher priority)
DEFAULT_PRIORITIES: dict[str, int] = {
    "supersedes": 10,
    "scope_exclusion": 20,
    "prerequisite": 50,
    "conditional_requirement": 70,
    "compensating_control": 80,
}

# Legal status strings for apply_effect(); hoisted so the set is built once,
# not on every effect application.
_VALID_STATUSES: frozenset[str] = frozenset(s.value for s in EvaluationStatus)


# ============================================================================
# DATA CLASSES
# ============================================================================

@dataclass
class Dependency:
    """One edge of the dependency graph (row of control_dependencies)."""
    id: str = ""
    source_control_id: str = ""
    target_control_id: str = ""
    dependency_type: str = "prerequisite"
    condition: dict = field(default_factory=dict)   # JSONB condition rule
    effect: dict = field(default_factory=dict)      # JSONB effect, e.g. {"set_status": ...}
    priority: int = 100                             # lower number wins on conflict
    generation_method: str = "manual"               # manual | ontology | pattern | domain_pack
    is_active: bool = True


@dataclass
class ControlState:
    """In-memory representation of a control's evaluation state."""
    control_id: str = ""
    raw_status: str = "fail"        # status before dependency resolution
    resolved_status: str = ""       # filled in during evaluation
    context: dict = field(default_factory=dict)


@dataclass
class EvaluationResult:
    """Outcome of evaluating one control, including the resolution trace."""
    control_id: str = ""
    evaluation_run_id: str = ""
    raw_status: str = "fail"
    resolved_status: str = "fail"
    dependency_resolution: list = field(default_factory=list)  # per-dependency trace entries
    confidence: float = 1.0
    reasoning: str = ""


# ============================================================================
# CONDITION EVALUATOR
# ============================================================================

def _resolve_field(field_path: str, context: dict) -> Any:
    """Resolve a dot-notation field path against a nested dict.

    Returns None if any path segment is missing or a non-dict is reached
    before the path is exhausted.

    Examples:
        _resolve_field("source.status", {"source": {"status": "pass"}}) -> "pass"
        _resolve_field("context.company_size", {"context": {"company_size": "large"}}) -> "large"
    """
    current: Any = context
    for part in field_path.split("."):
        if isinstance(current, dict):
            current = current.get(part)
        else:
            return None
    return current


# Numeric comparison operators, dispatched by name. Operands are coerced to
# float; coercion failures make the clause evaluate to False.
_NUMERIC_OPS = {
    ">": lambda a, b: a > b,
    "<": lambda a, b: a < b,
    ">=": lambda a, b: a >= b,
    "<=": lambda a, b: a <= b,
}


def _evaluate_single_clause(clause: dict, context: dict) -> bool:
    """Evaluate a single {field, op, value} clause. Unknown ops -> False."""
    field_path = clause.get("field", "")
    op = clause.get("op", "==")
    expected = clause.get("value")

    actual = _resolve_field(field_path, context)

    if op == "==":
        return actual == expected
    if op == "!=":
        return actual != expected
    if op == "in":
        # Membership only makes sense against a list value.
        return actual in expected if isinstance(expected, list) else False
    if op == "not_in":
        # A malformed (non-list) value is treated as "nothing excluded".
        return actual not in expected if isinstance(expected, list) else True
    if op in _NUMERIC_OPS:
        try:
            return _NUMERIC_OPS[op](float(actual), float(expected))
        except (TypeError, ValueError):
            return False
    if op == "contains":
        if isinstance(actual, (list, set, tuple)):
            return expected in actual
        if isinstance(actual, str):
            return str(expected) in actual
        return False

    return False


def evaluate_condition(condition: dict, context: dict) -> bool:
    """Evaluate a generic condition against a context dict.

    Supports:
      - Empty condition -> True (always matches)
      - Simple clause: {"field": "source.status", "op": "==", "value": "pass"}
      - Compound AND: {"operator": "AND", "clauses": [...]}
      - Compound OR: {"operator": "OR", "clauses": [...]}
      - Negation: {"operator": "NOT", "clause": {...}}
    """
    if not condition:
        return True

    operator = condition.get("operator")

    if operator == "AND":
        return all(evaluate_condition(c, context) for c in condition.get("clauses", []))

    if operator == "OR":
        return any(evaluate_condition(c, context) for c in condition.get("clauses", []))

    if operator == "NOT":
        return not evaluate_condition(condition.get("clause", {}), context)

    # Simple clause with field/op/value
    if "field" in condition:
        return _evaluate_single_clause(condition, context)

    # Unknown shape -> treat as "matches" (same as empty condition).
    return True


# ============================================================================
# EFFECT APPLIER
# ============================================================================

def apply_effect(effect: dict, current_status: str) -> str:
    """Apply a dependency effect to produce a new status.

    Effect schema:
        {"set_status": "not_applicable"}
        {"set_status": "compensated_fail"}

    Unknown effects or invalid status strings leave the status unchanged.
    """
    new_status = effect.get("set_status")
    if new_status and new_status in _VALID_STATUSES:
        return new_status
    return current_status


# ============================================================================
# CYCLE DETECTION
# ============================================================================

# DFS node colors: unvisited / on current DFS path / fully explored.
WHITE, GRAY, BLACK = 0, 1, 2


def detect_cycles(dependencies: list[Dependency]) -> list[list[str]]:
    """Detect cycles in the dependency graph using iterative DFS.

    Iterative (explicit stack) rather than recursive, so very long
    dependency chains cannot hit Python's recursion limit.

    Returns list of cycles (each cycle = list of control IDs).
    Empty list = no cycles.
    """
    graph: dict[str, list[str]] = defaultdict(list)
    all_nodes: set[str] = set()

    for dep in dependencies:
        if dep.is_active:
            graph[dep.source_control_id].append(dep.target_control_id)
            all_nodes.add(dep.source_control_id)
            all_nodes.add(dep.target_control_id)

    color: dict[str, int] = {n: WHITE for n in all_nodes}
    parent: dict[str, Optional[str]] = {n: None for n in all_nodes}
    cycles: list[list[str]] = []

    for root in all_nodes:
        if color[root] != WHITE:
            continue
        # Each stack frame holds (node, iterator over its unvisited neighbors),
        # mirroring the call stack of the recursive formulation.
        color[root] = GRAY
        stack: list[tuple[str, Any]] = [(root, iter(graph.get(root, [])))]
        while stack:
            node, neighbors = stack[-1]
            descended = False
            for neighbor in neighbors:
                if color.get(neighbor, WHITE) == GRAY:
                    # Back-edge onto the active path -> cycle. Trace it back
                    # via the parent pointers.
                    cycle = [neighbor, node]
                    current = parent.get(node)
                    while current and current != neighbor:
                        cycle.append(current)
                        current = parent.get(current)
                    cycles.append(cycle)
                elif color.get(neighbor, WHITE) == WHITE:
                    parent[neighbor] = node
                    color[neighbor] = GRAY
                    stack.append((neighbor, iter(graph.get(neighbor, []))))
                    descended = True
                    break
            if not descended:
                color[node] = BLACK
                stack.pop()

    return cycles


def topological_sort(dependencies: list[Dependency]) -> list[str]:
    """Return control IDs in dependency-safe evaluation order.

    Sources (prerequisites) come before targets (dependents).
    Controls not involved in any dependency are omitted.

    Uses Kahn's algorithm with a min-heap so ties are broken
    deterministically (lexicographically smallest ID first) without
    re-sorting the ready queue on every iteration.
    """
    graph: dict[str, list[str]] = defaultdict(list)
    in_degree: dict[str, int] = defaultdict(int)
    all_nodes: set[str] = set()

    for dep in dependencies:
        if dep.is_active:
            # source -> target means: source should be evaluated first
            graph[dep.source_control_id].append(dep.target_control_id)
            in_degree.setdefault(dep.source_control_id, 0)
            in_degree[dep.target_control_id] = in_degree.get(dep.target_control_id, 0) + 1
            all_nodes.add(dep.source_control_id)
            all_nodes.add(dep.target_control_id)

    # Kahn's algorithm; heap pops the lexicographically smallest ready node,
    # matching the previous "sort then pop(0)" behavior at lower cost.
    ready = [n for n in all_nodes if in_degree.get(n, 0) == 0]
    heapq.heapify(ready)
    result: list[str] = []

    while ready:
        node = heapq.heappop(ready)
        result.append(node)
        for neighbor in graph.get(node, []):
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                heapq.heappush(ready, neighbor)

    return result


# ============================================================================
# MAIN EVALUATION ENGINE
# ============================================================================

def evaluate_controls(
    control_states: dict[str, ControlState],
    dependencies: list[Dependency],
    context: dict,
) -> dict[str, EvaluationResult]:
    """Evaluate all controls considering dependencies.

    Args:
        control_states: control_id -> ControlState (with raw_status)
        dependencies: all active dependencies
        context: company profile (industry, company_size, scope_signals, etc.)

    Returns:
        control_id -> EvaluationResult (with resolved_status + trace)

    Algorithm:
        1. Build adjacency (target -> dependencies)
        2. Detect cycles -> involved controls = review_required
        3. Topological sort for evaluation order
        4. For each control: evaluate conditions, apply highest-priority effect
        5. Record full dependency trace for MCP output
    """
    evaluation_run_id = str(uuid.uuid4())

    # 1. Build adjacency: target_control_id -> list of dependencies
    target_deps: dict[str, list[Dependency]] = defaultdict(list)
    for dep in dependencies:
        if dep.is_active:
            target_deps[dep.target_control_id].append(dep)

    # 2. Cycle detection
    cycle_controls: set[str] = set()
    for cycle in detect_cycles(dependencies):
        cycle_controls.update(cycle)

    # 3. Topological sort (excluding cycle controls)
    safe_deps = [
        d for d in dependencies
        if d.is_active
        and d.source_control_id not in cycle_controls
        and d.target_control_id not in cycle_controls
    ]
    eval_order = topological_sort(safe_deps)

    # Add remaining controls (those not in any dependency + cycle controls)
    remaining = set(control_states.keys()) - set(eval_order)
    eval_order.extend(sorted(remaining))

    # 4. Iterate and evaluate
    results: dict[str, EvaluationResult] = {}

    for control_id in eval_order:
        state = control_states.get(control_id)
        if not state:
            # eval_order may contain IDs that only appear as dependency
            # endpoints; without a state there is nothing to evaluate.
            continue

        # Cycle controls -> review_required (can't be resolved automatically)
        if control_id in cycle_controls:
            results[control_id] = EvaluationResult(
                control_id=control_id,
                evaluation_run_id=evaluation_run_id,
                raw_status=state.raw_status,
                resolved_status=EvaluationStatus.REVIEW_REQUIRED.value,
                dependency_resolution=[{"cycle_detected": True}],
                confidence=0.5,
                reasoning="Zyklische Abhaengigkeit erkannt — manuelle Pruefung erforderlich.",
            )
            continue

        # Controls without inbound dependencies keep their raw status.
        deps_for_target = target_deps.get(control_id, [])
        if not deps_for_target:
            results[control_id] = EvaluationResult(
                control_id=control_id,
                evaluation_run_id=evaluation_run_id,
                raw_status=state.raw_status,
                resolved_status=state.raw_status,
                confidence=1.0,
            )
            continue

        # Evaluate each dependency's condition against the source's status
        # (resolved if already evaluated, raw otherwise) and this target.
        matching_effects: list[tuple[int, dict, Dependency]] = []
        trace: list[dict] = []

        for dep in sorted(deps_for_target, key=lambda d: d.priority):
            source_state = control_states.get(dep.source_control_id)
            source_result = results.get(dep.source_control_id)

            source_status = "unknown"
            if source_result:
                source_status = source_result.resolved_status
            elif source_state:
                source_status = source_state.raw_status

            eval_ctx = {
                "source": {"status": source_status},
                "target": {"status": state.raw_status},
                "context": context,
            }

            condition_met = evaluate_condition(dep.condition, eval_ctx)

            trace.append({
                "dependency_id": dep.id,
                "dependency_type": dep.dependency_type,
                "source_control_id": dep.source_control_id,
                "source_status": source_status,
                "condition_met": condition_met,
                "effect_applied": dep.effect if condition_met else None,
                "priority": dep.priority,
            })

            if condition_met:
                matching_effects.append((dep.priority, dep.effect, dep))

        # Apply highest-priority (lowest number) effect; others only appear
        # in the trace.
        resolved = state.raw_status
        if matching_effects:
            matching_effects.sort(key=lambda x: x[0])
            _, best_effect, _ = matching_effects[0]
            resolved = apply_effect(best_effect, state.raw_status)

        results[control_id] = EvaluationResult(
            control_id=control_id,
            evaluation_run_id=evaluation_run_id,
            raw_status=state.raw_status,
            resolved_status=resolved,
            dependency_resolution=trace,
            confidence=_compute_confidence(trace),
        )

    return results


def _compute_confidence(trace: list[dict]) -> float:
    """Compute confidence based on dependency resolution trace.

    1.0 when no dependency fired (raw status stands), 0.95 for a single
    fired dependency, degrading by 0.1 per additional one down to 0.7.
    """
    if not trace:
        return 1.0

    met_count = sum(1 for t in trace if t.get("condition_met"))

    if met_count == 0:
        return 1.0  # No dependencies fired -> raw status stands
    if met_count == 1:
        return 0.95  # Single dependency resolved
    # Multiple dependencies -> slightly lower confidence
    return max(0.7, 1.0 - (met_count - 1) * 0.1)
============================================================================ + +def load_dependencies_for_controls( + db, control_ids: list[str], +) -> list[Dependency]: + """Load all active dependencies involving the given control IDs.""" + if not control_ids: + return [] + + rows = db.execute( + text(""" + SELECT id, source_control_id, target_control_id, + dependency_type, condition, effect, priority, + generation_method, is_active + FROM control_dependencies + WHERE is_active = TRUE + AND (source_control_id = ANY(CAST(:ids AS uuid[])) + OR target_control_id = ANY(CAST(:ids AS uuid[]))) + """), + {"ids": control_ids}, + ).fetchall() + + return [ + Dependency( + id=str(r[0]), + source_control_id=str(r[1]), + target_control_id=str(r[2]), + dependency_type=r[3], + condition=r[4] if isinstance(r[4], dict) else {}, + effect=r[5] if isinstance(r[5], dict) else {}, + priority=r[6], + generation_method=r[7], + is_active=r[8], + ) + for r in rows + ] + + +def load_all_active_dependencies(db) -> list[Dependency]: + """Load all active dependencies.""" + rows = db.execute( + text(""" + SELECT id, source_control_id, target_control_id, + dependency_type, condition, effect, priority, + generation_method, is_active + FROM control_dependencies + WHERE is_active = TRUE + ORDER BY priority + """), + ).fetchall() + + return [ + Dependency( + id=str(r[0]), + source_control_id=str(r[1]), + target_control_id=str(r[2]), + dependency_type=r[3], + condition=r[4] if isinstance(r[4], dict) else {}, + effect=r[5] if isinstance(r[5], dict) else {}, + priority=r[6], + generation_method=r[7], + is_active=r[8], + ) + for r in rows + ] + + +def store_dependency(db, dep: Dependency) -> str: + """Insert a dependency, return its UUID.""" + row = db.execute( + text(""" + INSERT INTO control_dependencies + (source_control_id, target_control_id, dependency_type, + condition, effect, priority, generation_method, generation_metadata) + VALUES + (CAST(:src AS uuid), CAST(:tgt AS uuid), :dtype, + 
CAST(:cond AS jsonb), CAST(:eff AS jsonb), :prio, :gmethod, CAST(:gmeta AS jsonb)) + ON CONFLICT (source_control_id, target_control_id, dependency_type) + DO UPDATE SET + condition = EXCLUDED.condition, + effect = EXCLUDED.effect, + priority = EXCLUDED.priority, + updated_at = NOW() + RETURNING id::text + """), + { + "src": dep.source_control_id, + "tgt": dep.target_control_id, + "dtype": dep.dependency_type, + "cond": json.dumps(dep.condition), + "eff": json.dumps(dep.effect), + "prio": dep.priority, + "gmethod": dep.generation_method, + "gmeta": json.dumps({}), + }, + ).fetchone() + + return row[0] if row else "" + + +def store_evaluation_results( + db, results: dict[str, EvaluationResult], company_profile: dict, +) -> int: + """Batch insert evaluation results. Returns row count.""" + count = 0 + for result in results.values(): + db.execute( + text(""" + INSERT INTO control_evaluation_results + (control_id, evaluation_run_id, company_profile, + raw_status, resolved_status, dependency_resolution, + confidence, reasoning) + VALUES + (CAST(:cid AS uuid), CAST(:rid AS uuid), CAST(:prof AS jsonb), + :raw, :resolved, CAST(:trace AS jsonb), + :conf, :reason) + ON CONFLICT (control_id, evaluation_run_id) + DO UPDATE SET + resolved_status = EXCLUDED.resolved_status, + dependency_resolution = EXCLUDED.dependency_resolution, + confidence = EXCLUDED.confidence + """), + { + "cid": result.control_id, + "rid": result.evaluation_run_id, + "prof": json.dumps(company_profile), + "raw": result.raw_status, + "resolved": result.resolved_status, + "trace": json.dumps(result.dependency_resolution), + "conf": result.confidence, + "reason": result.reasoning, + }, + ) + count += 1 + + return count diff --git a/control-pipeline/services/dependency_generator.py b/control-pipeline/services/dependency_generator.py new file mode 100644 index 0000000..b25cbe9 --- /dev/null +++ b/control-pipeline/services/dependency_generator.py @@ -0,0 +1,381 @@ +""" +Dependency Generator — automatic discovery 
of control dependencies. + +Three strategies: + 1. Ontology-based: same normalized_object + phase sequence -> prerequisite + 2. Pattern-based: known patterns (define->implement, implement->monitor, etc.) + 3. Domain packs: YAML-defined rules for specific regulatory domains +""" + +from __future__ import annotations + +import logging +import os +import re +from collections import defaultdict +from typing import Optional + +import yaml + +from services.dependency_engine import Dependency, DEFAULT_PRIORITIES + +logger = logging.getLogger(__name__) + + +# ============================================================================ +# PHASE ORDERING (imported from ontology) +# ============================================================================ + +from services.control_ontology import PHASE_ORDER + + +# ============================================================================ +# PATTERN RULES +# ============================================================================ + +PATTERN_RULES: list[dict] = [ + { + "name": "define_before_implement", + "source_filter": {"action_type": "define"}, + "target_filter": {"action_type": "implement"}, + "match_on": "normalized_object", + "dependency_type": "prerequisite", + "condition": {}, + "effect": {"set_status": "review_required"}, + "priority": 50, + }, + { + "name": "implement_before_monitor", + "source_filter": {"action_type_in": ["implement", "configure", "enforce"]}, + "target_filter": {"action_type_in": ["monitor", "review", "test"]}, + "match_on": "normalized_object", + "dependency_type": "prerequisite", + "condition": {}, + "effect": {"set_status": "review_required"}, + "priority": 50, + }, + { + "name": "define_before_enforce", + "source_filter": {"action_type": "define"}, + "target_filter": {"action_type": "enforce"}, + "match_on": "normalized_object", + "dependency_type": "prerequisite", + "condition": {}, + "effect": {"set_status": "review_required"}, + "priority": 50, + }, + { + "name": 
"implement_before_validate", + "source_filter": {"action_type_in": ["implement", "configure"]}, + "target_filter": {"action_type_in": ["validate", "verify"]}, + "match_on": "normalized_object", + "dependency_type": "prerequisite", + "condition": {}, + "effect": {"set_status": "review_required"}, + "priority": 50, + }, + { + "name": "train_before_review", + "source_filter": {"action_type": "train"}, + "target_filter": {"action_type_in": ["review", "assess"]}, + "match_on": "normalized_object", + "dependency_type": "prerequisite", + "condition": {}, + "effect": {"set_status": "review_required"}, + "priority": 60, + }, +] + + +# ============================================================================ +# HELPER: Parse merge_key into components +# ============================================================================ + +def _parse_merge_key(merge_key: str) -> dict: + """Parse 'action_type:normalized_object:phase[:asset_scope]' into components.""" + parts = merge_key.split(":") + result = { + "action_type": parts[0] if len(parts) > 0 else "", + "normalized_object": parts[1] if len(parts) > 1 else "", + "phase": parts[2] if len(parts) > 2 else "", + "asset_scope": parts[3] if len(parts) > 3 else "", + } + return result + + +def _get_control_merge_key(control: dict) -> str: + """Extract merge_key from a control dict (from generation_metadata or top-level).""" + mk = control.get("merge_key", "") + if not mk: + meta = control.get("generation_metadata", {}) + if isinstance(meta, str): + try: + import json + meta = json.loads(meta) + except (ValueError, TypeError): + meta = {} + mk = meta.get("merge_group_hint", "") + return mk + + +# ============================================================================ +# ONTOLOGY-BASED GENERATOR +# ============================================================================ + +def generate_ontology_dependencies(controls: list[dict]) -> list[Dependency]: + """Generate prerequisite dependencies from lifecycle phase ordering. 
+ + Rule: If two controls share the same normalized_object and control A's + phase precedes control B's phase, then A is a prerequisite for B. + + Groups by normalized_object first (O(n) grouping, O(k^2) per group + where k is typically 2-8). + """ + # Group controls by normalized_object + groups: dict[str, list[dict]] = defaultdict(list) + + for ctrl in controls: + mk = _get_control_merge_key(ctrl) + if not mk: + continue + parsed = _parse_merge_key(mk) + obj = parsed["normalized_object"] + if obj: + ctrl["_parsed_mk"] = parsed + ctrl["_phase_order"] = PHASE_ORDER.get(parsed["phase"], 6) + groups[obj].append(ctrl) + + dependencies: list[Dependency] = [] + + for obj, group in groups.items(): + if len(group) < 2: + continue + + # Sort by phase order + group.sort(key=lambda c: c["_phase_order"]) + + # Create prerequisite edges between adjacent phases + for i in range(len(group)): + for j in range(i + 1, len(group)): + a = group[i] + b = group[j] + if a["_phase_order"] < b["_phase_order"]: + dep = Dependency( + source_control_id=a.get("id", a.get("control_id", "")), + target_control_id=b.get("id", b.get("control_id", "")), + dependency_type="prerequisite", + condition={}, + effect={"set_status": "review_required"}, + priority=DEFAULT_PRIORITIES["prerequisite"], + generation_method="ontology", + ) + dependencies.append(dep) + + return dependencies + + +# ============================================================================ +# PATTERN-BASED GENERATOR +# ============================================================================ + +def _matches_filter(control: dict, filter_: dict) -> bool: + """Check if a control matches a pattern filter.""" + parsed = control.get("_parsed_mk", {}) + action = parsed.get("action_type", "") + + if "action_type" in filter_: + if action != filter_["action_type"]: + return False + + if "action_type_in" in filter_: + if action not in filter_["action_type_in"]: + return False + + return True + + +def generate_pattern_dependencies( + 
controls: list[dict], + rules: Optional[list[dict]] = None, +) -> list[Dependency]: + """Apply pattern rules to generate dependencies between controls.""" + if rules is None: + rules = PATTERN_RULES + + # Pre-parse merge keys + for ctrl in controls: + if "_parsed_mk" not in ctrl: + mk = _get_control_merge_key(ctrl) + if mk: + ctrl["_parsed_mk"] = _parse_merge_key(mk) + else: + ctrl["_parsed_mk"] = {} + + dependencies: list[Dependency] = [] + + for rule in rules: + sources = [c for c in controls if _matches_filter(c, rule["source_filter"])] + targets = [c for c in controls if _matches_filter(c, rule["target_filter"])] + + match_on = rule.get("match_on") + + for src in sources: + for tgt in targets: + src_id = src.get("id", src.get("control_id", "")) + tgt_id = tgt.get("id", tgt.get("control_id", "")) + + if src_id == tgt_id: + continue + + if match_on == "normalized_object": + src_obj = src.get("_parsed_mk", {}).get("normalized_object", "") + tgt_obj = tgt.get("_parsed_mk", {}).get("normalized_object", "") + if not src_obj or src_obj != tgt_obj: + continue + + dep = Dependency( + source_control_id=src_id, + target_control_id=tgt_id, + dependency_type=rule["dependency_type"], + condition=rule.get("condition", {}), + effect=rule.get("effect", {"set_status": "review_required"}), + priority=rule.get("priority", 100), + generation_method="pattern", + ) + dependencies.append(dep) + + return dependencies + + +# ============================================================================ +# DOMAIN PACK GENERATOR +# ============================================================================ + +def load_domain_pack(path: str) -> dict: + """Load a YAML domain pack.""" + with open(path, "r", encoding="utf-8") as f: + return yaml.safe_load(f) or {} + + +def _title_matches(title: str, patterns: list[str]) -> bool: + """Check if a title contains any of the given patterns (case-insensitive).""" + title_lower = title.lower() + return any(p.lower() in title_lower for p in patterns) 
+ + +def generate_domain_dependencies( + controls: list[dict], + domain_pack_dir: str = "", +) -> list[Dependency]: + """Apply all domain packs to generate domain-specific dependencies.""" + if not domain_pack_dir: + domain_pack_dir = os.path.join( + os.path.dirname(os.path.dirname(__file__)), "data", "domain_packs" + ) + + if not os.path.isdir(domain_pack_dir): + return [] + + dependencies: list[Dependency] = [] + + for filename in sorted(os.listdir(domain_pack_dir)): + if not filename.endswith((".yaml", ".yml")): + continue + + pack = load_domain_pack(os.path.join(domain_pack_dir, filename)) + rules = pack.get("rules", []) + + for rule in rules: + src_match = rule.get("source_match", {}) + tgt_match = rule.get("target_match", {}) + + src_title_patterns = src_match.get("title_contains", []) + tgt_title_patterns = tgt_match.get("title_contains", []) + + sources = [ + c for c in controls + if src_title_patterns and _title_matches(c.get("title", ""), src_title_patterns) + ] + targets = [ + c for c in controls + if tgt_title_patterns and _title_matches(c.get("title", ""), tgt_title_patterns) + ] + + for src in sources: + for tgt in targets: + src_id = src.get("id", src.get("control_id", "")) + tgt_id = tgt.get("id", tgt.get("control_id", "")) + if src_id == tgt_id: + continue + + dep = Dependency( + source_control_id=src_id, + target_control_id=tgt_id, + dependency_type=rule.get("dependency_type", "prerequisite"), + condition=rule.get("condition", { + "field": "source.status", "op": "==", "value": "pass", + }), + effect=rule.get("effect", {"set_status": "not_applicable"}), + priority=rule.get("priority", DEFAULT_PRIORITIES.get( + rule.get("dependency_type", "prerequisite"), 100 + )), + generation_method="domain_pack", + ) + dependencies.append(dep) + + return dependencies + + +# ============================================================================ +# TOP-LEVEL GENERATOR +# ============================================================================ + +def 
generate_all_dependencies( + controls: list[dict], + enable_ontology: bool = True, + enable_patterns: bool = True, + enable_domain_packs: bool = True, + domain_pack_dir: str = "", +) -> tuple[list[Dependency], dict]: + """Run all generators and return deduplicated dependencies + stats.""" + stats = { + "ontology_generated": 0, + "pattern_generated": 0, + "domain_generated": 0, + "total_before_dedup": 0, + "total_unique": 0, + "duplicates_removed": 0, + } + + all_deps: list[Dependency] = [] + + if enable_ontology: + onto_deps = generate_ontology_dependencies(controls) + stats["ontology_generated"] = len(onto_deps) + all_deps.extend(onto_deps) + + if enable_patterns: + pat_deps = generate_pattern_dependencies(controls) + stats["pattern_generated"] = len(pat_deps) + all_deps.extend(pat_deps) + + if enable_domain_packs: + dom_deps = generate_domain_dependencies(controls, domain_pack_dir) + stats["domain_generated"] = len(dom_deps) + all_deps.extend(dom_deps) + + stats["total_before_dedup"] = len(all_deps) + + # Deduplicate by (source, target, type) + seen: set[tuple[str, str, str]] = set() + unique: list[Dependency] = [] + for dep in all_deps: + key = (dep.source_control_id, dep.target_control_id, dep.dependency_type) + if key not in seen: + seen.add(key) + unique.append(dep) + + stats["total_unique"] = len(unique) + stats["duplicates_removed"] = stats["total_before_dedup"] - stats["total_unique"] + + return unique, stats diff --git a/control-pipeline/tests/test_dependency_engine.py b/control-pipeline/tests/test_dependency_engine.py new file mode 100644 index 0000000..f48b8db --- /dev/null +++ b/control-pipeline/tests/test_dependency_engine.py @@ -0,0 +1,506 @@ +""" +Tests for the Control Dependency Engine. + +Pure Python tests — no DB required. Tests condition evaluation, +effect application, cycle detection, topological sort, and +full evaluation resolution. 
+""" + +import sys +import os +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from services.dependency_engine import ( + Dependency, + ControlState, + EvaluationResult, + EvaluationStatus, + evaluate_condition, + apply_effect, + detect_cycles, + topological_sort, + evaluate_controls, + _resolve_field, +) + + +# ============================================================================ +# Condition Evaluator +# ============================================================================ + +class TestConditionEvaluator: + + def test_empty_condition_returns_true(self): + assert evaluate_condition({}, {}) is True + + def test_simple_equals(self): + cond = {"field": "source.status", "op": "==", "value": "pass"} + ctx = {"source": {"status": "pass"}} + assert evaluate_condition(cond, ctx) is True + + def test_simple_equals_false(self): + cond = {"field": "source.status", "op": "==", "value": "pass"} + ctx = {"source": {"status": "fail"}} + assert evaluate_condition(cond, ctx) is False + + def test_not_equals(self): + cond = {"field": "source.status", "op": "!=", "value": "pass"} + ctx = {"source": {"status": "fail"}} + assert evaluate_condition(cond, ctx) is True + + def test_in_operator(self): + cond = {"field": "context.company_size", "op": "in", "value": ["large", "enterprise"]} + ctx = {"context": {"company_size": "large"}} + assert evaluate_condition(cond, ctx) is True + + def test_in_operator_false(self): + cond = {"field": "context.company_size", "op": "in", "value": ["large", "enterprise"]} + ctx = {"context": {"company_size": "small"}} + assert evaluate_condition(cond, ctx) is False + + def test_not_in_operator(self): + cond = {"field": "context.industry", "op": "not_in", "value": ["finance", "healthcare"]} + ctx = {"context": {"industry": "retail"}} + assert evaluate_condition(cond, ctx) is True + + def test_and_compound(self): + cond = { + "operator": "AND", + "clauses": [ + {"field": "source.status", "op": "==", 
"value": "pass"}, + {"field": "context.company_size", "op": "in", "value": ["large"]}, + ], + } + ctx = {"source": {"status": "pass"}, "context": {"company_size": "large"}} + assert evaluate_condition(cond, ctx) is True + + def test_and_compound_one_false(self): + cond = { + "operator": "AND", + "clauses": [ + {"field": "source.status", "op": "==", "value": "pass"}, + {"field": "context.company_size", "op": "in", "value": ["large"]}, + ], + } + ctx = {"source": {"status": "pass"}, "context": {"company_size": "small"}} + assert evaluate_condition(cond, ctx) is False + + def test_or_compound(self): + cond = { + "operator": "OR", + "clauses": [ + {"field": "source.status", "op": "==", "value": "pass"}, + {"field": "source.status", "op": "==", "value": "compensated_fail"}, + ], + } + ctx = {"source": {"status": "compensated_fail"}} + assert evaluate_condition(cond, ctx) is True + + def test_not_operator(self): + cond = { + "operator": "NOT", + "clause": {"field": "source.status", "op": "==", "value": "pass"}, + } + ctx = {"source": {"status": "fail"}} + assert evaluate_condition(cond, ctx) is True + + def test_nested_field_resolution(self): + ctx = {"context": {"scope_signals": {"uses_ai": True}}} + val = _resolve_field("context.scope_signals.uses_ai", ctx) + assert val is True + + def test_greater_than(self): + cond = {"field": "context.employee_count", "op": ">", "value": 250} + ctx = {"context": {"employee_count": 500}} + assert evaluate_condition(cond, ctx) is True + + def test_contains(self): + cond = {"field": "context.scope_signals", "op": "contains", "value": "uses_ai"} + ctx = {"context": {"scope_signals": ["uses_ai", "has_employees"]}} + assert evaluate_condition(cond, ctx) is True + + +# ============================================================================ +# Effect Applier +# ============================================================================ + +class TestEffectApplier: + + def test_set_status_not_applicable(self): + assert 
apply_effect({"set_status": "not_applicable"}, "fail") == "not_applicable" + + def test_set_status_compensated_fail(self): + assert apply_effect({"set_status": "compensated_fail"}, "fail") == "compensated_fail" + + def test_set_status_pass(self): + assert apply_effect({"set_status": "pass"}, "fail") == "pass" + + def test_unknown_effect_returns_original(self): + assert apply_effect({"unknown_key": "value"}, "fail") == "fail" + + def test_invalid_status_returns_original(self): + assert apply_effect({"set_status": "invalid_status"}, "fail") == "fail" + + def test_empty_effect(self): + assert apply_effect({}, "pass") == "pass" + + +# ============================================================================ +# Cycle Detection +# ============================================================================ + +class TestCycleDetection: + + def test_no_cycles(self): + deps = [ + Dependency(id="1", source_control_id="A", target_control_id="B", dependency_type="prerequisite"), + Dependency(id="2", source_control_id="B", target_control_id="C", dependency_type="prerequisite"), + ] + cycles = detect_cycles(deps) + assert len(cycles) == 0 + + def test_simple_cycle(self): + deps = [ + Dependency(id="1", source_control_id="A", target_control_id="B", dependency_type="supersedes"), + Dependency(id="2", source_control_id="B", target_control_id="A", dependency_type="supersedes"), + ] + cycles = detect_cycles(deps) + assert len(cycles) > 0 + # Both A and B should be in the cycle + cycle_nodes = set() + for c in cycles: + cycle_nodes.update(c) + assert "A" in cycle_nodes + assert "B" in cycle_nodes + + def test_diamond_no_cycle(self): + deps = [ + Dependency(id="1", source_control_id="A", target_control_id="B"), + Dependency(id="2", source_control_id="A", target_control_id="C"), + Dependency(id="3", source_control_id="B", target_control_id="D"), + Dependency(id="4", source_control_id="C", target_control_id="D"), + ] + cycles = detect_cycles(deps) + assert len(cycles) == 0 + + def 
# ============================================================================
# Cycle Detection
# ============================================================================

class TestCycleDetection:
    """detect_cycles(dependencies) reports dependency loops."""

    def test_no_cycles(self):
        chain = [
            Dependency(id="1", source_control_id="A", target_control_id="B", dependency_type="prerequisite"),
            Dependency(id="2", source_control_id="B", target_control_id="C", dependency_type="prerequisite"),
        ]
        assert not detect_cycles(chain)

    def test_simple_cycle(self):
        mutual = [
            Dependency(id="1", source_control_id="A", target_control_id="B", dependency_type="supersedes"),
            Dependency(id="2", source_control_id="B", target_control_id="A", dependency_type="supersedes"),
        ]
        cycles = detect_cycles(mutual)
        assert cycles
        # Every node of the A<->B loop must appear in some reported cycle.
        reported = {node for cycle in cycles for node in cycle}
        assert "A" in reported
        assert "B" in reported

    def test_diamond_no_cycle(self):
        # A fans out to B and C, which both converge on D: a DAG, no cycle.
        diamond = [
            Dependency(id="1", source_control_id="A", target_control_id="B"),
            Dependency(id="2", source_control_id="A", target_control_id="C"),
            Dependency(id="3", source_control_id="B", target_control_id="D"),
            Dependency(id="4", source_control_id="C", target_control_id="D"),
        ]
        assert not detect_cycles(diamond)

    def test_inactive_deps_ignored(self):
        # The back-edge B -> A is inactive, so no cycle should be reported.
        edges = [
            Dependency(id="1", source_control_id="A", target_control_id="B", is_active=True),
            Dependency(id="2", source_control_id="B", target_control_id="A", is_active=False),
        ]
        assert not detect_cycles(edges)


# ============================================================================
# Topological Sort
# ============================================================================

class TestTopologicalSort:
    """topological_sort(dependencies) orders every source before its target."""

    @staticmethod
    def _positions(order):
        # Map node -> index for readable ordering assertions.
        return {node: idx for idx, node in enumerate(order)}

    def test_linear_chain(self):
        order = topological_sort([
            Dependency(id="1", source_control_id="A", target_control_id="B"),
            Dependency(id="2", source_control_id="B", target_control_id="C"),
        ])
        pos = self._positions(order)
        assert pos["A"] < pos["B"] < pos["C"]

    def test_diamond_graph(self):
        order = topological_sort([
            Dependency(id="1", source_control_id="A", target_control_id="B"),
            Dependency(id="2", source_control_id="A", target_control_id="C"),
            Dependency(id="3", source_control_id="B", target_control_id="D"),
            Dependency(id="4", source_control_id="C", target_control_id="D"),
        ])
        pos = self._positions(order)
        assert pos["A"] < pos["B"] < pos["D"]
        assert pos["A"] < pos["C"] < pos["D"]

    def test_disconnected_components(self):
        # Two independent chains: all four nodes appear, each chain ordered.
        order = topological_sort([
            Dependency(id="1", source_control_id="A", target_control_id="B"),
            Dependency(id="2", source_control_id="X", target_control_id="Y"),
        ])
        assert len(order) == 4
        pos = self._positions(order)
        assert pos["A"] < pos["B"]
        assert pos["X"] < pos["Y"]
raw_status="pass"), + "B": ControlState(control_id="B", raw_status="fail"), + } + results = evaluate_controls(states, [], {}) + assert results["A"].resolved_status == "pass" + assert results["B"].resolved_status == "fail" + + def test_supersedes_makes_not_applicable(self): + """GHV-Klausel (A) supersedes Schulung (B).""" + states = { + "A": ControlState(control_id="A", raw_status="pass"), + "B": ControlState(control_id="B", raw_status="fail"), + } + deps = [ + Dependency( + id="d1", source_control_id="A", target_control_id="B", + dependency_type="supersedes", + condition={"field": "source.status", "op": "==", "value": "pass"}, + effect={"set_status": "not_applicable"}, + priority=10, + ), + ] + results = evaluate_controls(states, deps, {}) + assert results["A"].resolved_status == "pass" + assert results["B"].resolved_status == "not_applicable" + assert len(results["B"].dependency_resolution) == 1 + assert results["B"].dependency_resolution[0]["condition_met"] is True + + def test_supersedes_not_met_preserves_status(self): + """GHV-Klausel (A) fails -> Schulung (B) stays fail.""" + states = { + "A": ControlState(control_id="A", raw_status="fail"), + "B": ControlState(control_id="B", raw_status="fail"), + } + deps = [ + Dependency( + id="d1", source_control_id="A", target_control_id="B", + dependency_type="supersedes", + condition={"field": "source.status", "op": "==", "value": "pass"}, + effect={"set_status": "not_applicable"}, + priority=10, + ), + ] + results = evaluate_controls(states, deps, {}) + assert results["B"].resolved_status == "fail" + + def test_prerequisite_fail_blocks_target(self): + """define:policy (A) must pass before implement:policy (B).""" + states = { + "A": ControlState(control_id="A", raw_status="fail"), + "B": ControlState(control_id="B", raw_status="pass"), + } + deps = [ + Dependency( + id="d1", source_control_id="A", target_control_id="B", + dependency_type="prerequisite", + condition={"field": "source.status", "op": "!=", "value": 
"pass"}, + effect={"set_status": "review_required"}, + priority=50, + ), + ] + results = evaluate_controls(states, deps, {}) + assert results["B"].resolved_status == "review_required" + + def test_compensating_control(self): + """ISO cert (A) compensates individual control (B).""" + states = { + "A": ControlState(control_id="A", raw_status="pass"), + "B": ControlState(control_id="B", raw_status="fail"), + } + deps = [ + Dependency( + id="d1", source_control_id="A", target_control_id="B", + dependency_type="compensating_control", + condition={"field": "source.status", "op": "==", "value": "pass"}, + effect={"set_status": "compensated_fail"}, + priority=80, + ), + ] + results = evaluate_controls(states, deps, {}) + assert results["B"].resolved_status == "compensated_fail" + + def test_scope_exclusion(self): + """No AI usage -> AI controls not applicable.""" + states = { + "A": ControlState(control_id="A", raw_status="pass"), + "B": ControlState(control_id="B", raw_status="fail"), + } + deps = [ + Dependency( + id="d1", source_control_id="A", target_control_id="B", + dependency_type="scope_exclusion", + condition={"field": "source.status", "op": "==", "value": "pass"}, + effect={"set_status": "not_applicable"}, + priority=20, + ), + ] + results = evaluate_controls(states, deps, {}) + assert results["B"].resolved_status == "not_applicable" + + def test_conditional_requirement_with_context(self): + """Enhanced logging required only for large companies.""" + states = { + "A": ControlState(control_id="A", raw_status="pass"), + "B": ControlState(control_id="B", raw_status="fail"), + } + deps = [ + Dependency( + id="d1", source_control_id="A", target_control_id="B", + dependency_type="conditional_requirement", + condition={ + "operator": "AND", + "clauses": [ + {"field": "source.status", "op": "==", "value": "pass"}, + {"field": "context.company_size", "op": "in", "value": ["large", "enterprise"]}, + ], + }, + effect={"set_status": "pass"}, + priority=70, + ), + ] + # Large 
company -> condition met + results = evaluate_controls(states, deps, {"company_size": "large"}) + assert results["B"].resolved_status == "pass" + + # Small company -> condition not met, stays fail + results2 = evaluate_controls(states, deps, {"company_size": "small"}) + assert results2["B"].resolved_status == "fail" + + def test_priority_conflict_resolution(self): + """When scope_exclusion (prio 20) and compensating (prio 80) both match, + scope_exclusion wins.""" + states = { + "A": ControlState(control_id="A", raw_status="pass"), + "X": ControlState(control_id="X", raw_status="pass"), + "B": ControlState(control_id="B", raw_status="fail"), + } + deps = [ + Dependency( + id="d1", source_control_id="A", target_control_id="B", + dependency_type="scope_exclusion", + condition={"field": "source.status", "op": "==", "value": "pass"}, + effect={"set_status": "not_applicable"}, + priority=20, + ), + Dependency( + id="d2", source_control_id="X", target_control_id="B", + dependency_type="compensating_control", + condition={"field": "source.status", "op": "==", "value": "pass"}, + effect={"set_status": "compensated_fail"}, + priority=80, + ), + ] + results = evaluate_controls(states, deps, {}) + assert results["B"].resolved_status == "not_applicable" # scope_exclusion wins + + def test_cycle_controls_get_review_required(self): + """Controls in a cycle get review_required.""" + states = { + "A": ControlState(control_id="A", raw_status="pass"), + "B": ControlState(control_id="B", raw_status="pass"), + "C": ControlState(control_id="C", raw_status="fail"), + } + deps = [ + Dependency(id="d1", source_control_id="A", target_control_id="B", dependency_type="supersedes"), + Dependency(id="d2", source_control_id="B", target_control_id="A", dependency_type="supersedes"), + ] + results = evaluate_controls(states, deps, {}) + assert results["A"].resolved_status == "review_required" + assert results["B"].resolved_status == "review_required" + assert results["C"].resolved_status == 
"fail" # Not in cycle, unaffected + + def test_chain_evaluation_order(self): + """A -> B -> C: C's evaluation uses B's resolved status.""" + states = { + "A": ControlState(control_id="A", raw_status="pass"), + "B": ControlState(control_id="B", raw_status="fail"), + "C": ControlState(control_id="C", raw_status="fail"), + } + deps = [ + Dependency( + id="d1", source_control_id="A", target_control_id="B", + dependency_type="supersedes", + condition={"field": "source.status", "op": "==", "value": "pass"}, + effect={"set_status": "not_applicable"}, + priority=10, + ), + Dependency( + id="d2", source_control_id="B", target_control_id="C", + dependency_type="prerequisite", + condition={"field": "source.status", "op": "==", "value": "not_applicable"}, + effect={"set_status": "not_applicable"}, + priority=50, + ), + ] + results = evaluate_controls(states, deps, {}) + assert results["A"].resolved_status == "pass" + assert results["B"].resolved_status == "not_applicable" + # C depends on B's RESOLVED status (not_applicable), not raw (fail) + assert results["C"].resolved_status == "not_applicable" + + def test_ghv_full_scenario(self): + """Complete GHV scenario from the user's example: + MC-001: GHV-Klausel im Vertrag + MC-002: Vertraulichkeitsschulung + MC-003: Jaehrliche Nachschulung + + Case 1: Vertrag hat GHV -> Schulung + Nachschulung not_applicable + """ + states = { + "MC-001": ControlState(control_id="MC-001", raw_status="pass"), + "MC-002": ControlState(control_id="MC-002", raw_status="fail"), + "MC-003": ControlState(control_id="MC-003", raw_status="fail"), + } + deps = [ + Dependency( + id="d1", source_control_id="MC-001", target_control_id="MC-002", + dependency_type="supersedes", + condition={"field": "source.status", "op": "==", "value": "pass"}, + effect={"set_status": "not_applicable"}, + priority=10, + ), + Dependency( + id="d2", source_control_id="MC-002", target_control_id="MC-003", + dependency_type="prerequisite", + condition={"field": "source.status", 
"op": "!=", "value": "pass"}, + effect={"set_status": "not_applicable"}, + priority=50, + ), + ] + results = evaluate_controls(states, deps, {}) + assert results["MC-001"].resolved_status == "pass" + assert results["MC-002"].resolved_status == "not_applicable" + # MC-003: MC-002 resolved to not_applicable (!=pass) -> not_applicable + assert results["MC-003"].resolved_status == "not_applicable" + + def test_ghv_no_contract(self): + """GHV Case 2: Vertrag fehlt, Schulung vorhanden.""" + states = { + "MC-001": ControlState(control_id="MC-001", raw_status="fail"), + "MC-002": ControlState(control_id="MC-002", raw_status="pass"), + "MC-003": ControlState(control_id="MC-003", raw_status="pass"), + } + deps = [ + Dependency( + id="d1", source_control_id="MC-001", target_control_id="MC-002", + dependency_type="supersedes", + condition={"field": "source.status", "op": "==", "value": "pass"}, + effect={"set_status": "not_applicable"}, + priority=10, + ), + Dependency( + id="d2", source_control_id="MC-002", target_control_id="MC-003", + dependency_type="prerequisite", + condition={"field": "source.status", "op": "!=", "value": "pass"}, + effect={"set_status": "not_applicable"}, + priority=50, + ), + ] + results = evaluate_controls(states, deps, {}) + assert results["MC-001"].resolved_status == "fail" + assert results["MC-002"].resolved_status == "pass" # Supersedes condition not met + assert results["MC-003"].resolved_status == "pass" # Prereq met (MC-002 == pass) diff --git a/control-pipeline/tests/test_dependency_generator.py b/control-pipeline/tests/test_dependency_generator.py new file mode 100644 index 0000000..7d02aa3 --- /dev/null +++ b/control-pipeline/tests/test_dependency_generator.py @@ -0,0 +1,202 @@ +""" +Tests for automatic dependency generation. + +Tests ontology-based, pattern-based, and domain pack rules. 
+""" + +import os +import sys +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from services.dependency_generator import ( + generate_ontology_dependencies, + generate_pattern_dependencies, + generate_domain_dependencies, + generate_all_dependencies, + _parse_merge_key, + _title_matches, + PATTERN_RULES, +) + + +# ============================================================================ +# Helpers +# ============================================================================ + +def _ctrl(id_: str, merge_key: str, title: str = "") -> dict: + return { + "id": id_, + "control_id": id_, + "title": title or id_, + "generation_metadata": {"merge_group_hint": merge_key}, + } + + +class TestParseMergeKey: + + def test_full_key(self): + r = _parse_merge_key("implement:api_rate_limiting:implementation:api_endpoints") + assert r["action_type"] == "implement" + assert r["normalized_object"] == "api_rate_limiting" + assert r["phase"] == "implementation" + assert r["asset_scope"] == "api_endpoints" + + def test_short_key(self): + r = _parse_merge_key("define:access_policy:definition") + assert r["action_type"] == "define" + assert r["normalized_object"] == "access_policy" + assert r["phase"] == "definition" + assert r["asset_scope"] == "" + + +# ============================================================================ +# Ontology Dependencies +# ============================================================================ + +class TestOntologyDependencies: + + def test_same_object_phase_sequence(self): + """define:policy (phase 2) -> implement:policy (phase 4) generates prerequisite.""" + controls = [ + _ctrl("A", "define:access_policy:definition"), + _ctrl("B", "implement:access_policy:implementation"), + ] + deps = generate_ontology_dependencies(controls) + assert len(deps) == 1 + assert deps[0].source_control_id == "A" + assert deps[0].target_control_id == "B" + assert deps[0].dependency_type == "prerequisite" + + def 
# ============================================================================
# Ontology Dependencies
# ============================================================================

class TestOntologyDependencies:
    """Phase-ordering rules over controls that share a normalized object."""

    def test_same_object_phase_sequence(self):
        """define:policy (phase 2) -> implement:policy (phase 4) yields a prerequisite."""
        generated = generate_ontology_dependencies([
            _ctrl("A", "define:access_policy:definition"),
            _ctrl("B", "implement:access_policy:implementation"),
        ])
        assert len(generated) == 1
        dep = generated[0]
        assert (dep.source_control_id, dep.target_control_id) == ("A", "B")
        assert dep.dependency_type == "prerequisite"

    def test_different_objects_no_dependency(self):
        # Distinct normalized objects never link.
        generated = generate_ontology_dependencies([
            _ctrl("A", "define:access_policy:definition"),
            _ctrl("B", "implement:rate_limiting:implementation"),
        ])
        assert not generated

    def test_same_phase_no_dependency(self):
        """Controls in the same phase should not generate a prerequisite."""
        generated = generate_ontology_dependencies([
            _ctrl("A", "implement:access_policy:implementation"),
            _ctrl("B", "enforce:access_policy:implementation"),
        ])
        assert not generated

    def test_three_phase_chain(self):
        """define -> implement -> test generates 3 dependencies (A->B, A->C, B->C)."""
        generated = generate_ontology_dependencies([
            _ctrl("A", "define:access_policy:definition"),
            _ctrl("B", "implement:access_policy:implementation"),
            _ctrl("C", "test:access_policy:testing"),
        ])
        assert len(generated) == 3
        edges = {(d.source_control_id, d.target_control_id) for d in generated}
        assert edges >= {("A", "B"), ("A", "C"), ("B", "C")}
# ============================================================================
# Pattern Dependencies
# ============================================================================

class TestPatternDependencies:
    """Action-pattern rules (define->implement, implement->monitor, ...)."""

    def test_define_before_implement(self):
        deps = generate_pattern_dependencies([
            _ctrl("A", "define:firewall_policy:definition"),
            _ctrl("B", "implement:firewall_policy:implementation"),
        ])
        a_to_b = [d for d in deps
                  if d.source_control_id == "A" and d.target_control_id == "B"]
        assert len(a_to_b) >= 1
        assert a_to_b[0].dependency_type == "prerequisite"

    def test_implement_before_monitor(self):
        deps = generate_pattern_dependencies([
            _ctrl("A", "implement:logging:implementation"),
            _ctrl("B", "monitor:logging:monitoring"),
        ])
        a_to_b = [d for d in deps
                  if d.source_control_id == "A" and d.target_control_id == "B"]
        assert len(a_to_b) >= 1

    def test_no_match_different_objects(self):
        # Pattern rules only pair controls over the same normalized object.
        deps = generate_pattern_dependencies([
            _ctrl("A", "define:access_policy:definition"),
            _ctrl("B", "implement:encryption:implementation"),
        ])
        assert len(deps) == 0


# ============================================================================
# Domain Packs
# ============================================================================

class TestDomainPacks:
    """Title-matching rules loaded from the data/domain_packs directory."""

    def test_ghv_supersedes_training(self):
        pack_dir = os.path.join(os.path.dirname(__file__), "..", "data", "domain_packs")
        controls = [
            {"id": "MC-001", "title": "GHV-Klausel im Arbeitsvertrag enthalten"},
            {"id": "MC-002", "title": "Mitarbeiter zur Vertraulichkeit geschult"},
        ]
        deps = generate_domain_dependencies(controls, pack_dir)
        matching = [d for d in deps
                    if d.source_control_id == "MC-001" and d.target_control_id == "MC-002"]
        assert len(matching) == 1
        assert matching[0].dependency_type == "supersedes"

    def test_vvt_before_dsfa(self):
        pack_dir = os.path.join(os.path.dirname(__file__), "..", "data", "domain_packs")
        controls = [
            {"id": "GDPR-001", "title": "Verarbeitungsverzeichnis erstellt"},
            {"id": "GDPR-002", "title": "Datenschutz-Folgenabschaetzung durchgefuehrt"},
        ]
        deps = generate_domain_dependencies(controls, pack_dir)
        matching = [d for d in deps if d.source_control_id == "GDPR-001"]
        assert len(matching) == 1
        assert matching[0].dependency_type == "prerequisite"

    def test_title_matches_helper(self):
        assert _title_matches("GHV-Klausel vorhanden", ["GHV-Klausel"])
        assert not _title_matches("MFA aktiviert", ["GHV-Klausel"])


# ============================================================================
# Combined Generator
# ============================================================================

class TestGenerateAll:
    """generate_all_dependencies combines the generators and de-duplicates."""

    def test_deduplication(self):
        """The same dependency found by ontology AND pattern rules is de-duplicated."""
        controls = [
            _ctrl("A", "define:access_policy:definition"),
            _ctrl("B", "implement:access_policy:implementation"),
        ]
        deps, stats = generate_all_dependencies(
            controls, enable_ontology=True, enable_patterns=True, enable_domain_packs=False,
        )
        # BUG FIX: the previous assertion `duplicates_removed >= 0` was
        # vacuously true for any counter. Both the ontology generator and the
        # pattern generator emit A -> B for this define/implement pair (see
        # TestOntologyDependencies / TestPatternDependencies above), so at
        # least one duplicate must actually be removed.
        assert stats["duplicates_removed"] >= 1
        assert stats["total_unique"] <= stats["total_before_dedup"]
        # Exactly one A -> B prerequisite survives de-duplication.
        matching = [d for d in deps
                    if d.source_control_id == "A" and d.target_control_id == "B"]
        assert len(matching) == 1

    def test_stats_populated(self):
        controls = [
            _ctrl("A", "define:policy:definition"),
            _ctrl("B", "implement:policy:implementation"),
        ]
        _, stats = generate_all_dependencies(controls, enable_domain_packs=False)
        # Every generator contributes a counter to the stats dict.
        for key in ("ontology_generated", "pattern_generated", "total_unique"):
            assert key in stats