Files
breakpilot-core/control-pipeline/services/control_ontology.py
T
Benjamin Admin 652e3a65a3
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 36s
CI / test-python-voice (push) Successful in 33s
CI / test-bqas (push) Successful in 31s
feat(pipeline): F2+F3 action/object ontology — DB-backed normalization
Migrates ACTION_TYPES (26+8 types), _NEGATIVE_PATTERNS (22), _ACTION_SYNONYMS
(65), and _OBJECT_SYNONYMS (75) from hardcoded dicts to DB tables.

- SQL migration: 003_action_object_ontology.sql (3 tables)
- Migration scripts: f2_migrate_actions.py (34 types, 145 synonyms), f3_migrate_objects.py (75 objects)
- OntologyRegistry cache: 5min TTL, raises RuntimeError if empty (safe fallback to dicts)
- control_ontology.classify_action/get_phase delegate to DB with dict fallback
- control_dedup.normalize_action/normalize_object delegate to DB with dict fallback
- 25 new tests, 446 total pass, 0 regressions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-03 23:47:53 +02:00

396 lines
13 KiB
Python

"""
Control Ontology — Controlled vocabulary for action types, object classes,
normalized objects, and pre-LLM classification.
Used by:
1. Pre-LLM filter (classify obligations before sending to API)
2. Canonical key generation (deterministic dedup key)
3. Post-LLM validation (reject invalid action_type/object_class)
4. BatchDedup (merge_group from normalized_object)
"""
from __future__ import annotations
import re
from typing import Optional
# ============================================================================
# ACTION TYPES (26) — with German aliases
# ============================================================================
# Canonical action_type → {"aliases": German verbs/phrases, "phase": control phase}.
# NOTE: "bestätigen" is listed under both "approve" and "verify"; in the alias
# reverse lookup built from this table the later entry ("verify") wins.
ACTION_TYPES: dict[str, dict] = {
    # --- definition / governance / evidence ---
    "define": {
        "aliases": ["definieren", "festlegen", "bestimmen", "vorgeben"],
        "phase": "definition",
    },
    "document": {
        "aliases": [
            "dokumentieren",
            "aufzeichnen",
            "protokollieren",
            "schriftlich festhalten",
        ],
        "phase": "evidence",
    },
    "approve": {
        "aliases": ["freigeben", "genehmigen", "bestätigen"],
        "phase": "governance",
    },
    # --- implementation / configuration ---
    "implement": {
        "aliases": [
            "implementieren",
            "umsetzen",
            "einführen",
            "einsetzen",
            "bereitstellen",
            "etablieren",
            "einrichten",
            "aufbauen",
        ],
        "phase": "implementation",
    },
    "configure": {
        "aliases": ["konfigurieren", "einstellen", "parametrisieren"],
        "phase": "configuration",
    },
    "enforce": {
        "aliases": ["durchsetzen", "erzwingen", "technisch erzwingen"],
        "phase": "implementation",
    },
    # --- operation / monitoring / review ---
    "maintain": {
        "aliases": ["pflegen", "aktuell halten", "aufrechterhalten", "führen"],
        "phase": "operation",
    },
    "monitor": {
        "aliases": ["überwachen", "beobachten", "monitoren"],
        "phase": "monitoring",
    },
    "review": {
        "aliases": ["überprüfen", "prüfen", "reviewen", "kontrollieren"],
        "phase": "review",
    },
    # --- assessment / remediation ---
    "assess": {
        "aliases": ["bewerten", "beurteilen", "einschätzen", "analysieren"],
        "phase": "assessment",
    },
    "identify": {
        "aliases": ["identifizieren", "erkennen", "erfassen", "feststellen"],
        "phase": "assessment",
    },
    "remediate": {
        "aliases": ["beheben", "adressieren", "mitigieren", "behandeln", "abstellen"],
        "phase": "remediation",
    },
    # --- testing / validation ---
    "test": {
        "aliases": ["testen", "ausprobieren", "Test durchführen"],
        "phase": "testing",
    },
    "verify": {
        "aliases": ["verifizieren", "nachweisen", "bestätigen"],
        "phase": "testing",
    },
    "validate": {
        "aliases": ["validieren", "Wirksamkeit prüfen"],
        "phase": "validation",
    },
    # --- reporting / training ---
    "report": {
        "aliases": ["melden", "berichten", "Anzeige erstatten"],
        "phase": "reporting",
    },
    "notify": {
        "aliases": ["benachrichtigen", "informieren", "unterrichten"],
        "phase": "reporting",
    },
    "train": {
        "aliases": ["schulen", "unterweisen", "sensibilisieren"],
        "phase": "training",
    },
    # --- data lifecycle ---
    "retain": {
        "aliases": ["aufbewahren", "archivieren", "speichern"],
        "phase": "evidence",
    },
    "delete": {
        "aliases": ["löschen", "vernichten", "entfernen"],
        "phase": "operation",
    },
    # --- preventive / restrictive actions ---
    "prevent": {
        "aliases": ["verhindern", "vermeiden", "unterbinden"],
        "phase": "implementation",
    },
    "exclude": {
        "aliases": ["nicht zulassen", "ausschließen", "verbieten", "untersagen"],
        "phase": "implementation",
    },
    "restrict_access": {
        "aliases": ["Zugriff beschränken", "autorisieren", "berechtigen", "beschränken"],
        "phase": "implementation",
    },
    "encrypt": {
        "aliases": ["verschlüsseln", "kryptografisch schützen"],
        "phase": "implementation",
    },
    # --- credential / artefact lifecycle ---
    "invalidate": {
        "aliases": ["invalidieren", "ungültig machen", "widerrufen"],
        "phase": "operation",
    },
    "issue": {
        "aliases": ["ausstellen", "vergeben", "erzeugen", "generieren"],
        "phase": "operation",
    },
}
# Reverse lookup: lower-cased German alias → canonical action_type.
# Built with a comprehension so that no loop variables are left behind in the
# module namespace. If an alias is listed under more than one action type
# (e.g. "bestätigen" under "approve" and "verify"), the action type that
# appears later in ACTION_TYPES wins, because later keys overwrite earlier ones.
_ALIAS_TO_ACTION: dict[str, str] = {
    alias.lower(): action_type
    for action_type, info in ACTION_TYPES.items()
    for alias in info["aliases"]
}
# ============================================================================
# NEGATIVE OBLIGATION PATTERNS
# ============================================================================
# (lower-case substring, action_type) pairs. Order is significant: the list is
# scanned top to bottom and the first substring hit decides the action type.
_NEGATIVE_PATTERNS: list[tuple[str, str]] = [
    # Specific "must not appear / be stored / be sent" phrasings → prevent
    ("nicht wiederverwendet", "prevent"),
    ("nicht in der url", "prevent"),
    ("nicht im token", "prevent"),
    ("nicht in logs", "prevent"),
    ("nicht in urls", "prevent"),
    ("nicht gespeichert", "prevent"),
    ("nicht übertragen", "prevent"),
    ("nicht erscheinen", "prevent"),
    ("verhindern", "prevent"),
    ("unterbinden", "prevent"),
    # Active rejection → enforce
    ("abweisen", "enforce"),
    ("blockieren", "enforce"),
    ("zurückweisen", "enforce"),
    # Generic prohibitions, checked last → exclude
    ("dürfen nicht", "exclude"),
    ("dürfen keine", "exclude"),
    ("darf nicht", "exclude"),
    ("darf keine", "exclude"),
    ("nicht zulässig", "exclude"),
    ("nicht erlaubt", "exclude"),
    ("verboten", "exclude"),
    ("untersagt", "exclude"),
]
# ============================================================================
# CONTAINER / COMPOSITE OBJECTS (must NOT become atomic)
# ============================================================================
# Lower-case phrases naming umbrella / composite subjects. They are matched as
# substrings of the lower-cased obligation text.
CONTAINER_OBJECTS: set[str] = {
    # session / token handling
    "sichere sitzungsverwaltung",
    "token-schutz",
    # supply chain and development process
    "sorgfaltspflichten für drittkomponenten",
    "secure development lifecycle",
    "sicherer softwareentwicklungsprozess",
    # management systems and programmes
    "risikomanagementsystem",
    "umfassendes risikomanagement",
    "informationssicherheitsmanagement",
    "datenschutzmanagement",
    "ki-governance",
    "compliance-programm",
    # generic measure bundles
    "sicherheitsmaßnahmen",
    "technische und organisatorische maßnahmen",
}
# ============================================================================
# EVIDENCE INDICATORS (must NOT become a control)
# ============================================================================
# Lower-case nouns that mark a text as describing an evidence artefact.
EVIDENCE_INDICATORS: set[str] = {
    # generic proof / documentation
    "nachweis",
    "dokumentation",
    "screenshot",
    "export",
    # reports and certificates
    "auditbericht",
    "prüfbericht",
    "zertifizierung",
    # tool artefacts
    "log-auszug",
    "jira-ticket",
    "servicenow-ticket",
    "sbom-nachweis",
    # sign-offs and protocols
    "freigabevermerk",
    "review-protokoll",
    "testprotokoll",
}
# ============================================================================
# FRAMEWORK REFERENCES (must NOT become atomic directly)
# ============================================================================
# Regular-expression sources (not pre-compiled) that recognise references to
# external frameworks / standards inside an obligation text.
_FRAMEWORK_PATTERNS: list[str] = [
    # OWASP
    r"OWASP\s+ASVS\s+V\d",
    r"OWASP\s+API\d+",
    r"OWASP\s+API\s+Top\s+10",
    # NIST
    r"NIST\s+SP\s+800-\d+",
    r"NIST\s+IA[\s-]",
    r"NIST\s+AC[\s-]",
    # BSI
    r"BSI\s+IT-Grundschutz",
    r"BSI\s+200-\d",
    # CSA CCM
    r"(?:CSA\s+)?CCM[\s-]",
    # ISO
    r"ISO\s+27001",
    r"ISO\s+27002",
    # Blanket "all controls of category ..." wording
    r"alle\s+Controls\s+der\s+Kategorie",
]
# ============================================================================
# CLASSIFICATION FUNCTIONS
# ============================================================================
def classify_action(text: str) -> str:
    """Classify an obligation action text into a canonical action_type.

    The DB-backed ``OntologyRegistry`` is asked first. If importing or calling
    it raises, the hardcoded tables of this module are used instead, in this
    order:

    1. the first ``_NEGATIVE_PATTERNS`` entry whose substring occurs in the text,
    2. an exact match of the whole (lower-cased, stripped) text in
       ``_ALIAS_TO_ACTION``,
    3. the longest alias that occurs anywhere in the text,
    4. ``"implement"`` when nothing matches.

    Args:
        text: Action or obligation text; matching is case-insensitive.

    Returns:
        The canonical action_type string.
    """
    try:
        from .ontology_registry import get_ontology_registry
        return get_ontology_registry().classify_action(text)
    except Exception:
        # Deliberate best-effort: any registry failure falls through to the
        # built-in tables. Leave a trace at debug level instead of hiding it.
        import logging

        logging.getLogger(__name__).debug(
            "OntologyRegistry unavailable; classify_action uses built-in tables",
            exc_info=True,
        )

    text_lower = text.lower().strip()

    # Negative / prohibitive phrasings take precedence; list order decides.
    for pattern, action_type in _NEGATIVE_PATTERNS:
        if pattern in text_lower:
            return action_type

    # The whole text is exactly one known alias.
    if text_lower in _ALIAS_TO_ACTION:
        return _ALIAS_TO_ACTION[text_lower]

    # Longest alias contained in the text. The strict "longer than" comparison
    # keeps the first alias (in table order) among equally long candidates, so
    # no per-call sort of the alias table is needed.
    best_match = ""
    best_action = "implement"
    for alias, action_type in _ALIAS_TO_ACTION.items():
        if len(alias) > len(best_match) and alias in text_lower:
            best_match = alias
            best_action = action_type
    return best_action
