Files
breakpilot-compliance/backend-compliance/compliance/api/quaidal_routes.py
T
Benjamin Admin e536247c20 feat(quaidal): backend API + frontend tab for BSI QUAIDAL data-quality controls
Wire the 195 Clean-Room QUAIDAL controls (from breakpilot-core migration 011)
into the compliance SaaS UI.

Backend:
- GET /api/v1/quaidal/stats           - counts by kind + source provenance
- GET /api/v1/quaidal/controls        - list, optional kind= filter
- GET /api/v1/quaidal/controls/{id}   - single derived control
- GET /api/v1/quaidal/criteria        - 10 QKB criteria
- GET /api/v1/quaidal/criteria/{id}   - QKB with QB/MA/QM tree

Frontend:
- /sdk/quality: new "Trainingsdaten-Qualität (BSI QUAIDAL)" tab with
  10 QKB cards and a drill-down modal showing the full QB→MA→QM tree
  plus original BSI source link and license note.
- /sdk/ai-act: Art. 10 tile on each high-risk/unacceptable result,
  linking to /sdk/quality?category=data_quality.

Pattern matches existing IACE module DIN-reference handling:
own wording, source section + URL preserved for due diligence.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 13:03:54 +02:00

245 lines
9.3 KiB
Python

"""FastAPI routes for QUAIDAL-derived Controls (AI Trainingsdaten-Qualität).
Endpoints:
- GET /v1/quaidal/stats - Counts by kind + source provenance
- GET /v1/quaidal/controls - List all controls, optional kind= filter
- GET /v1/quaidal/controls/{id} - Single derived control by derived_id
- GET /v1/quaidal/criteria - The 10 QKB criteria with linked QB/MA IDs
- GET /v1/quaidal/criteria/{id} - Single QKB with full child tree (QB → MA → QM)
The controls are Clean-Room derived from BSI QUAIDAL. See
control-pipeline/scripts/derive_quaidal_mcs.py and migration 011.
"""
from __future__ import annotations
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from database import SessionLocal
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Every endpoint below is registered on this router, so all paths are
# relative to the "/v1/quaidal" prefix and grouped under the "quaidal" tag.
router = APIRouter(prefix="/v1/quaidal", tags=["quaidal"])
# ---------------------------------------------------------------------------
# Response shapes
# ---------------------------------------------------------------------------
class ExternalRef(BaseModel):
    """Pointer from a derived control to an external framework.

    Instances are built from the entries of a row's ``external_refs`` column,
    each entry being unpacked as keyword arguments.
    """

    # Name of the referenced framework.
    framework: str
    # Optional citation within that framework.
    citation: Optional[str] = None
class SourceProvenance(BaseModel):
    """Where a derived control came from.

    Populated from the ``source_*`` columns of ``compliance.derived_controls``.
    """

    # Source framework name (``source_framework`` column).
    framework: str
    # Section identifier inside the source (``source_section`` column).
    section: str
    # Link to the original source, if recorded (``source_url`` column).
    url: Optional[str] = None
    # Commit of the source the control was derived from (``source_commit_sha``).
    commit_sha: Optional[str] = None
    # Title as it appears in the original source (``source_title_original``).
    title_original: Optional[str] = None
    # License remark for the source material (``source_license_note``).
    license_note: Optional[str] = None
class DerivedControl(BaseModel):
    """API representation of one row of ``compliance.derived_controls``."""

    # Identifier used by ``GET /controls/{derived_id}``.
    derived_id: str
    # Control type; the list endpoint documents the values
    # criterion | building_block | measure | metric.
    kind: str
    canonical_name: str
    description: str
    regulation_anchor: Optional[str] = None
    # IDs of linked controls; the criterion-tree endpoint matches these against
    # the ``source_section`` of other rows to resolve children.
    related_quaidal_ids: list[str]
    external_refs: list[ExternalRef]
    source: SourceProvenance
    # Taken from the ``plagiarism_score_at_generation`` column, as a float.
    plagiarism_score: Optional[float] = None
class ControlsListResponse(BaseModel):
    """One page of controls plus the size of the full filtered result set."""

    # Number of rows matching the filter, ignoring limit/offset.
    total: int
    # The controls of the requested page.
    controls: list[DerivedControl]
class CriterionWithChildren(BaseModel):
    """A QKB criterion together with its linked building blocks, measures and metrics.

    Each child list holds full ``DerivedControl`` objects, not just their IDs.
    """

    # The criterion itself.
    criterion: DerivedControl
    # Building blocks referenced by the criterion.
    building_blocks: list[DerivedControl]
    # Measures referenced by those building blocks.
    measures: list[DerivedControl]
    # Metrics referenced by those measures.
    metrics: list[DerivedControl]
