feat(control-pipeline): add Assessment Layer to Applicability Engine

Adds confidence scoring, escalation detection, and reasoning to the
deterministic filter. All assessment is deterministic (no LLM).

Confidence scoring (0.0-1.0):
- +0.25 industry specified
- +0.15 company size specified
- +0.20-0.30 scope signals provided (+0.20 for 1-2 signals, +0.30 for 3 or more)
- +0.05-0.15 controls found (+0.15 for more than 5 controls, +0.05 for 1-5)
- +0.15 no contradictions
- Capped at 0.75 for escalation cases

Escalation triggers:
- Contradictory signals (holds_client_funds without operates_payment_service;
  performs_kyc without operates_payment_service or marketplace_model)
- Ambiguous signals (provides_embedded_connectivity, marketplace_model)
- Financial signals without explicit payment service declaration
- Incomplete profile (no industry, no company size, and no scope signals)

Reasoning: template-based, includes active signals, control count,
scope-condition descriptions, and warnings.

Response now includes "assessment" field with confidence, escalation_flag,
escalation_reason, inferred_signals, reasoning, and warnings.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-23 20:36:11 +02:00
parent 716bc651c4
commit 38684dd903

View File

@@ -1,7 +1,9 @@
""" """
Applicability Engine -- filters controls based on company profile + scope answers. Applicability Engine -- filters controls based on company profile + scope answers.
Deterministic, no LLM needed. Implements Scoped Control Applicability (Phase C2). Two layers:
1. Deterministic Filter (Phase C2) — fast SQL + Python filtering
2. Assessment Layer — confidence scoring, escalation detection, reasoning
Filtering logic: Filtering logic:
- Controls with NULL applicability fields are INCLUDED (apply to everyone). - Controls with NULL applicability fields are INCLUDED (apply to everyone).
@@ -18,6 +20,7 @@ from __future__ import annotations
import json import json
import logging import logging
from dataclasses import dataclass, field, asdict
from typing import Any, Optional from typing import Any, Optional
from sqlalchemy import text from sqlalchemy import text
@@ -29,6 +32,40 @@ logger = logging.getLogger(__name__)
# Valid company sizes (ordered smallest to largest) # Valid company sizes (ordered smallest to largest)
VALID_SIZES = ("micro", "small", "medium", "large", "enterprise") VALID_SIZES = ("micro", "small", "medium", "large", "enterprise")
# Signals that indicate potentially regulated financial activity.
# _assess intersects the active scope signals with this set when checking for
# financial signals without an explicit payment-service declaration.
_FINANCIAL_SIGNALS = {"operates_payment_service", "holds_client_funds", "performs_kyc",
                      "monitors_transactions", "marketplace_model"}
# Signals that are ambiguous and may require legal review.
# Any of these being active raises an escalation in _assess.
_AMBIGUOUS_SIGNALS = {"provides_embedded_connectivity", "marketplace_model"}
# Signal pairs that are treated as contradictory.
# NOTE(review): the inline note below describes the FIRST signal being set
# WITHOUT the second one, which is also what _detect_contradictions checks;
# this does not mean "both present". This list is not referenced by the code
# visible in this chunk -- confirm whether it is meant to drive
# _detect_contradictions.
_CONTRADICTORY_PAIRS = [
    ("holds_client_funds", "operates_payment_service"),  # holds funds, but payment service not declared
]
# Repo signals that suggest regulated activity, mapped to a category label.
# NOTE(review): not referenced by the code visible in this chunk.
_REPO_SIGNAL_REGULATORY_MAP = {
    "wallet_service": "financial",
    "custody": "financial",
    "kyc_provider": "financial",
    "transaction_monitoring": "financial",
    "payment_processing": "financial",
    "stripe": "vendor_payment",  # third-party vendor, NOT an own payment service
    "paypal": "vendor_payment",
}
@dataclass
class AssessmentResult:
    """Assessment layer result: confidence, escalation and reasoning.

    Serialized with ``dataclasses.asdict`` into the ``"assessment"`` field of
    the applicability response.

    Attributes:
        confidence: Score between 0.0 and 1.0; defaults to 1.0 until scored.
        escalation_flag: True when at least one escalation rule fired.
        escalation_reason: Escalation reasons joined into one string, or None
            when nothing was escalated.
        inferred_signals: The scope signals the assessment was based on.
        reasoning: Template-based, human-readable explanation.
        warnings: Individual warning messages collected during scoring.
    """

    confidence: float = 1.0
    escalation_flag: bool = False
    escalation_reason: Optional[str] = None
    # default_factory gives every instance its own list (no shared mutable default).
    inferred_signals: list[str] = field(default_factory=list)
    reasoning: str = ""
    warnings: list[str] = field(default_factory=list)
