""" Applicability Engine -- filters controls based on company profile + scope answers. Two layers: 1. Deterministic Filter (Phase C2) — fast SQL + Python filtering 2. Assessment Layer — confidence scoring, escalation detection, reasoning Filtering logic: - Controls with NULL applicability fields are INCLUDED (apply to everyone). - Controls with '["all"]' match all queries. - Industry: control applies if its applicable_industries contains the requested industry OR contains "all" OR is NULL. - Company size: control applies if its applicable_company_size contains the requested size OR contains "all" OR is NULL. - Scope signals: control applies if it has NO scope_conditions, or the company has at least one of the required signals (requires_any logic). """ from __future__ import annotations import json import logging from dataclasses import dataclass, field, asdict from typing import Any, Optional from sqlalchemy import text from db.session import SessionLocal logger = logging.getLogger(__name__) # Valid company sizes (ordered smallest to largest) VALID_SIZES = ("micro", "small", "medium", "large", "enterprise") # Signals that indicate potentially regulated financial activity _FINANCIAL_SIGNALS = {"operates_payment_service", "holds_client_funds", "performs_kyc", "monitors_transactions", "marketplace_model"} # Signals that are ambiguous and may require legal review _AMBIGUOUS_SIGNALS = {"provides_embedded_connectivity", "marketplace_model"} # Contradictory signal pairs (if both present → escalate) _CONTRADICTORY_PAIRS = [ ("holds_client_funds", "operates_payment_service"), # holds funds but claims not a payment service ] # Repo signals that suggest regulated activity _REPO_SIGNAL_REGULATORY_MAP = { "wallet_service": "financial", "custody": "financial", "kyc_provider": "financial", "transaction_monitoring": "financial", "payment_processing": "financial", "stripe": "vendor_payment", # NOT own payment service "paypal": "vendor_payment", } @dataclass class AssessmentResult: """Assessment layer result — confidence, escalation, reasoning.""" confidence: float = 1.0 escalation_flag: bool = False escalation_reason: Optional[str] = None inferred_signals: list = field(default_factory=list) reasoning: str = "" warnings: list = field(default_factory=list) def _parse_json_text(value: Any) -> Any: """Parse a TEXT column that stores JSON. Returns None if unparseable.""" if value is None: return None if isinstance(value, (list, dict)): return value if isinstance(value, str): try: return json.loads(value) except (json.JSONDecodeError, ValueError): return None return None def _matches_industry(applicable_industries_raw: Any, industry: str) -> bool: """Check if a control's applicable_industries matches the requested industry.""" industries = _parse_json_text(applicable_industries_raw) if industries is None: return True # NULL = applies to everyone if not isinstance(industries, list): return True # malformed = include if "all" in industries: return True return industry in industries def _matches_company_size(applicable_company_size_raw: Any, company_size: str) -> bool: """Check if a control's applicable_company_size matches the requested size.""" sizes = _parse_json_text(applicable_company_size_raw) if sizes is None: return True # NULL = applies to everyone if not isinstance(sizes, list): return True # malformed = include if "all" in sizes: return True return company_size in sizes def _matches_scope_signals( scope_conditions_raw: Any, scope_signals: list[str] ) -> bool: """Check if a control's scope_conditions are satisfied by the given signals. A control with scope_conditions = {"requires_any": ["uses_ai", "processes_health_data"]} matches if the company has at least one of those signals. A control with NULL or empty scope_conditions always matches. """ conditions = _parse_json_text(scope_conditions_raw) if conditions is None: return True # no conditions = applies to everyone if not isinstance(conditions, dict): return True # malformed = include requires_any = conditions.get("requires_any", []) if not requires_any: return True # no required signals = applies to everyone # Company must have at least one of the required signals return bool(set(requires_any) & set(scope_signals)) def get_applicable_controls( db, industry: Optional[str] = None, company_size: Optional[str] = None, scope_signals: Optional[list[str]] = None, limit: int = 100, offset: int = 0, ) -> dict[str, Any]: """ Returns controls applicable to the given company profile. Uses SQL pre-filtering with LIKE for performance, then Python post-filtering for precise JSON matching (since columns are TEXT, not JSONB). Args: db: SQLAlchemy session industry: e.g. "Telekommunikation", "Energie", "Gesundheitswesen" company_size: e.g. "medium", "large", "enterprise" scope_signals: e.g. ["uses_ai", "third_country_transfer"] limit: max results to return (applied after filtering) offset: pagination offset (applied after filtering) Returns: dict with total_applicable count, paginated controls, and breakdown stats """ if scope_signals is None: scope_signals = [] # SQL pre-filter: broad match to reduce Python-side filtering query = """ SELECT id, framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, license_rule, source_original_text, source_citation, customer_visible, verification_method, category, evidence_type, target_audience, generation_metadata, generation_strategy, applicable_industries, applicable_company_size, scope_conditions, parent_control_uuid, decomposition_method, pipeline_version, created_at, updated_at FROM canonical_controls WHERE release_state NOT IN ('duplicate', 'deprecated', 'rejected') """ params: dict[str, Any] = {} # SQL-level pre-filtering (broad, may include false positives) if industry: query += """ AND (applicable_industries IS NULL OR applicable_industries LIKE '%"all"%' OR applicable_industries LIKE '%' || :industry || '%')""" params["industry"] = industry if company_size: query += """ AND (applicable_company_size IS NULL OR applicable_company_size LIKE '%"all"%' OR applicable_company_size LIKE '%' || :company_size || '%')""" params["company_size"] = company_size # For scope_signals we cannot do precise SQL filtering on requires_any, # but we can at least exclude controls whose scope_conditions text # does not contain any of the requested signals (if only 1 signal). # With multiple signals we skip SQL pre-filter and do it in Python. if scope_signals and len(scope_signals) == 1: query += """ AND (scope_conditions IS NULL OR scope_conditions LIKE '%' || :scope_sig || '%')""" params["scope_sig"] = scope_signals[0] query += " ORDER BY control_id" rows = db.execute(text(query), params).fetchall() # Python-level precise filtering applicable = [] for r in rows: if industry and not _matches_industry(r.applicable_industries, industry): continue if company_size and not _matches_company_size( r.applicable_company_size, company_size ): continue if scope_signals and not _matches_scope_signals( r.scope_conditions, scope_signals ): continue applicable.append(r) total_applicable = len(applicable) # Apply pagination paginated = applicable[offset : offset + limit] # Build domain breakdown domain_counts: dict[str, int] = {} for r in applicable: domain = r.control_id.split("-")[0].upper() if r.control_id else "UNKNOWN" domain_counts[domain] = domain_counts.get(domain, 0) + 1 # Build severity breakdown severity_counts: dict[str, int] = {} for r in applicable: sev = r.severity or "unknown" severity_counts[sev] = severity_counts.get(sev, 0) + 1 # Build industry breakdown (from matched controls) industry_counts: dict[str, int] = {} for r in applicable: industries = _parse_json_text(r.applicable_industries) if isinstance(industries, list): for ind in industries: industry_counts[ind] = industry_counts.get(ind, 0) + 1 else: industry_counts["unclassified"] = ( industry_counts.get("unclassified", 0) + 1 ) # Assessment layer assessment = _assess( industry=industry, company_size=company_size, scope_signals=scope_signals, total_applicable=total_applicable, applicable_controls=applicable, ) return { "total_applicable": total_applicable, "limit": limit, "offset": offset, "controls": [_row_to_control(r) for r in paginated], "breakdown": { "by_domain": domain_counts, "by_severity": severity_counts, "by_industry": industry_counts, }, "assessment": asdict(assessment), } def _row_to_control(r) -> dict[str, Any]: """Convert a DB row to a control dict for API response.""" return { "id": str(r.id), "framework_id": str(r.framework_id), "control_id": r.control_id, "title": r.title, "objective": r.objective, "rationale": r.rationale, "severity": r.severity, "category": r.category, "verification_method": r.verification_method, "evidence_type": getattr(r, "evidence_type", None), "target_audience": r.target_audience, "applicable_industries": r.applicable_industries, "applicable_company_size": r.applicable_company_size, "scope_conditions": r.scope_conditions, "release_state": r.release_state, "control_id_domain": ( r.control_id.split("-")[0].upper() if r.control_id else None ), "created_at": r.created_at.isoformat() if r.created_at else None, "updated_at": r.updated_at.isoformat() if r.updated_at else None, } # ============================================================================= # Assessment Layer — Confidence, Escalation, Reasoning # ============================================================================= def _assess( industry: Optional[str], company_size: Optional[str], scope_signals: Optional[list[str]], total_applicable: int, applicable_controls: list, ) -> AssessmentResult: """Compute assessment result from filter inputs and outputs. Deterministic scoring — no LLM needed. """ signals = scope_signals or [] result = AssessmentResult(inferred_signals=list(signals)) warnings = [] # --- Confidence scoring --- score = 0.0 # Industry specified? (+0.25) if industry: score += 0.25 else: warnings.append("Keine Branche angegeben — alle Controls werden angezeigt") # Company size specified? (+0.15) if company_size: score += 0.15 else: warnings.append("Keine Unternehmensgroesse angegeben") # Scope signals provided? (+0.20 if any, +0.30 if >=3) if len(signals) >= 3: score += 0.30 elif len(signals) >= 1: score += 0.20 else: warnings.append("Keine Scope-Signale angegeben — Filterung nur nach Branche/Groesse") # Controls found? (+0.15 if >5, +0.05 if 1-5) if total_applicable > 5: score += 0.15 elif total_applicable > 0: score += 0.05 # No contradictions? (+0.15) contradictions = _detect_contradictions(signals) if not contradictions: score += 0.15 else: for c in contradictions: warnings.append(f"Widerspruch: {c}") result.confidence = round(min(score, 1.0), 2) # --- Escalation detection --- escalation_reasons = [] # Rule 1: Contradictory signals if contradictions: escalation_reasons.append( f"Widersprüchliche Angaben: {'; '.join(contradictions)}" ) # Rule 2: Ambiguous signals present active_ambiguous = set(signals) & _AMBIGUOUS_SIGNALS if active_ambiguous: escalation_reasons.append( f"Mehrdeutige Signale erfordern vertiefte Prüfung: {', '.join(sorted(active_ambiguous))}" ) # Rule 3: Financial signals without explicit payment service declaration active_financial = set(signals) & _FINANCIAL_SIGNALS if active_financial and "operates_payment_service" not in signals: if any(s in signals for s in ("holds_client_funds", "performs_kyc", "monitors_transactions")): escalation_reasons.append( "Finanznahe Signale ohne explizite Angabe zu Zahlungsdienst-Status — " "regulatorische Einordnung (PSD2/ZAG) vertieft prüfen" ) # Rule 4: Very few inputs → low confidence if not industry and not company_size and not signals: escalation_reasons.append( "Unvollständiges Profil — keine Branche, Größe oder Scope-Signale angegeben" ) if escalation_reasons: result.escalation_flag = True result.escalation_reason = " | ".join(escalation_reasons) # Cap confidence for escalation cases result.confidence = min(result.confidence, 0.75) # --- Reasoning --- reasoning_parts = [] if industry: reasoning_parts.append(f"Branche: {industry}") if company_size: reasoning_parts.append(f"Unternehmensgröße: {company_size}") if signals: reasoning_parts.append(f"Aktive Scope-Signale: {', '.join(sorted(signals))}") reasoning_parts.append(f"{total_applicable} Controls zugewiesen") if total_applicable > 0: # Collect unique source regulations from controls sources = set() for r in applicable_controls[:500]: sc = _parse_json_text(getattr(r, "scope_conditions", None)) if isinstance(sc, dict) and sc.get("requires_any"): for sig in sc["requires_any"]: if sig in signals: desc = sc.get("description", "") if desc: sources.add(desc) if sources: reasoning_parts.append( f"Scope-bedingte Controls: {'; '.join(sorted(sources)[:5])}" ) if warnings: reasoning_parts.append(f"Hinweise: {'; '.join(warnings)}") if result.escalation_flag: reasoning_parts.append(f"ESKALATION: {result.escalation_reason}") result.reasoning = ". ".join(reasoning_parts) + "." result.warnings = warnings return result def _detect_contradictions(signals: list[str]) -> list[str]: """Detect contradictory signal pairs.""" contradictions = [] signal_set = set(signals) # holds_client_funds but NOT operates_payment_service if "holds_client_funds" in signal_set and "operates_payment_service" not in signal_set: contradictions.append( "holds_client_funds=true aber operates_payment_service nicht gesetzt — " "unklar ob regulierter Zahlungsdienst" ) # performs_kyc but NOT operates_payment_service and NOT marketplace_model if ("performs_kyc" in signal_set and "operates_payment_service" not in signal_set and "marketplace_model" not in signal_set): contradictions.append( "performs_kyc=true ohne Payment- oder Marktplatz-Kontext — " "regulatorische Grundlage für KYC unklar" ) return contradictions