From 4d01e99ca1954ef23ab1f2ce1c4e39fdb0142823 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 14 Jun 2026 09:47:49 +0200 Subject: [PATCH] feat(controls): atom-grain path in get_controls_for_use_case Reads compliance.atom_classification (Haiku pass: relevant + sub_topic + canonical_obligation) when present -> precise, sub-topic-organized controls per topic; master-grain seed stays as fallback for unprocessed topics. New optional sub_topic filter + subtopic_counts facet + granularity flag in the response. Co-Authored-By: Claude Opus 4.7 --- .../api/use_case_controls_routes.py | 10 +- .../compliance/services/use_case_controls.py | 101 ++++++++++++++---- 2 files changed, 87 insertions(+), 24 deletions(-) diff --git a/backend-compliance/compliance/api/use_case_controls_routes.py b/backend-compliance/compliance/api/use_case_controls_routes.py index 48dec01f..1fef4411 100644 --- a/backend-compliance/compliance/api/use_case_controls_routes.py +++ b/backend-compliance/compliance/api/use_case_controls_routes.py @@ -9,7 +9,7 @@ draw from ONE controls index instead of separate retrievals. Read-only. from __future__ import annotations -from typing import Any +from typing import Any, Optional from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session @@ -39,11 +39,13 @@ async def list_use_cases( @router.get("/use-cases/{use_case}/controls") async def controls_for_use_case( use_case: str, - primary_only: bool = Query(False, description="Nur Primaerzweck-Mappings"), + primary_only: bool = Query(False, description="master-grain Fallback: nur Primaerzweck"), + sub_topic: Optional[str] = Query(None, description="atom-grain: nur dieses Sub-Thema"), limit: int = Query(50, ge=1, le=200), offset: int = Query(0, ge=0), svc: UseCaseControlsService = Depends(get_use_case_controls_service), ) -> dict[str, Any]: - """Controls mapped to a topic, ranked by the deterministic precision proxy.""" + """Controls for a topic. 
Atom-grain (Haiku: relevant + sub_topic) when present,
+    otherwise master-grain seed."""
     with translate_domain_errors():
-        return svc.controls_for_use_case(use_case, primary_only, limit, offset)
+        return svc.controls_for_use_case(use_case, primary_only, limit, offset, sub_topic)
diff --git a/backend-compliance/compliance/services/use_case_controls.py b/backend-compliance/compliance/services/use_case_controls.py
index ad2c2b2f..32e21bb4 100644
--- a/backend-compliance/compliance/services/use_case_controls.py
+++ b/backend-compliance/compliance/services/use_case_controls.py
@@ -72,6 +72,25 @@ _LIST_SQL = text("""
 """)
 
 
+# Atom-grain path: the one-time Haiku pass (atom_classification) gives per-atom
+# relevance + sub-topic, more precise than the master seed; preferred once processed.
+# CAST types the nullable :sub bind; min() + extra sort keys keep pages deterministic.
+_ATOM_LIST_SQL = text("""
+    SELECT ac.control_uuid, ac.sub_topic, ac.canonical_obligation,
+           cc.control_id, cc.title, cc.objective, cc.severity,
+           (SELECT min(cpl.source_regulation) FROM control_parent_links cpl
+            WHERE cpl.control_uuid = ac.control_uuid) AS source_regulation
+    FROM atom_classification ac
+    JOIN canonical_controls cc ON cc.id = ac.control_uuid
+    WHERE ac.use_case = :uc AND ac.relevant = true
+      AND (CAST(:sub AS text) IS NULL OR ac.sub_topic = :sub)
+    ORDER BY ac.sub_topic NULLS LAST,
+             CASE cc.severity WHEN 'critical' THEN 0 WHEN 'high' THEN 1
+                  WHEN 'medium' THEN 2 ELSE 3 END, cc.title, ac.control_uuid
+    LIMIT :lim OFFSET :off
+""")
+
+
 class UseCaseControlsService:
     """Topic → controls retrieval over the seeded use-case mappings."""
 
@@ -107,27 +126,29 @@ class UseCaseControlsService:
         primary_only: bool = False,
         limit: int = 50,
         offset: int = 0,
+        sub_topic: Optional[str] = None,
     ) -> dict[str, Any]:
-        """Ranked controls mapped to ``use_case`` (deduplicated master grain)."""
+        """Controls for ``use_case``. 
Prefers the atom-grain Haiku classification + (precise + sub-topic-organized) when present; falls back to the + master-grain seed otherwise.""" if not is_valid_use_case(use_case): raise NotFoundError(f"Unknown use_case '{use_case}'") uc = REGISTRY[use_case] lim = min(max(int(limit), 1), 200) off = max(int(offset), 0) + if self._has_atom_grain(use_case): + return self._atom_grain(uc, lim, off, sub_topic) + + # --- master-grain fallback (recall seed) --- count_sql = ( "SELECT count(*) FROM mc_use_case_mappings WHERE use_case = :uc" + (" AND is_primary" if primary_only else "") ) total = self.db.execute(text(count_sql), {"uc": use_case}).scalar() or 0 - rows = self.db.execute(_LIST_SQL, { - "uc": use_case, - "primary_only": bool(primary_only), - "lim": lim, - "off": off, + "uc": use_case, "primary_only": bool(primary_only), "lim": lim, "off": off, }).fetchall() - controls = [ { "id": str(r.id), @@ -138,24 +159,64 @@ class UseCaseControlsService: "category": r.category, "member_count": r.total_controls, "is_primary": bool(r.is_primary), - "confidence": ( - float(r.confidence) if r.confidence is not None else None - ), + "confidence": float(r.confidence) if r.confidence is not None else None, "primary_regulation": r.primary_regulation, "relevance": relevance_score( - r.title, r.objective, uc.keyword_tokens, - r.is_primary, r.confidence, + r.title, r.objective, uc.keyword_tokens, r.is_primary, r.confidence, ), } for r in rows ] return { - "use_case": uc.key, - "label": uc.label, - "group": uc.group, - "total": int(total), - "limit": lim, - "offset": off, - "primary_only": bool(primary_only), - "controls": controls, + "use_case": uc.key, "label": uc.label, "group": uc.group, + "granularity": "master", "total": int(total), "limit": lim, "offset": off, + "primary_only": bool(primary_only), "controls": controls, + } + + def _has_atom_grain(self, use_case: str) -> bool: + if self.db.execute( + text("SELECT to_regclass('compliance.atom_classification')") + ).scalar() is None: 
+            return False
+        return (self.db.execute(
+            text("SELECT count(*) FROM atom_classification WHERE use_case = :uc"),
+            {"uc": use_case},
+        ).scalar() or 0) > 0
+
+    def _atom_grain(
+        self, uc, lim: int, off: int, sub_topic: Optional[str],
+    ) -> dict[str, Any]:
+        """One page of atom-grain controls for ``uc`` plus per-sub-topic counts.
+        ``total`` honours ``sub_topic``; ``subtopic_counts`` spans every sub-topic."""
+        # CAST types the nullable bind; server-side binding rejects a bare ``IS NULL``.
+        total = self.db.execute(text(
+            "SELECT count(*) FROM atom_classification "
+            "WHERE use_case = :uc AND relevant = true "
+            "AND (CAST(:sub AS text) IS NULL OR sub_topic = :sub)"
+        ), {"uc": uc.key, "sub": sub_topic}).scalar() or 0
+        facet = {
+            row[0]: int(row[1])
+            for row in self.db.execute(text(
+                "SELECT COALESCE(sub_topic, '(none)'), count(*) "
+                "FROM atom_classification WHERE use_case = :uc AND relevant = true "
+                "GROUP BY 1 ORDER BY 2 DESC, 1"
+            ), {"uc": uc.key}).fetchall()
+        }
+        rows = self.db.execute(_ATOM_LIST_SQL, {
+            "uc": uc.key, "sub": sub_topic, "lim": lim, "off": off,
+        }).fetchall()
+        controls = [
+            {
+                "id": str(r.control_uuid), "control_id": r.control_id,
+                "title": r.title, "objective": r.objective,
+                "severity": r.severity, "sub_topic": r.sub_topic,
+                "canonical_obligation": r.canonical_obligation,
+                "source_regulation": r.source_regulation,
+            }
+            for r in rows
+        ]
+        return {
+            "use_case": uc.key, "label": uc.label, "group": uc.group,
+            "granularity": "atom", "total": int(total), "limit": lim, "offset": off,
+            "sub_topic": sub_topic, "subtopic_counts": facet, "controls": controls,
         }