def get_phase(action_type: str) -> str:
    """Return the control phase for an action_type.

    The DB-backed ``OntologyRegistry`` is asked first. If importing or calling
    it raises, the phase is read from ``ACTION_TYPES``.

    Args:
        action_type: Canonical action type, e.g. ``"encrypt"``.

    Returns:
        The phase name; ``"implementation"`` when the fallback table has no
        entry (or no ``"phase"`` key) for ``action_type``.
    """
    try:
        from .ontology_registry import get_ontology_registry
        return get_ontology_registry().get_phase(action_type)
    except Exception:
        # Deliberate best-effort: fall back to the built-in table, but leave a
        # trace at debug level instead of hiding the failure completely.
        import logging

        logging.getLogger(__name__).debug(
            "OntologyRegistry unavailable; get_phase uses built-in table",
            exc_info=True,
        )

    info = ACTION_TYPES.get(action_type, {})
    return info.get("phase", "implementation")
def is_container(text: str) -> bool:
    """Return True when the text mentions a container/composite object.

    The text is lower-cased and stripped, then every entry of
    ``CONTAINER_OBJECTS`` is tested as a substring.
    """
    haystack = text.lower().strip()
    for phrase in CONTAINER_OBJECTS:
        if phrase in haystack:
            return True
    return False
