"""FastAPI routes for QUAIDAL-derived Controls (AI Trainingsdaten-Qualität). Endpoints: - GET /v1/quaidal/stats - Counts by kind + source provenance - GET /v1/quaidal/controls - List all controls, optional kind= filter - GET /v1/quaidal/controls/{id} - Single derived control by derived_id - GET /v1/quaidal/criteria - The 10 QKB criteria with linked QB/MA IDs - GET /v1/quaidal/criteria/{id} - Single QKB with full child tree (QB → MA → QM) The controls are Clean-Room derived from BSI QUAIDAL. See control-pipeline/scripts/derive_quaidal_mcs.py and migration 011. """ from __future__ import annotations import logging from typing import Optional from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel from sqlalchemy import text from database import SessionLocal logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/quaidal", tags=["quaidal"]) # --------------------------------------------------------------------------- # Response shapes # --------------------------------------------------------------------------- class ExternalRef(BaseModel): framework: str citation: Optional[str] = None class SourceProvenance(BaseModel): framework: str section: str url: Optional[str] = None commit_sha: Optional[str] = None title_original: Optional[str] = None license_note: Optional[str] = None class DerivedControl(BaseModel): derived_id: str kind: str canonical_name: str description: str regulation_anchor: Optional[str] = None related_quaidal_ids: list[str] external_refs: list[ExternalRef] source: SourceProvenance plagiarism_score: Optional[float] = None class ControlsListResponse(BaseModel): total: int controls: list[DerivedControl] class CriterionWithChildren(BaseModel): """A QKB criterion with the IDs of its linked building blocks, measures and metrics.""" criterion: DerivedControl building_blocks: list[DerivedControl] measures: list[DerivedControl] metrics: list[DerivedControl] class StatsResponse(BaseModel): counts_by_kind: dict[str, int] source_framework: str source_commit_sha: Optional[str] license_note: Optional[str] # --------------------------------------------------------------------------- # DB helpers # --------------------------------------------------------------------------- def _row_to_control(row) -> DerivedControl: return DerivedControl( derived_id=row.derived_id, kind=row.kind, canonical_name=row.canonical_name, description=row.description, regulation_anchor=row.regulation_anchor, related_quaidal_ids=row.related_quaidal_ids or [], external_refs=[ExternalRef(**r) for r in (row.external_refs or [])], source=SourceProvenance( framework=row.source_framework, section=row.source_section, url=row.source_url, commit_sha=row.source_commit_sha, title_original=row.source_title_original, license_note=row.source_license_note, ), plagiarism_score=float(row.plagiarism_score_at_generation) if row.plagiarism_score_at_generation is not None else None, ) _SELECT_COLUMNS = """ derived_id, kind, canonical_name, description, regulation_anchor, related_quaidal_ids, external_refs, source_framework, source_section, source_url, source_commit_sha, source_title_original, source_license_note, plagiarism_score_at_generation """ # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @router.get("/stats", response_model=StatsResponse) def get_stats() -> StatsResponse: """Counts by kind + the QUAIDAL source provenance (single source today).""" with SessionLocal() as db: counts = db.execute(text( "SELECT kind, COUNT(*) AS n FROM compliance.derived_controls " "WHERE source_framework = :fw GROUP BY kind" ), {"fw": "BSI QUAIDAL"}).all() meta = db.execute(text( "SELECT source_commit_sha, source_license_note FROM compliance.derived_controls " "WHERE source_framework = :fw LIMIT 1" ), {"fw": "BSI QUAIDAL"}).first() return StatsResponse( counts_by_kind={r.kind: r.n for r in counts}, source_framework="BSI QUAIDAL", source_commit_sha=meta.source_commit_sha if meta else None, license_note=meta.source_license_note if meta else None, ) @router.get("/controls", response_model=ControlsListResponse) def list_controls( kind: Optional[str] = Query(None, description="criterion | building_block | measure | metric"), limit: int = Query(500, ge=1, le=2000), offset: int = Query(0, ge=0), ) -> ControlsListResponse: """List QUAIDAL-derived controls, optionally filtered by kind.""" where = ["source_framework = :fw"] params: dict = {"fw": "BSI QUAIDAL", "limit": limit, "offset": offset} if kind: where.append("kind = :kind") params["kind"] = kind sql = ( f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls " f"WHERE {' AND '.join(where)} " "ORDER BY source_section LIMIT :limit OFFSET :offset" ) count_sql = f"SELECT COUNT(*) FROM compliance.derived_controls WHERE {' AND '.join(where)}" with SessionLocal() as db: rows = db.execute(text(sql), params).all() total = db.execute(text(count_sql), {k: v for k, v in params.items() if k not in ("limit", "offset")}).scalar() or 0 return ControlsListResponse(total=int(total), controls=[_row_to_control(r) for r in rows]) @router.get("/controls/{derived_id}", response_model=DerivedControl) def get_control(derived_id: str) -> DerivedControl: with SessionLocal() as db: row = db.execute(text( f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls WHERE derived_id = :id" ), {"id": derived_id}).first() if not row: raise HTTPException(status_code=404, detail=f"Control {derived_id} not found") return _row_to_control(row) @router.get("/criteria", response_model=list[DerivedControl]) def list_criteria() -> list[DerivedControl]: """Returns the 10 QKB criteria. Use /criteria/{section_id} for the full child tree.""" with SessionLocal() as db: rows = db.execute(text( f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls " "WHERE source_framework = :fw AND kind = 'criterion' ORDER BY source_section" ), {"fw": "BSI QUAIDAL"}).all() return [_row_to_control(r) for r in rows] @router.get("/criteria/{section_id}", response_model=CriterionWithChildren) def get_criterion_tree(section_id: str) -> CriterionWithChildren: """Single QKB with the building blocks it references and the measures/metrics those reference. `section_id` is the canonical QUAIDAL ID, e.g. `QKB-01`. """ section_id_upper = section_id.upper() with SessionLocal() as db: criterion_row = db.execute(text( f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls " "WHERE source_framework = :fw AND source_section = :sid AND kind = 'criterion'" ), {"fw": "BSI QUAIDAL", "sid": section_id_upper}).first() if not criterion_row: raise HTTPException(status_code=404, detail=f"Criterion {section_id_upper} not found") building_block_ids = criterion_row.related_quaidal_ids or [] building_blocks = [] if building_block_ids: qb_rows = db.execute(text( f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls " "WHERE source_framework = :fw AND kind = 'building_block' " "AND source_section = ANY(:ids) ORDER BY source_section" ), {"fw": "BSI QUAIDAL", "ids": building_block_ids}).all() building_blocks = [_row_to_control(r) for r in qb_rows] # Collect measure IDs from each building block, then fetch them measure_ids: list[str] = [] for qb in building_blocks: measure_ids.extend(mid for mid in qb.related_quaidal_ids if mid.startswith("MA-")) measures = [] if measure_ids: ma_rows = db.execute(text( f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls " "WHERE source_framework = :fw AND kind = 'measure' " "AND source_section = ANY(:ids) ORDER BY source_section" ), {"fw": "BSI QUAIDAL", "ids": list(set(measure_ids))}).all() measures = [_row_to_control(r) for r in ma_rows] # Collect metric IDs from each measure metric_ids: list[str] = [] for ma in measures: metric_ids.extend(mid for mid in ma.related_quaidal_ids if mid.startswith("QM-")) metrics = [] if metric_ids: qm_rows = db.execute(text( f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls " "WHERE source_framework = :fw AND kind = 'metric' " "AND source_section = ANY(:ids) ORDER BY source_section" ), {"fw": "BSI QUAIDAL", "ids": list(set(metric_ids))}).all() metrics = [_row_to_control(r) for r in qm_rows] return CriterionWithChildren( criterion=_row_to_control(criterion_row), building_blocks=building_blocks, measures=measures, metrics=metrics, )