class StatsResponse(BaseModel):
    """Summary of the QUAIDAL-derived controls stored in the database."""

    # Maps each control kind to the number of rows of that kind.
    counts_by_kind: dict[str, int]
    source_framework: str
    # The two fields below are read from a single matching row and are None
    # when no row exists.
    # NOTE(review): unlike the other models these Optional fields declare no
    # default; whether that makes them required depends on the pydantic
    # version in use - confirm.
    source_commit_sha: Optional[str]
    license_note: Optional[str]
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
def _row_to_control(row) -> DerivedControl:
    """Convert one ``compliance.derived_controls`` result row into a ``DerivedControl``.

    ``row`` must expose the columns named in ``_SELECT_COLUMNS`` as attributes.
    Missing (falsy) ``related_quaidal_ids`` / ``external_refs`` become empty
    lists, and a NULL plagiarism score stays ``None`` instead of being cast.
    """
    linked_ids = row.related_quaidal_ids or []

    refs = []
    for raw_ref in row.external_refs or []:
        refs.append(ExternalRef(**raw_ref))

    provenance = SourceProvenance(
        framework=row.source_framework,
        section=row.source_section,
        url=row.source_url,
        commit_sha=row.source_commit_sha,
        title_original=row.source_title_original,
        license_note=row.source_license_note,
    )

    # The stored score may be a non-float numeric type, hence the explicit cast.
    raw_score = row.plagiarism_score_at_generation
    if raw_score is None:
        score = None
    else:
        score = float(raw_score)

    return DerivedControl(
        derived_id=row.derived_id,
        kind=row.kind,
        canonical_name=row.canonical_name,
        description=row.description,
        regulation_anchor=row.regulation_anchor,
        related_quaidal_ids=linked_ids,
        external_refs=refs,
        source=provenance,
        plagiarism_score=score,
    )
# Column list shared by every SELECT in this module. It is interpolated into
# the SQL text via f-strings and must stay in sync with the attributes that
# _row_to_control() reads from each result row.
_SELECT_COLUMNS = """
derived_id, kind, canonical_name, description, regulation_anchor,
related_quaidal_ids, external_refs,
source_framework, source_section, source_url, source_commit_sha,
source_title_original, source_license_note,
plagiarism_score_at_generation
"""
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("/stats", response_model=StatsResponse)
def get_stats() -> StatsResponse:
    """Report how many QUAIDAL controls exist per kind, plus source provenance.

    Commit SHA and license note are read from a single matching row; both are
    ``None`` when the table holds no QUAIDAL rows.
    """
    bind = {"fw": "BSI QUAIDAL"}
    per_kind_query = text(
        "SELECT kind, COUNT(*) AS n FROM compliance.derived_controls "
        "WHERE source_framework = :fw GROUP BY kind"
    )
    provenance_query = text(
        "SELECT source_commit_sha, source_license_note FROM compliance.derived_controls "
        "WHERE source_framework = :fw LIMIT 1"
    )

    with SessionLocal() as db:
        kind_rows = db.execute(per_kind_query, bind).all()
        sample = db.execute(provenance_query, bind).first()

    tally: dict[str, int] = {}
    for entry in kind_rows:
        tally[entry.kind] = entry.n

    if sample:
        commit_sha = sample.source_commit_sha
        license_note = sample.source_license_note
    else:
        commit_sha = None
        license_note = None

    return StatsResponse(
        counts_by_kind=tally,
        source_framework="BSI QUAIDAL",
        source_commit_sha=commit_sha,
        license_note=license_note,
    )
@router.get("/controls", response_model=ControlsListResponse)
def list_controls(
    kind: Optional[str] = Query(None, description="criterion | building_block | measure | metric"),
    limit: int = Query(500, ge=1, le=2000),
    offset: int = Query(0, ge=0),
) -> ControlsListResponse:
    """List QUAIDAL-derived controls, optionally filtered by kind.

    Args:
        kind: Restrict the result to one control kind; an empty or missing
            value disables the filter.
        limit: Page size (1-2000).
        offset: Number of rows to skip before the page starts.

    Returns:
        The requested page together with ``total``, the number of rows that
        match the filter regardless of ``limit``/``offset``.
    """
    # Only fixed SQL fragments are joined into the statement; every
    # user-supplied value travels as a bound parameter.
    conditions = ["source_framework = :fw"]
    filter_params: dict = {"fw": "BSI QUAIDAL"}
    if kind:
        conditions.append("kind = :kind")
        filter_params["kind"] = kind
    where_clause = " AND ".join(conditions)

    # derived_id is a tiebreaker: with LIMIT/OFFSET the database may return
    # rows that tie on source_section in a different order per query, which
    # would let rows repeat or go missing across pages.
    page_sql = (
        f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
        f"WHERE {where_clause} "
        "ORDER BY source_section, derived_id LIMIT :limit OFFSET :offset"
    )
    count_sql = f"SELECT COUNT(*) FROM compliance.derived_controls WHERE {where_clause}"

    with SessionLocal() as db:
        rows = db.execute(
            text(page_sql), {**filter_params, "limit": limit, "offset": offset}
        ).all()
        total = db.execute(text(count_sql), filter_params).scalar() or 0

    return ControlsListResponse(total=int(total), controls=[_row_to_control(r) for r in rows])
