# mypy: disable-error-code="no-any-return,arg-type"
"""Use-Case → Controls retrieval — the SHARED layer the document agents AND the
CRA finding-mapper query to pull the controls that belong to a topic.

Read-only over the existing ``mc_use_case_mappings`` seed (no schema change).
The seed is recall-oriented ("this MC comes from a law about the topic"); the
ranking here is a deterministic *precision proxy* — is_primary + mapping
confidence + cluster size, plus a keyword-relevance score derived from the
use-case registry. The LLM precision pass (Phase B) refines this later; the
ranking field stays the same so consumers do not change.
"""

from __future__ import annotations

from typing import Any, Optional

from sqlalchemy import text
from sqlalchemy.orm import Session

from compliance.data.use_case_registry import REGISTRY, is_valid_use_case
from compliance.domain import NotFoundError

# Weights of the three signals combined by ``relevance_score``; they sum to 1
# so a primary mapping with full confidence and saturated keywords scores 1.0.
_PRIMARY_WEIGHT = 0.5
_CONFIDENCE_WEIGHT = 0.3
_KEYWORD_WEIGHT = 0.2
# Number of distinct keyword hits at which the keyword signal saturates.
_KEYWORD_SATURATION = 3


def relevance_score(
    title: Optional[str],
    objective: Optional[str],
    keyword_tokens: tuple[str, ...],
    is_primary: Optional[bool],
    confidence: Optional[float],
) -> float:
    """Deterministic precision proxy in [0, 1]. Pure → unit-testable.

    Combines the recall signals already on the mapping (primary flag, mapping
    confidence) with a content signal: how many of the use-case's registry
    keyword tokens appear in the control's own representative text. The
    content term is what separates "actually about this topic" from "merely
    from a related law" — the core of the precision problem.

    Args:
        title: Representative control title; ``None`` is treated as empty.
        objective: Representative control objective; ``None`` is treated as
            empty.
        keyword_tokens: Tokens to look for. Matching is a case-insensitive
            substring test, and each distinct token counts at most once.
            ``None`` or an empty tuple contributes nothing.
        is_primary: Primary-mapping flag; ``None`` counts as not primary.
        confidence: Mapping confidence. ``None`` counts as 0 and values
            outside [0, 1] are clamped into that range.

    Returns:
        The weighted sum of the three signals, rounded to three decimals.
    """
    haystack = f"{title or ''} {objective or ''}".lower()

    # The haystack is lower-cased, so the tokens must be too — otherwise any
    # token containing an upper-case letter could never match. The set also
    # prevents a token listed twice from being counted as two hits.
    needles = {kw.lower() for kw in (keyword_tokens or ()) if kw}
    hits = sum(1 for kw in needles if kw in haystack)
    kw_score = min(hits / _KEYWORD_SATURATION, 1.0)

    # Clamp so an out-of-range confidence can neither push the score below 0
    # nor swamp the other two signals before the final cap.
    conf = min(max(float(confidence or 0.0), 0.0), 1.0)

    score = (
        (_PRIMARY_WEIGHT if is_primary else 0.0)
        + _CONFIDENCE_WEIGHT * conf
        + _KEYWORD_WEIGHT * kw_score
    )
    # The cap only guards against floating-point overshoot of the weight sum.
    return round(min(score, 1.0), 3)


# Representative member (most severe, then lowest control_id) carries the
# human-readable title/objective — master_controls.canonical_name is only the
# merge token, so we surface a real member control per master.
# Paged listing of the master controls mapped to one use case.
# Ordering: every descending key carries NULLS LAST (PostgreSQL sorts NULLs
# first under DESC, which would rank a NULL primary flag above real
# primaries), and mc.id is the final tie-breaker so that LIMIT/OFFSET pages
# are stable when the ranking keys tie.
# NOTE(review): mc.id makes the order unique only if there is at most one
# mapping row per (master control, use_case) — confirm against the schema.
_LIST_SQL = text("""
    SELECT mc.id, mc.master_control_id, mc.canonical_name, mc.total_controls,
           m.is_primary, m.confidence,
           (SELECT r.source_regulation
              FROM mc_regulations r
             WHERE r.master_control_uuid = mc.id AND r.is_primary
             LIMIT 1) AS primary_regulation,
           rep.title, rep.objective, rep.severity, rep.category
      FROM master_controls mc
      JOIN mc_use_case_mappings m
        ON m.master_control_uuid = mc.id AND m.use_case = :uc
      LEFT JOIN LATERAL (
            SELECT cc.title, cc.objective, cc.severity, cc.category
              FROM master_control_members mcm
              JOIN canonical_controls cc ON cc.id = mcm.control_uuid
             WHERE mcm.master_control_uuid = mc.id
             ORDER BY CASE cc.severity
                          WHEN 'critical' THEN 0
                          WHEN 'high' THEN 1
                          WHEN 'medium' THEN 2
                          ELSE 3
                      END,
                      cc.control_id
             LIMIT 1
           ) rep ON true
     WHERE (:primary_only = false OR m.is_primary)
     ORDER BY m.is_primary DESC NULLS LAST,
              m.confidence DESC NULLS LAST,
              mc.total_controls DESC NULLS LAST,
              mc.id
     LIMIT :lim OFFSET :off
""")