def is_evidence(text: str) -> bool:
    """Check if obligation text is actually evidence, not a control.

    The lower-cased, stripped text counts as evidence when

    * it starts with an entry of ``EVIDENCE_INDICATORS``, or
    * it contains an indefinite article (ein/eine/einen/einem/einer) directly
      followed by an indicator, or
    * it ends with "dokumentieren"/"dokumentiert" and mentions one of a few
      action-result nouns (tests, maßnahmen, ergebnisse, prüfungen, änderungen).

    Args:
        text: Obligation text.

    Returns:
        True if one of the rules above matches, otherwise False.
    """
    text_lower = text.lower().strip()

    for indicator in EVIDENCE_INDICATORS:
        # Primary check: indicator at the very start of the text.
        if text_lower.startswith(indicator):
            return True
        # Indefinite article + indicator. The article must begin at a word
        # boundary: a plain substring test would also hit negations such as
        # "kein export" / "keine dokumentation" (they contain "ein export" /
        # "eine dokumentation") and route prohibitions as evidence.
        if re.search(
            rf"\b(?:ein|eine|einen|einem|einer) {re.escape(indicator)}",
            text_lower,
        ):
            return True

    # Secondary: "X dokumentieren" where X is the result of another action
    # rather than a standalone duty.
    if text_lower.endswith(("dokumentieren", "dokumentiert")):
        action_words = ("tests", "maßnahmen", "ergebnisse", "prüfungen", "änderungen")
        return any(word in text_lower for word in action_words)
    return False