def _parse_json_text(value: Any) -> Any: def _parse_json_text(value: Any) -> Any:
"""Parse a TEXT column that stores JSON. Returns None if unparseable.""" """Parse a TEXT column that stores JSON. Returns None if unparseable."""
@@ -206,6 +243,15 @@ def get_applicable_controls(
industry_counts.get("unclassified", 0) + 1 industry_counts.get("unclassified", 0) + 1
) )
# Assessment layer
assessment = _assess(
industry=industry,
company_size=company_size,
scope_signals=scope_signals,
total_applicable=total_applicable,
applicable_controls=applicable,
)
return { return {
"total_applicable": total_applicable, "total_applicable": total_applicable,
"limit": limit, "limit": limit,
@@ -216,6 +262,7 @@ def get_applicable_controls(
"by_severity": severity_counts, "by_severity": severity_counts,
"by_industry": industry_counts, "by_industry": industry_counts,
}, },
"assessment": asdict(assessment),
} }
@@ -243,3 +290,163 @@ def _row_to_control(r) -> dict[str, Any]:
"created_at": r.created_at.isoformat() if r.created_at else None, "created_at": r.created_at.isoformat() if r.created_at else None,
"updated_at": r.updated_at.isoformat() if r.updated_at else None, "updated_at": r.updated_at.isoformat() if r.updated_at else None,
} }
# =============================================================================
# Assessment Layer — Confidence, Escalation, Reasoning
# =============================================================================
def _assess(
    industry: Optional[str],
    company_size: Optional[str],
    scope_signals: Optional[list[str]],
    total_applicable: int,
    applicable_controls: list,
) -> AssessmentResult:
    """Compute the assessment for one filter run.

    Deterministic scoring -- no LLM involved.

    Confidence is the sum of:
      * +0.25 if an industry is given
      * +0.15 if a company size is given
      * +0.30 for three or more distinct scope signals, +0.20 for one or two
      * +0.15 for more than five applicable controls, +0.05 for one to five
      * +0.15 if no contradictions are detected
    and is capped at 0.75 whenever any escalation rule fires.

    Args:
        industry: Industry from the profile; falsy means "not provided".
        company_size: Company size from the profile; falsy means "not provided".
        scope_signals: Active scope signals; None is treated as no signals.
        total_applicable: Number of controls selected by the filter.
        applicable_controls: The selected controls; only the first 500 are
            scanned for scope-condition descriptions.

    Returns:
        An AssessmentResult with confidence, escalation flag/reason, the
        signals that were passed in, the reasoning text and the warnings.
    """
    signals = list(scope_signals or [])
    # Distinct signals: repeating the same signal must neither push the score
    # into the ">= 3 signals" tier nor show up twice in the reasoning text.
    active = set(signals)
    result = AssessmentResult(inferred_signals=list(signals))
    warnings: list[str] = []

    # --- Confidence scoring ---
    score = 0.0

    # Industry specified? (+0.25)
    if industry:
        score += 0.25
    else:
        warnings.append("Keine Branche angegeben — alle Controls werden angezeigt")

    # Company size specified? (+0.15)
    if company_size:
        score += 0.15
    else:
        warnings.append("Keine Unternehmensgroesse angegeben")

    # Scope signals provided? (+0.20 if any, +0.30 if >= 3 distinct)
    if len(active) >= 3:
        score += 0.30
    elif active:
        score += 0.20
    else:
        warnings.append("Keine Scope-Signale angegeben — Filterung nur nach Branche/Groesse")

    # Controls found? (+0.15 if > 5, +0.05 if 1-5)
    if total_applicable > 5:
        score += 0.15
    elif total_applicable > 0:
        score += 0.05

    # No contradictions? (+0.15)
    contradictions = _detect_contradictions(signals)
    if contradictions:
        warnings.extend(f"Widerspruch: {c}" for c in contradictions)
    else:
        score += 0.15

    result.confidence = round(min(score, 1.0), 2)

    # --- Escalation detection ---
    escalation_reasons: list[str] = []

    # Rule 1: Contradictory signals
    if contradictions:
        escalation_reasons.append(
            f"Widersprüchliche Angaben: {'; '.join(contradictions)}"
        )

    # Rule 2: Ambiguous signals present
    active_ambiguous = active & _AMBIGUOUS_SIGNALS
    if active_ambiguous:
        escalation_reasons.append(
            f"Mehrdeutige Signale erfordern vertiefte Prüfung: {', '.join(sorted(active_ambiguous))}"
        )

    # Rule 3: Financial signals without explicit payment service declaration
    active_financial = active & _FINANCIAL_SIGNALS
    if active_financial and "operates_payment_service" not in active:
        if any(s in active for s in ("holds_client_funds", "performs_kyc", "monitors_transactions")):
            escalation_reasons.append(
                "Finanznahe Signale ohne explizite Angabe zu Zahlungsdienst-Status — "
                "regulatorische Einordnung (PSD2/ZAG) vertieft prüfen"
            )

    # Rule 4: No inputs at all -> the profile is too incomplete to trust
    if not industry and not company_size and not signals:
        escalation_reasons.append(
            "Unvollständiges Profil — keine Branche, Größe oder Scope-Signale angegeben"
        )

    if escalation_reasons:
        result.escalation_flag = True
        result.escalation_reason = " | ".join(escalation_reasons)
        # Cap confidence for escalation cases
        result.confidence = min(result.confidence, 0.75)

    # --- Reasoning ---
    reasoning_parts: list[str] = []
    if industry:
        reasoning_parts.append(f"Branche: {industry}")
    if company_size:
        reasoning_parts.append(f"Unternehmensgröße: {company_size}")
    if active:
        reasoning_parts.append(f"Aktive Scope-Signale: {', '.join(sorted(active))}")

    reasoning_parts.append(f"{total_applicable} Controls zugewiesen")

    if total_applicable > 0:
        descriptions = _scope_condition_descriptions(applicable_controls, active)
        if descriptions:
            reasoning_parts.append(
                f"Scope-bedingte Controls: {'; '.join(descriptions[:5])}"
            )

    if warnings:
        reasoning_parts.append(f"Hinweise: {'; '.join(warnings)}")

    if result.escalation_flag:
        reasoning_parts.append(f"ESKALATION: {result.escalation_reason}")

    result.reasoning = ". ".join(reasoning_parts) + "."
    result.warnings = warnings
    return result


