feat: Applicability Engine + API-Filter + DB-Sync + Cleanup
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 35s
CI / test-python-voice (push) Successful in 33s
CI / test-bqas (push) Successful in 37s
CI / Deploy (push) Failing after 2s
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 35s
CI / test-python-voice (push) Successful in 33s
CI / test-bqas (push) Successful in 37s
CI / Deploy (push) Failing after 2s
- Applicability Engine (deterministisch, kein LLM): filtert Controls nach Branche, Unternehmensgroesse, Scope-Signalen
- API-Filter auf GET /controls, /controls-count, /controls-meta
- POST /controls/applicable Endpoint fuer Company-Profile-Matching
- 35 Unit-Tests fuer Engine
- Port-8098-Konflikt mit Nginx gefixt (nur expose, kein Host-Port)
- CLAUDE.md: control-pipeline Dokumentation ergaenzt
- 6 internationale Gesetze geloescht (ES/FR/HU/NL/SE/CZ — nur DACH)
- DB-Backup-Import-Script (import_backup.py)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
245
control-pipeline/services/applicability_engine.py
Normal file
245
control-pipeline/services/applicability_engine.py
Normal file
@@ -0,0 +1,245 @@
|
||||
"""
|
||||
Applicability Engine -- filters controls based on company profile + scope answers.
|
||||
|
||||
Deterministic, no LLM needed. Implements Scoped Control Applicability (Phase C2).
|
||||
|
||||
Filtering logic:
|
||||
- Controls with NULL applicability fields are INCLUDED (apply to everyone).
|
||||
- Controls with '["all"]' match all queries.
|
||||
- Industry: control applies if its applicable_industries contains the requested
|
||||
industry OR contains "all" OR is NULL.
|
||||
- Company size: control applies if its applicable_company_size contains the
|
||||
requested size OR contains "all" OR is NULL.
|
||||
- Scope signals: control applies if it has NO scope_conditions, or the company
|
||||
has at least one of the required signals (requires_any logic).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, Optional
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from db.session import SessionLocal
|
||||
|
||||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||||
|
||||
# Valid company sizes (ordered smallest to largest)
|
||||
# NOTE(review): not referenced by any function visible in this file --
# presumably used by callers to validate input; confirm before removing.
VALID_SIZES = ("micro", "small", "medium", "large", "enterprise")
|
||||
|
||||
|
||||
def _parse_json_text(value: Any) -> Any:
    """Decode a JSON document stored in a text column.

    Args:
        value: Raw column value. Already-decoded ``list``/``dict`` values are
            returned unchanged; ``str``, ``bytes`` and ``bytearray`` values
            are handed to ``json.loads``.

    Returns:
        The decoded JSON value, or ``None`` when ``value`` is ``None``, has
        an unsupported type, or cannot be parsed.
    """
    if isinstance(value, (list, dict)):
        return value
    if not isinstance(value, (str, bytes, bytearray)):
        # Covers None as well as any other non-text type.
        return None
    try:
        return json.loads(value)
    except (ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError (both
        # are subclasses); RecursionError is raised by the decoder for
        # pathologically deep nesting. All of them mean "unparseable".
        return None
|
||||
|
||||
|
||||
def _matches_industry(applicable_industries_raw: Any, industry: str) -> bool:
    """Decide whether a control is relevant for ``industry``.

    Args:
        applicable_industries_raw: Raw ``applicable_industries`` value, passed
            through ``_parse_json_text`` before it is inspected.
        industry: The industry being queried.

    Returns:
        ``True`` when the parsed value is not a list (NULL, unparseable or
        otherwise malformed -- the control is kept), when the list contains
        ``"all"``, or when it contains ``industry``; ``False`` otherwise.
    """
    allowed = _parse_json_text(applicable_industries_raw)
    # Anything that is not a list carries no usable restriction: include it.
    if not isinstance(allowed, list):
        return True
    return "all" in allowed or industry in allowed
|
||||
|
||||
|
||||
def _matches_company_size(applicable_company_size_raw: Any, company_size: str) -> bool:
    """Decide whether a control is relevant for ``company_size``.

    Args:
        applicable_company_size_raw: Raw ``applicable_company_size`` value,
            passed through ``_parse_json_text`` before it is inspected.
        company_size: The company size being queried.

    Returns:
        ``True`` when the parsed value is not a list (NULL, unparseable or
        otherwise malformed -- the control is kept), when the list contains
        ``"all"``, or when it contains ``company_size``; ``False`` otherwise.
    """
    size_list = _parse_json_text(applicable_company_size_raw)
    if isinstance(size_list, list):
        # "all" is the wildcard entry; otherwise the size must be listed.
        return True if "all" in size_list else company_size in size_list
    # No list means no usable restriction, so the control stays in.
    return True
|
||||
|
||||
|
||||
def _matches_scope_signals(
    scope_conditions_raw: Any, scope_signals: list[str]
) -> bool:
    """Check whether a control's scope_conditions are met by the given signals.

    A control with scope_conditions =
    {"requires_any": ["uses_ai", "processes_health_data"]} matches if the
    company has at least one of those signals.

    Args:
        scope_conditions_raw: Raw ``scope_conditions`` value, passed through
            ``_parse_json_text`` before it is inspected.
        scope_signals: Signals the company has.

    Returns:
        ``True`` when there is nothing to enforce -- the parsed value is not a
        dict, ``requires_any`` is missing or empty, or ``requires_any`` is
        neither a string nor a list-like collection (malformed = include).
        Otherwise ``True`` only if at least one required signal is present in
        ``scope_signals``.
    """
    conditions = _parse_json_text(scope_conditions_raw)
    if not isinstance(conditions, dict):
        return True  # NULL, unparseable or malformed = applies to everyone

    requires_any = conditions.get("requires_any")
    if not requires_any:
        return True  # no required signals = applies to everyone

    if isinstance(requires_any, str):
        # A bare string is one required signal. Iterating it would compare
        # single characters against the company's signals.
        requires_any = [requires_any]
    elif not isinstance(requires_any, (list, tuple, set, frozenset)):
        return True  # malformed = include, as for the other fields

    # Membership is tested against a list, not a set, so an unhashable entry
    # in requires_any (e.g. a nested object) cannot raise TypeError.
    available = list(scope_signals or ())
    return any(required in available for required in requires_any)
|
||||
|
||||
|
||||
def get_applicable_controls(
    db,
    industry: Optional[str] = None,
    company_size: Optional[str] = None,
    scope_signals: Optional[list[str]] = None,
    limit: int = 100,
    offset: int = 0,
) -> dict[str, Any]:
    """Return the controls that apply to the given company profile.

    Every control whose release_state is not 'duplicate', 'deprecated' or
    'rejected' is loaded, ordered by control_id, and then filtered in Python
    with ``_matches_industry``, ``_matches_company_size`` and
    ``_matches_scope_signals``. A filter is only applied when its argument is
    truthy.

    No SQL ``LIKE`` pre-filter is used. Substring matching on the raw JSON
    text cannot express the helpers' rule that a NULL, malformed or empty
    condition applies to everyone, so it would drop controls the Python
    filter accepts -- for example ``scope_conditions = '{}'`` when exactly
    one signal is requested.

    Args:
        db: SQLAlchemy session
        industry: e.g. "Telekommunikation", "Energie", "Gesundheitswesen"
        company_size: e.g. "medium", "large", "enterprise"
        scope_signals: e.g. ["uses_ai", "third_country_transfer"]
        limit: max results to return (applied after filtering); negative
            values are treated as 0
        offset: pagination offset (applied after filtering); negative
            values are treated as 0

    Returns:
        dict with ``total_applicable`` (count before pagination), the
        effective ``limit`` and ``offset``, the paginated ``controls`` and a
        ``breakdown`` with ``by_domain``, ``by_severity`` and ``by_industry``
        counts computed over all applicable controls.
    """
    signals = list(scope_signals) if scope_signals else []
    # A negative bound would turn the slice below into a "from the end" slice
    # and return an arbitrary window of the result.
    limit = max(limit, 0)
    offset = max(offset, 0)

    query = """
        SELECT id, framework_id, control_id, title, objective, rationale,
               scope, requirements, test_procedure, evidence,
               severity, risk_score, implementation_effort,
               evidence_confidence, open_anchors, release_state, tags,
               license_rule, source_original_text, source_citation,
               customer_visible, verification_method, category, evidence_type,
               target_audience, generation_metadata, generation_strategy,
               applicable_industries, applicable_company_size, scope_conditions,
               parent_control_uuid, decomposition_method, pipeline_version,
               created_at, updated_at
        FROM canonical_controls
        WHERE release_state NOT IN ('duplicate', 'deprecated', 'rejected')
        ORDER BY control_id
    """
    rows = db.execute(text(query)).fetchall()

    # The helpers are the single source of truth for applicability.
    applicable = []
    for row in rows:
        if industry and not _matches_industry(row.applicable_industries, industry):
            continue
        if company_size and not _matches_company_size(
            row.applicable_company_size, company_size
        ):
            continue
        if signals and not _matches_scope_signals(row.scope_conditions, signals):
            continue
        applicable.append(row)

    # Breakdowns cover every applicable control, not just the returned page.
    domain_counts: dict[str, int] = {}
    severity_counts: dict[str, int] = {}
    industry_counts: dict[str, int] = {}
    for row in applicable:
        # The domain is the part of control_id before the first "-".
        domain = row.control_id.split("-")[0].upper() if row.control_id else "UNKNOWN"
        domain_counts[domain] = domain_counts.get(domain, 0) + 1

        severity = row.severity or "unknown"
        severity_counts[severity] = severity_counts.get(severity, 0) + 1

        industries = _parse_json_text(row.applicable_industries)
        if isinstance(industries, list):
            for name in industries:
                industry_counts[name] = industry_counts.get(name, 0) + 1
        else:
            # NULL or malformed industry lists are counted together.
            industry_counts["unclassified"] = (
                industry_counts.get("unclassified", 0) + 1
            )

    page = applicable[offset : offset + limit]

    return {
        "total_applicable": len(applicable),
        "limit": limit,
        "offset": offset,
        "controls": [_row_to_control(row) for row in page],
        "breakdown": {
            "by_domain": domain_counts,
            "by_severity": severity_counts,
            "by_industry": industry_counts,
        },
    }
|
||||
|
||||
|
||||
def _row_to_control(r) -> dict[str, Any]:
    """Serialise one control row into the dict used in API responses.

    ``id`` and ``framework_id`` are stringified with ``str()``, the two
    timestamps are rendered with ``isoformat()`` (``None`` when falsy), and
    ``control_id_domain`` is the upper-cased part of ``control_id`` before
    the first "-" (``None`` when ``control_id`` is falsy). ``evidence_type``
    falls back to ``None`` if the row has no such attribute; all other
    fields are copied as-is.
    """

    def _iso(stamp):
        return stamp.isoformat() if stamp else None

    control: dict[str, Any] = {
        "id": str(r.id),
        "framework_id": str(r.framework_id),
    }
    for field in (
        "control_id",
        "title",
        "objective",
        "rationale",
        "severity",
        "category",
        "verification_method",
    ):
        control[field] = getattr(r, field)

    # The only field that tolerates being absent from the row.
    control["evidence_type"] = getattr(r, "evidence_type", None)

    for field in (
        "target_audience",
        "applicable_industries",
        "applicable_company_size",
        "scope_conditions",
        "release_state",
    ):
        control[field] = getattr(r, field)

    control_id = r.control_id
    control["control_id_domain"] = (
        control_id.split("-")[0].upper() if control_id else None
    )
    control["created_at"] = _iso(r.created_at)
    control["updated_at"] = _iso(r.updated_at)
    return control
|
||||
Reference in New Issue
Block a user