def is_framework_reference(text: str) -> bool:
    """Return True for a generic "implement framework X" style obligation.

    Two conditions must both hold:

    * the lower-cased text contains one of the implementation verb stems
      (stems are used so German conjugations such as
      umsetzen/umzusetzen/umgesetzt are covered), and
    * at least one regex in ``_FRAMEWORK_PATTERNS`` matches the text
      (case-insensitive).
    """
    verb_stems = ("umsetz", "umzusetz", "implementier", "einhalt", "erfüll", "anwend")
    lowered = text.lower()
    if not any(stem in lowered for stem in verb_stems):
        return False
    return any(
        re.search(framework_rx, text, re.IGNORECASE)
        for framework_rx in _FRAMEWORK_PATTERNS
    )
def classify_obligation(text: str, action: str = "") -> dict:
    """Classify an obligation for pre-LLM routing.

    Checks run in a fixed order and the first hit decides the routing:
    evidence → container/composite → framework reference → atomic.

    Args:
        text: Full obligation text; used for all routing checks.
        action: Optional action phrase. When non-empty it is classified
            instead of ``text`` to determine the action type.

    Returns:
        {
            "routing": "atomic" | "composite" | "evidence" | "framework_container",
            "action_type": str,
            "phase": str,
            "reason": str,
        }
    """
    # Evidence short-circuits with fixed values; no action classification.
    if is_evidence(text):
        return {
            "routing": "evidence",
            "action_type": "document",
            "phase": "evidence",
            "reason": "Evidence indicator detected",
        }

    if is_container(text):
        routing, reason = "composite", "Container/composite object detected"
    elif is_framework_reference(text):
        routing, reason = "framework_container", "Framework reference detected"
    else:
        routing, reason = "atomic", "Atomic obligation"

    # Classify once and reuse the result for the phase lookup, instead of
    # running the (possibly DB-backed) classification twice per obligation.
    action_type = classify_action(action or text)
    return {
        "routing": routing,
        "action_type": action_type,
        "phase": get_phase(action_type),
        "reason": reason,
    }
def build_canonical_key(
    action_type: str,
    normalized_object: str,
    phase: Optional[str] = None,
    asset_scope: Optional[str] = None,
) -> str:
    """Build the colon-separated canonical dedup key.

    The key always starts with ``action_type:normalized_object``; ``phase`` and
    ``asset_scope`` are appended (in that order) only when they are truthy.

    NOTE(review): because empty segments are dropped rather than kept as
    placeholders, a key with only ``phase`` set is indistinguishable from one
    with only ``asset_scope`` set to the same value — confirm callers never
    rely on telling these apart.
    """
    optional_segments = [segment for segment in (phase, asset_scope) if segment]
    return ":".join([action_type, normalized_object, *optional_segments])
# ============================================================================
# PHASE ORDERING (for dependency engine — lifecycle sequence)
# ============================================================================
# Phase name → position in the lifecycle sequence (1 = earliest, 13 = latest).
# Several phases deliberately share a rank.
PHASE_ORDER: dict[str, int] = {
    "scope": 1,
    # rank 2: definition and governance
    "definition": 2,
    "governance": 2,
    "design": 3,
    "implementation": 4,
    "configuration": 5,
    # rank 6: operation and training
    "operation": 6,
    "training": 6,
    "monitoring": 7,
    "testing": 8,
    "review": 9,
    # rank 10: assessment and remediation
    "assessment": 10,
    "remediation": 10,
    "validation": 11,
    "reporting": 12,
    "evidence": 13,
}
def get_phase_order(action_type: str) -> int:
    """Return the lifecycle rank (1-13) of the phase belonging to an action_type.

    The phase is resolved via ``get_phase`` and looked up in ``PHASE_ORDER``;
    a phase missing from that table gets rank 6, the rank of "operation".
    """
    fallback_rank = 6  # middle of the lifecycle, same rank as "operation"
    resolved_phase = get_phase(action_type)
    return PHASE_ORDER.get(resolved_phase, fallback_rank)