class UseCaseControlsService:
    """Topic → controls retrieval over the seeded use-case mappings."""

    def __init__(self, db: Session) -> None:
        """Keep the session that all queries of this service run on."""
        self.db = db

    def list_use_cases(self) -> list[dict[str, Any]]:
        """Registry use-cases with their live mapped-control counts.

        Returns:
            One dict per enabled registry entry (``key``, ``label``,
            ``group``, ``regulations``, ``verification_methods``,
            ``mapped_controls``), sorted by ``mapped_controls`` descending.
            Use cases without any mapping row are reported with a count of 0.
        """
        counts = {
            row[0]: int(row[1])
            for row in self.db.execute(text(
                "SELECT use_case, count(*) FROM mc_use_case_mappings "
                "GROUP BY use_case"
            )).fetchall()
        }
        out = [
            {
                "key": uc.key,
                "label": uc.label,
                "group": uc.group,
                "regulations": list(uc.regulations),
                "verification_methods": list(uc.verification_methods),
                "mapped_controls": counts.get(uc.key, 0),
            }
            for uc in REGISTRY.values()
            if uc.enabled
        ]
        # list.sort is stable, so entries with equal counts keep the order in
        # which REGISTRY yields them.
        out.sort(key=lambda x: x["mapped_controls"], reverse=True)
        return out

    def controls_for_use_case(
        self,
        use_case: str,
        primary_only: bool = False,
        limit: int = 50,
        offset: int = 0,
    ) -> dict[str, Any]:
        """Ranked controls mapped to ``use_case`` (deduplicated master grain).

        Args:
            use_case: Registry key of the topic to look up.
            primary_only: When true, only mappings flagged ``is_primary`` are
                listed and counted.
            limit: Page size; clamped to the range 1..200.
            offset: Number of rows to skip; negative values are treated as 0.

        Returns:
            A dict echoing the use case (``use_case``, ``label``, ``group``)
            and the effective paging arguments (``limit``, ``offset``,
            ``primary_only``), plus ``total`` (count of matching mapping
            rows) and ``controls`` (the requested page). Controls keep the
            order produced by the SQL query; ``relevance`` is computed per
            row and does not re-sort the page.

        Raises:
            NotFoundError: If ``use_case`` is not a valid registry key.
        """
        if not is_valid_use_case(use_case):
            raise NotFoundError(f"Unknown use_case '{use_case}'")
        uc = REGISTRY[use_case]

        # Clamp paging so a caller can neither request an unbounded page nor
        # pass a negative LIMIT/OFFSET to the database.
        lim = min(max(int(limit), 1), 200)
        off = max(int(offset), 0)

        # Only a fixed SQL fragment is appended here; the use-case value
        # itself is always sent as a bound parameter.
        count_sql = (
            "SELECT count(*) FROM mc_use_case_mappings WHERE use_case = :uc"
            + (" AND is_primary" if primary_only else "")
        )
        total = self.db.execute(text(count_sql), {"uc": use_case}).scalar() or 0

        rows = self.db.execute(_LIST_SQL, {
            "uc": use_case,
            "primary_only": bool(primary_only),
            "lim": lim,
            "off": off,
        }).fetchall()

        controls = [
            {
                "id": str(r.id),
                "master_control_id": r.master_control_id,
                # Fall back to the merge token when the master control has no
                # representative member title.
                "title": r.title or r.canonical_name,
                "objective": r.objective,
                "severity": r.severity,
                "category": r.category,
                "member_count": r.total_controls,
                "is_primary": bool(r.is_primary),
                "confidence": (
                    float(r.confidence) if r.confidence is not None else None
                ),
                "primary_regulation": r.primary_regulation,
                "relevance": relevance_score(
                    r.title,
                    r.objective,
                    uc.keyword_tokens,
                    r.is_primary,
                    r.confidence,
                ),
            }
            for r in rows
        ]
        return {
            "use_case": uc.key,
            "label": uc.label,
            "group": uc.group,
            "total": int(total),
            "limit": lim,
            "offset": off,
            "primary_only": bool(primary_only),
            "controls": controls,
        }