# Applicability Engine -- assessment layer.
#
# This text was recovered from a whitespace-collapsed diff of
# control-pipeline/services/applicability_engine.py. Only the definitions that
# are completely visible in that diff are reproduced below.
#
# Per the diff, the module has two layers:
#   1. Deterministic Filter (Phase C2) -- fast SQL + Python filtering
#   2. Assessment Layer -- confidence scoring, escalation detection, reasoning
#
# The rest of the module is not shown in full: `_parse_json_text`,
# `get_applicable_controls`, `_row_to_control` and the `from sqlalchemy import
# text` import live there. The diff shows `get_applicable_controls` calling
# `_assess(industry=..., company_size=..., scope_signals=...,
# total_applicable=..., applicable_controls=applicable)` and returning
# `asdict(assessment)` under the "assessment" key of its result dict.

import json
import logging
from collections.abc import Mapping
from dataclasses import asdict, dataclass, field
from typing import Any, Optional

logger = logging.getLogger(__name__)

# Valid company sizes (ordered smallest to largest)
VALID_SIZES = ("micro", "small", "medium", "large", "enterprise")

# Signals that indicate potentially regulated financial activity
_FINANCIAL_SIGNALS = {
    "operates_payment_service",
    "holds_client_funds",
    "performs_kyc",
    "monitors_transactions",
    "marketplace_model",
}

# Signals that are ambiguous and may require legal review
_AMBIGUOUS_SIGNALS = {"provides_embedded_connectivity", "marketplace_model"}

# (present, expected) pairs: `present` being set WITHOUT `expected` is treated
# as contradictory, e.g. holding client funds without declaring a payment
# service.
# NOTE(review): nothing visible here reads this list; the rules are hard-coded
# in _detect_contradictions. Confirm whether it is used elsewhere.
_CONTRADICTORY_PAIRS = [
    ("holds_client_funds", "operates_payment_service"),
]

# Repo signals that suggest regulated activity.
# NOTE(review): not referenced by the code visible here.
_REPO_SIGNAL_REGULATORY_MAP = {
    "wallet_service": "financial",
    "custody": "financial",
    "kyc_provider": "financial",
    "transaction_monitoring": "financial",
    "payment_processing": "financial",
    "stripe": "vendor_payment",  # NOT own payment service
    "paypal": "vendor_payment",
}

# Financial signals that, without "operates_payment_service", trigger the
# "clarify payment-service status" escalation (rule 3 in _assess).
_PAYMENT_STATUS_TRIGGERS = ("holds_client_funds", "performs_kyc", "monitors_transactions")

# Upper bound on how many controls _assess inspects for scope descriptions.
_MAX_CONTROLS_SCANNED = 500


@dataclass
class AssessmentResult:
    """Assessment layer result — confidence, escalation, reasoning."""

    # Score in [0.0, 1.0]; _assess rounds it to two decimals and caps it at
    # 0.75 whenever an escalation is raised.
    confidence: float = 1.0
    # True when at least one escalation rule fired.
    escalation_flag: bool = False
    # All escalation reasons joined with " | ", or None when nothing fired.
    escalation_reason: Optional[str] = None
    # The (de-duplicated) scope signals the assessment was computed from.
    inferred_signals: list[str] = field(default_factory=list)
    # Human-readable summary sentence(s) built by _assess.
    reasoning: str = ""
    # Non-fatal hints: missing profile inputs and detected contradictions.
    warnings: list[str] = field(default_factory=list)


def _assess(
    industry: Optional[str],
    company_size: Optional[str],
    scope_signals: Optional[list[str]],
    total_applicable: int,
    applicable_controls: list,
) -> AssessmentResult:
    """Compute the assessment result from filter inputs and outputs.

    Deterministic scoring — no LLM needed.

    Args:
        industry: Industry used for filtering; falsy means "not specified".
        company_size: Company size used for filtering; falsy means "not specified".
        scope_signals: Active scope signals. Duplicates are ignored (first
            occurrence wins) so repeating a signal cannot raise the confidence.
        total_applicable: Number of controls the filter selected.
        applicable_controls: The selected controls. Each item may be a mapping
            with a "scope_conditions" key or an object with a
            ``scope_conditions`` attribute; only the first 500 are inspected.

    Returns:
        An AssessmentResult with confidence, escalation info, reasoning text
        and warnings.
    """
    # dict.fromkeys de-duplicates while keeping first-seen order.
    signals = list(dict.fromkeys(scope_signals or []))
    contradictions = _detect_contradictions(signals)

    confidence, warnings = _score_confidence(
        industry, company_size, signals, total_applicable, contradictions
    )

    escalation_reasons = _escalation_reasons(industry, company_size, signals, contradictions)
    escalation_reason = " | ".join(escalation_reasons) if escalation_reasons else None
    if escalation_reasons:
        # Cap confidence for escalation cases
        confidence = min(confidence, 0.75)

    # --- Reasoning ---
    reasoning_parts = []
    if industry:
        reasoning_parts.append(f"Branche: {industry}")
    if company_size:
        reasoning_parts.append(f"Unternehmensgröße: {company_size}")
    if signals:
        reasoning_parts.append(f"Aktive Scope-Signale: {', '.join(sorted(signals))}")
    reasoning_parts.append(f"{total_applicable} Controls zugewiesen")

    if total_applicable > 0:
        descriptions = _matching_scope_descriptions(applicable_controls, signals)
        if descriptions:
            shown = "; ".join(sorted(descriptions)[:5])
            reasoning_parts.append(f"Scope-bedingte Controls: {shown}")

    if warnings:
        reasoning_parts.append(f"Hinweise: {'; '.join(warnings)}")
    if escalation_reasons:
        reasoning_parts.append(f"ESKALATION: {escalation_reason}")

    return AssessmentResult(
        confidence=confidence,
        escalation_flag=bool(escalation_reasons),
        escalation_reason=escalation_reason,
        inferred_signals=list(signals),
        reasoning=". ".join(reasoning_parts) + ".",
        warnings=warnings,
    )


