From a4b405077fd767a18439daf5f653254adadf165b Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sat, 13 Jun 2026 21:37:18 +0200 Subject: [PATCH] feat(controls): shared get_controls_for_use_case retrieval API Read-only layer (service + thin route + tests) that returns the controls mapped to a use-case/topic, ranked by a deterministic precision proxy (is_primary + mapping confidence + registry keyword relevance) over the existing mc_use_case_mappings seed. No schema change. Shared handoff point: the document specialist agents AND the CRA finding-mapper draw from this one controls index instead of separate retrievals. Co-Authored-By: Claude Opus 4.7 --- backend-compliance/compliance/api/__init__.py | 1 + .../api/use_case_controls_routes.py | 49 ++++++ .../compliance/services/use_case_controls.py | 161 ++++++++++++++++++ .../tests/test_use_case_controls.py | 57 +++++++ 4 files changed, 268 insertions(+) create mode 100644 backend-compliance/compliance/api/use_case_controls_routes.py create mode 100644 backend-compliance/compliance/services/use_case_controls.py create mode 100644 backend-compliance/compliance/tests/test_use_case_controls.py diff --git a/backend-compliance/compliance/api/__init__.py b/backend-compliance/compliance/api/__init__.py index 525ace22..89cc1ccd 100644 --- a/backend-compliance/compliance/api/__init__.py +++ b/backend-compliance/compliance/api/__init__.py @@ -58,6 +58,7 @@ _ROUTER_MODULES = [ "canonical_control_routes", "control_generator_routes", "crosswalk_routes", + "use_case_controls_routes", "process_task_routes", "evidence_check_routes", "vvt_library_routes", diff --git a/backend-compliance/compliance/api/use_case_controls_routes.py b/backend-compliance/compliance/api/use_case_controls_routes.py new file mode 100644 index 00000000..48dec01f --- /dev/null +++ b/backend-compliance/compliance/api/use_case_controls_routes.py @@ -0,0 +1,49 @@ +"""Use-Case → Controls API — the shared retrieval layer. 
# All endpoints of this module are mounted below /v1/controls.
router = APIRouter(prefix="/v1/controls", tags=["use-case-controls"])


def get_use_case_controls_service(
    db: Session = Depends(get_db),
) -> UseCaseControlsService:
    # FastAPI dependency: binds the injected DB session to a service object
    # so the endpoints below never touch the session directly.
    service = UseCaseControlsService(db)
    return service


@router.get("/use-cases")
async def list_use_cases(
    svc: UseCaseControlsService = Depends(get_use_case_controls_service),
) -> list[dict[str, Any]]:
    """All enabled use-cases (topics) with their live mapped-control counts."""
    # The service call stays inside the context manager so that anything it
    # raises passes through translate_domain_errors (presumably mapping domain
    # errors to HTTP errors — confirm in compliance.api._http_errors).
    # NOTE(review): the handler is async while the service call is a plain
    # synchronous call — confirm this matches the project's convention.
    with translate_domain_errors():
        return svc.list_use_cases()


@router.get("/use-cases/{use_case}/controls")
async def controls_for_use_case(
    use_case: str,
    primary_only: bool = Query(False, description="Nur Primaerzweck-Mappings"),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    svc: UseCaseControlsService = Depends(get_use_case_controls_service),
) -> dict[str, Any]:
    """Controls mapped to a topic, ranked by the deterministic precision proxy."""
    # Paging bounds are validated by the Query constraints above; the values
    # are handed to the service unchanged.
    with translate_domain_errors():
        return svc.controls_for_use_case(
            use_case,
            primary_only=primary_only,
            limit=limit,
            offset=offset,
        )
def relevance_score(
    title: Optional[str],
    objective: Optional[str],
    keyword_tokens: tuple[str, ...],
    is_primary: Optional[bool],
    confidence: Optional[float],
) -> float:
    """Deterministic precision proxy in [0, 1]. Pure → unit-testable.

    Combines the recall signals already on the mapping (primary flag, mapping
    confidence) with a content signal: how many of the use-case's registry
    keyword tokens appear in the control's own representative text. The content
    term is what separates "actually about this topic" from "merely from a
    related law" — the core of the precision problem.

    Weights: 0.5 for a primary mapping, 0.3 * confidence, and 0.2 * keyword
    score, where the keyword score saturates at three distinct hits.

    Args:
        title: Representative control title; ``None`` is treated as empty.
        objective: Representative control objective; ``None`` is treated as
            empty.
        keyword_tokens: Registry keyword tokens of the use-case. Matching is
            case-insensitive; empty and duplicate tokens are ignored.
        is_primary: Whether the mapping is flagged as primary.
        confidence: Mapping confidence; ``None`` counts as 0.0 and values
            outside [0, 1] are clamped into that range.

    Returns:
        The score rounded to three decimals, always within [0, 1].
    """
    haystack = f"{title or ''} {objective or ''}".lower()
    # The haystack is lower-cased, so the tokens must be as well — otherwise a
    # token such as "TLS" could never match. The set removes duplicates so a
    # repeated token is not counted as several hits.
    tokens = {kw.lower() for kw in (keyword_tokens or ()) if kw}
    # Substring containment (not word matching) is kept from the original.
    hits = sum(1 for kw in tokens if kw in haystack)
    kw_score = min(hits / 3.0, 1.0)
    # Clamp the confidence so an out-of-range value can neither push the score
    # below zero nor dominate the other signals.
    conf = min(max(float(confidence or 0.0), 0.0), 1.0)
    score = (0.5 if is_primary else 0.0) + 0.3 * conf + 0.2 * kw_score
    return round(min(score, 1.0), 3)


# Representative member (most severe, then lowest control_id) carries the
# human-readable title/objective — master_controls.canonical_name is only the
# merge token, so we surface a real member control per master.
#
# The outer ORDER BY ends in mc.master_control_id, mc.id so that rows which tie
# on (is_primary, confidence, total_controls) still have a fixed order;
# without a unique tie-breaker LIMIT/OFFSET pages may overlap or skip rows.
# The primary_regulation subquery is ordered for the same reason: LIMIT 1
# without ORDER BY would pick an arbitrary row when several are primary.
_LIST_SQL = text("""
    SELECT mc.id, mc.master_control_id, mc.canonical_name, mc.total_controls,
           m.is_primary, m.confidence,
           (SELECT r.source_regulation FROM mc_regulations r
             WHERE r.master_control_uuid = mc.id AND r.is_primary
             ORDER BY r.source_regulation
             LIMIT 1)
             AS primary_regulation,
           rep.title, rep.objective, rep.severity, rep.category
      FROM master_controls mc
      JOIN mc_use_case_mappings m
        ON m.master_control_uuid = mc.id AND m.use_case = :uc
      LEFT JOIN LATERAL (
        SELECT cc.title, cc.objective, cc.severity, cc.category
          FROM master_control_members mcm
          JOIN canonical_controls cc ON cc.id = mcm.control_uuid
         WHERE mcm.master_control_uuid = mc.id
         ORDER BY CASE cc.severity WHEN 'critical' THEN 0 WHEN 'high' THEN 1
                                   WHEN 'medium' THEN 2 ELSE 3 END, cc.control_id
         LIMIT 1
      ) rep ON true
     WHERE (:primary_only = false OR m.is_primary)
     ORDER BY m.is_primary DESC, m.confidence DESC NULLS LAST,
              mc.total_controls DESC, mc.master_control_id, mc.id
     LIMIT :lim OFFSET :off
""")


class UseCaseControlsService:
    """Topic → controls retrieval over the seeded use-case mappings."""

    def __init__(self, db: Session) -> None:
        # Session used for every query; this class only issues SELECTs.
        self.db = db

    def list_use_cases(self) -> list[dict[str, Any]]:
        """Registry use-cases with their live mapped-control counts.

        Returns:
            One dict per enabled registry entry (key, label, group,
            regulations, verification_methods, mapped_controls), sorted by
            ``mapped_controls`` descending. Entries without any mapping row
            report 0.
        """
        counts = {
            row[0]: int(row[1])
            for row in self.db.execute(text(
                "SELECT use_case, count(*) FROM mc_use_case_mappings "
                "GROUP BY use_case"
            )).fetchall()
        }
        out = [
            {
                "key": uc.key,
                "label": uc.label,
                "group": uc.group,
                "regulations": list(uc.regulations),
                "verification_methods": list(uc.verification_methods),
                "mapped_controls": counts.get(uc.key, 0),
            }
            for uc in REGISTRY.values() if uc.enabled
        ]
        # list.sort is stable, so entries with equal counts keep the order in
        # which REGISTRY yields them.
        out.sort(key=lambda x: x["mapped_controls"], reverse=True)
        return out

    def controls_for_use_case(
        self,
        use_case: str,
        primary_only: bool = False,
        limit: int = 50,
        offset: int = 0,
    ) -> dict[str, Any]:
        """Ranked controls mapped to ``use_case`` (deduplicated master grain).

        Rows come back in the order fixed by ``_LIST_SQL`` (primary mappings
        first, then confidence, then member count, then a stable id
        tie-breaker). ``relevance`` is computed per row with
        ``relevance_score`` and is returned as a field; it does not reorder
        the page.

        Args:
            use_case: Registry key of the topic.
            primary_only: Restrict the result to primary mappings.
            limit: Page size; clamped to the range 1..200.
            offset: Number of rows to skip; negative values become 0.

        Returns:
            A dict with the use-case metadata, the effective paging values,
            ``total`` (number of matching mapping rows) and ``controls``.

        Raises:
            NotFoundError: If ``use_case`` is not a valid registry key.
        """
        # Validate before touching the database.
        if not is_valid_use_case(use_case):
            raise NotFoundError(f"Unknown use_case '{use_case}'")
        uc = REGISTRY[use_case]
        lim = min(max(int(limit), 1), 200)
        off = max(int(offset), 0)

        # Only a constant fragment is appended; the use-case key itself is
        # always passed as a bound parameter.
        count_sql = (
            "SELECT count(*) FROM mc_use_case_mappings WHERE use_case = :uc"
            + (" AND is_primary" if primary_only else "")
        )
        total = self.db.execute(text(count_sql), {"uc": use_case}).scalar() or 0

        rows = self.db.execute(_LIST_SQL, {
            "uc": use_case,
            "primary_only": bool(primary_only),
            "lim": lim,
            "off": off,
        }).fetchall()

        controls = [
            {
                "id": str(r.id),
                "master_control_id": r.master_control_id,
                # Fall back to the merge token when the master has no
                # representative member row.
                "title": r.title or r.canonical_name,
                "objective": r.objective,
                "severity": r.severity,
                "category": r.category,
                "member_count": r.total_controls,
                "is_primary": bool(r.is_primary),
                "confidence": (
                    float(r.confidence) if r.confidence is not None else None
                ),
                "primary_regulation": r.primary_regulation,
                "relevance": relevance_score(
                    r.title, r.objective, uc.keyword_tokens,
                    r.is_primary, r.confidence,
                ),
            }
            for r in rows
        ]
        return {
            "use_case": uc.key,
            "label": uc.label,
            "group": uc.group,
            "total": int(total),
            "limit": lim,
            "offset": off,
            "primary_only": bool(primary_only),
            "controls": controls,
        }
# Keyword tokens of a fictional "network security" use-case, shared by the
# ranking tests below.
_NET_KW = ("firewall", "tls", "port", "segmentation", "network", "header")


def test_relevance_primary_only_baseline():
    # Primary flag is the only signal: no confidence, no keyword in the text.
    score = relevance_score("x", "y", _NET_KW, True, None)
    assert score == 0.5


def test_relevance_non_primary_baseline_is_zero():
    # No signal at all yields the minimum score.
    score = relevance_score("x", "y", _NET_KW, False, None)
    assert score == 0.0


def test_relevance_confidence_contributes():
    # Without primary flag and keyword hits the score is 0.3 * confidence.
    full = relevance_score("x", "y", _NET_KW, False, 1.0)
    half = relevance_score("x", "y", _NET_KW, False, 0.5)
    assert full == 0.3
    assert half == 0.15


def test_relevance_keyword_hits_are_capped_at_three():
    # All six tokens occur, yet the content term cannot exceed +0.2.
    title = "Firewall and TLS on every port and network segmentation header"
    score = relevance_score(title, "", _NET_KW, False, None)
    assert score == 0.2


def test_relevance_keyword_match_is_case_insensitive_over_title_and_objective():
    # One hit in the upper-case title, one in the objective:
    # 2/3 * 0.2 ≈ 0.133.
    score = relevance_score("FIREWALL", "tls config", _NET_KW, False, None)
    assert score == pytest.approx(0.133, abs=0.001)


def test_relevance_is_clamped_to_one():
    # primary (0.5) + full confidence (0.3) + three hits (0.2) tops out at 1.0.
    title = "firewall tls port"
    score = relevance_score(title, "", _NET_KW, True, 1.0)
    assert score == 1.0


def test_relevance_no_keyword_tokens_yields_zero_content_term():
    # An empty token tuple leaves only primary + confidence: 0.5 + 0.3.
    score = relevance_score("anything", "here", (), True, 1.0)
    assert score == 0.8


def test_controls_for_unknown_use_case_raises_not_found():
    # The validation guard fires before any DB access, so no session is needed.
    service = UseCaseControlsService(db=None)
    with pytest.raises(NotFoundError):
        service.controls_for_use_case("does_not_exist")