def _scope_condition_descriptions(applicable_controls: list, active_signals: set) -> list[str]:
    """Return sorted, unique descriptions of scope conditions matched by an active signal.

    A control contributes its ``description`` when its ``scope_conditions``
    contain a ``requires_any`` list naming at least one active signal.
    Only the first 500 controls are inspected.
    """
    descriptions: set[str] = set()
    for control in (applicable_controls or [])[:500]:
        # Controls may be plain dicts (_row_to_control returns dicts) or
        # attribute-style rows. getattr() alone silently yields None for dicts,
        # which would drop every scope-conditioned description.
        if isinstance(control, dict):
            raw = control.get("scope_conditions")
        else:
            raw = getattr(control, "scope_conditions", None)

        # The value may already be parsed, or still be JSON stored as text.
        conditions = raw if isinstance(raw, dict) else _parse_json_text(raw)
        if not isinstance(conditions, dict):
            continue

        required = conditions.get("requires_any")
        if not isinstance(required, (list, tuple)):
            continue

        description = conditions.get("description", "")
        if not (description and isinstance(description, str)):
            continue

        if any(isinstance(sig, str) and sig in active_signals for sig in required):
            descriptions.add(description)
    return sorted(descriptions)
def _detect_contradictions(signals: list[str]) -> list[str]:
"""Detect contradictory signal pairs."""
contradictions = []
signal_set = set(signals)
# holds_client_funds but NOT operates_payment_service
if "holds_client_funds" in signal_set and "operates_payment_service" not in signal_set:
contradictions.append(
"holds_client_funds=true aber operates_payment_service nicht gesetzt — "
"unklar ob regulierter Zahlungsdienst"
)
# performs_kyc but NOT operates_payment_service and NOT marketplace_model
if ("performs_kyc" in signal_set
and "operates_payment_service" not in signal_set
and "marketplace_model" not in signal_set):
contradictions.append(
"performs_kyc=true ohne Payment- oder Marktplatz-Kontext — "
"regulatorische Grundlage für KYC unklar"
)
return contradictions