@router.get("/controls/{derived_id}", response_model=DerivedControl)
def get_control(derived_id: str) -> DerivedControl:
    """Return a single QUAIDAL-derived control by its ``derived_id``.

    Raises:
        HTTPException: 404 when no QUAIDAL control with that ID exists.
    """
    with SessionLocal() as db:
        # Scope the lookup to the QUAIDAL framework like every other endpoint
        # on this router, so controls derived from other sources that share
        # the table are not served under /quaidal.
        row = db.execute(text(
            f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
            "WHERE source_framework = :fw AND derived_id = :id"
        ), {"fw": "BSI QUAIDAL", "id": derived_id}).first()
    if row is None:
        raise HTTPException(status_code=404, detail=f"Control {derived_id} not found")
    return _row_to_control(row)
@router.get("/criteria", response_model=list[DerivedControl])
def list_criteria() -> list[DerivedControl]:
    """Return every QUAIDAL control of kind ``criterion`` (the QKB entries), ordered by section.

    Children are not included; /criteria/{section_id} returns the full child tree.
    """
    query = text(
        f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
        "WHERE source_framework = :fw AND kind = 'criterion' ORDER BY source_section"
    )
    criteria = []
    with SessionLocal() as db:
        for record in db.execute(query, {"fw": "BSI QUAIDAL"}).all():
            criteria.append(_row_to_control(record))
    return criteria
def _unique_related_ids(controls: list[DerivedControl], prefix: str) -> list[str]:
    """Collect the related IDs starting with ``prefix``, de-duplicated in first-seen order."""
    return list(dict.fromkeys(
        related_id
        for control in controls
        for related_id in control.related_quaidal_ids
        if related_id.startswith(prefix)
    ))


def _fetch_by_sections(db, kind: str, section_ids: list[str]) -> list[DerivedControl]:
    """Load the QUAIDAL controls of one ``kind`` whose ``source_section`` is in ``section_ids``."""
    if not section_ids:
        # Nothing is linked, so skip the round trip to the database.
        return []
    rows = db.execute(text(
        f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
        "WHERE source_framework = :fw AND kind = :kind "
        "AND source_section = ANY(:ids) ORDER BY source_section"
    ), {"fw": "BSI QUAIDAL", "kind": kind, "ids": section_ids}).all()
    return [_row_to_control(r) for r in rows]


@router.get("/criteria/{section_id}", response_model=CriterionWithChildren)
def get_criterion_tree(section_id: str) -> CriterionWithChildren:
    """Single QKB with the building blocks it references and the measures/metrics those reference.

    `section_id` is the canonical QUAIDAL ID, e.g. `QKB-01`; it is upper-cased
    before the lookup, so the path parameter is case-insensitive.

    The tree is resolved level by level: the criterion's related IDs select
    the building blocks, the building blocks' ``MA-`` IDs select the measures,
    and the measures' ``QM-`` IDs select the metrics.

    Raises:
        HTTPException: 404 when no criterion with that section ID exists.
    """
    section_id_upper = section_id.upper()
    with SessionLocal() as db:
        criterion_row = db.execute(text(
            f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
            "WHERE source_framework = :fw AND source_section = :sid AND kind = 'criterion'"
        ), {"fw": "BSI QUAIDAL", "sid": section_id_upper}).first()
        if not criterion_row:
            raise HTTPException(status_code=404, detail=f"Criterion {section_id_upper} not found")
        criterion = _row_to_control(criterion_row)

        # The criterion's related IDs are not prefix-filtered here; the
        # kind = 'building_block' condition in the query does the narrowing.
        building_blocks = _fetch_by_sections(
            db, "building_block", list(dict.fromkeys(criterion.related_quaidal_ids))
        )
        measures = _fetch_by_sections(
            db, "measure", _unique_related_ids(building_blocks, "MA-")
        )
        metrics = _fetch_by_sections(
            db, "metric", _unique_related_ids(measures, "QM-")
        )

    return CriterionWithChildren(
        criterion=criterion,
        building_blocks=building_blocks,
        measures=measures,
        metrics=metrics,
    )