def _score_confidence(
    industry: Optional[str],
    company_size: Optional[str],
    signals: list[str],
    total_applicable: int,
    contradictions: list[str],
) -> tuple[float, list[str]]:
    """Return (confidence rounded to 2 decimals, warnings) for the given inputs."""
    warnings: list[str] = []
    score = 0.0

    # Industry specified? (+0.25)
    if industry:
        score += 0.25
    else:
        warnings.append("Keine Branche angegeben — alle Controls werden angezeigt")

    # Company size specified? (+0.15)
    if company_size:
        score += 0.15
    else:
        warnings.append("Keine Unternehmensgroesse angegeben")

    # Scope signals provided? (+0.20 if any, +0.30 if >=3)
    if len(signals) >= 3:
        score += 0.30
    elif signals:
        score += 0.20
    else:
        warnings.append("Keine Scope-Signale angegeben — Filterung nur nach Branche/Groesse")

    # Controls found? (+0.15 if >5, +0.05 if 1-5)
    if total_applicable > 5:
        score += 0.15
    elif total_applicable > 0:
        score += 0.05

    # No contradictions? (+0.15)
    if contradictions:
        warnings.extend(f"Widerspruch: {c}" for c in contradictions)
    else:
        score += 0.15

    return round(min(score, 1.0), 2), warnings


def _escalation_reasons(
    industry: Optional[str],
    company_size: Optional[str],
    signals: list[str],
    contradictions: list[str],
) -> list[str]:
    """Return the escalation reasons that apply, in rule order (empty if none)."""
    signal_set = set(signals)
    reasons = []

    # Rule 1: Contradictory signals
    if contradictions:
        reasons.append(f"Widersprüchliche Angaben: {'; '.join(contradictions)}")

    # Rule 2: Ambiguous signals present
    active_ambiguous = signal_set & _AMBIGUOUS_SIGNALS
    if active_ambiguous:
        listed = ", ".join(sorted(active_ambiguous))
        reasons.append(f"Mehrdeutige Signale erfordern vertiefte Prüfung: {listed}")

    # Rule 3: Financial signals without explicit payment service declaration
    if "operates_payment_service" not in signal_set and not signal_set.isdisjoint(
        _PAYMENT_STATUS_TRIGGERS
    ):
        reasons.append(
            "Finanznahe Signale ohne explizite Angabe zu Zahlungsdienst-Status — "
            "regulatorische Einordnung (PSD2/ZAG) vertieft prüfen"
        )

    # Rule 4: Very few inputs → low confidence
    if not industry and not company_size and not signals:
        reasons.append(
            "Unvollständiges Profil — keine Branche, Größe oder Scope-Signale angegeben"
        )

    return reasons


def _control_scope_conditions(control: Any) -> Any:
    """Return a control's scope conditions, whether it is a mapping or an object."""
    if isinstance(control, Mapping):
        raw = control.get("scope_conditions")
    else:
        raw = getattr(control, "scope_conditions", None)
    if raw is None or isinstance(raw, dict):
        # Already absent or already decoded: nothing to parse.
        return raw
    # Stored as JSON text: decode with the module's shared helper.
    return _parse_json_text(raw)


def _matching_scope_descriptions(controls: list, signals: list[str]) -> set:
    """Collect descriptions of controls whose ``requires_any`` hits an active signal."""
    descriptions = set()
    for control in controls[:_MAX_CONTROLS_SCANNED]:
        conditions = _control_scope_conditions(control)
        if not isinstance(conditions, dict):
            continue
        required = conditions.get("requires_any")
        if not required:
            continue
        description = conditions.get("description", "")
        if description and any(sig in signals for sig in required):
            descriptions.add(description)
    return descriptions


def _detect_contradictions(signals: list[str]) -> list[str]:
    """Detect contradictory signal combinations.

    Args:
        signals: Active scope signals.

    Returns:
        One German message per detected contradiction, in rule order; empty
        when the signals are consistent.
    """
    signal_set = set(signals)
    declares_payment_service = "operates_payment_service" in signal_set
    contradictions = []

    # holds_client_funds but NOT operates_payment_service
    if "holds_client_funds" in signal_set and not declares_payment_service:
        contradictions.append(
            "holds_client_funds=true aber operates_payment_service nicht gesetzt — "
            "unklar ob regulierter Zahlungsdienst"
        )

    # performs_kyc but NOT operates_payment_service and NOT marketplace_model
    if (
        "performs_kyc" in signal_set
        and not declares_payment_service
        and "marketplace_model" not in signal_set
    ):
        contradictions.append(
            "performs_kyc=true ohne Payment- oder Marktplatz-Kontext — "
            "regulatorische Grundlage für KYC unklar"
        )

    return contradictions