e536247c20
Wire the 195 Clean-Room QUAIDAL controls (from breakpilot-core migration 011)
into the compliance SaaS UI.
Backend:
- GET /api/v1/quaidal/stats - counts by kind + source provenance
- GET /api/v1/quaidal/controls - list, optional kind= filter
- GET /api/v1/quaidal/controls/{id} - single derived control
- GET /api/v1/quaidal/criteria - 10 QKB criteria
- GET /api/v1/quaidal/criteria/{id} - QKB with QB/MA/QM tree
Frontend:
- /sdk/quality: new "Trainingsdaten-Qualität (BSI QUAIDAL)" tab with
10 QKB cards and a drill-down modal showing the full QB→MA→QM tree
plus original BSI source link and license note.
- /sdk/ai-act: Art. 10 tile on each high-risk/unacceptable result,
linking to /sdk/quality?category=data_quality.
Pattern matches existing IACE module DIN-reference handling:
own wording, source section + URL preserved for due diligence.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
245 lines
9.3 KiB
Python
245 lines
9.3 KiB
Python
"""FastAPI routes for QUAIDAL-derived Controls (AI Trainingsdaten-Qualität).
|
|
|
|
Endpoints:
|
|
- GET /v1/quaidal/stats - Counts by kind + source provenance
|
|
- GET /v1/quaidal/controls - List all controls, optional kind= filter
|
|
- GET /v1/quaidal/controls/{id} - Single derived control by derived_id
|
|
- GET /v1/quaidal/criteria - The 10 QKB criteria with linked QB/MA IDs
|
|
- GET /v1/quaidal/criteria/{id} - Single QKB with full child tree (QB → MA → QM)
|
|
|
|
The controls are Clean-Room derived from BSI QUAIDAL. See
|
|
control-pipeline/scripts/derive_quaidal_mcs.py and migration 011.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import text
|
|
|
|
from database import SessionLocal
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/v1/quaidal", tags=["quaidal"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Response shapes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class ExternalRef(BaseModel):
|
|
framework: str
|
|
citation: Optional[str] = None
|
|
|
|
|
|
class SourceProvenance(BaseModel):
|
|
framework: str
|
|
section: str
|
|
url: Optional[str] = None
|
|
commit_sha: Optional[str] = None
|
|
title_original: Optional[str] = None
|
|
license_note: Optional[str] = None
|
|
|
|
|
|
class DerivedControl(BaseModel):
|
|
derived_id: str
|
|
kind: str
|
|
canonical_name: str
|
|
description: str
|
|
regulation_anchor: Optional[str] = None
|
|
related_quaidal_ids: list[str]
|
|
external_refs: list[ExternalRef]
|
|
source: SourceProvenance
|
|
plagiarism_score: Optional[float] = None
|
|
|
|
|
|
class ControlsListResponse(BaseModel):
|
|
total: int
|
|
controls: list[DerivedControl]
|
|
|
|
|
|
class CriterionWithChildren(BaseModel):
|
|
"""A QKB criterion with the IDs of its linked building blocks, measures and metrics."""
|
|
criterion: DerivedControl
|
|
building_blocks: list[DerivedControl]
|
|
measures: list[DerivedControl]
|
|
metrics: list[DerivedControl]
|
|
|
|
|
|
class StatsResponse(BaseModel):
|
|
counts_by_kind: dict[str, int]
|
|
source_framework: str
|
|
source_commit_sha: Optional[str]
|
|
license_note: Optional[str]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DB helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _row_to_control(row) -> DerivedControl:
|
|
return DerivedControl(
|
|
derived_id=row.derived_id,
|
|
kind=row.kind,
|
|
canonical_name=row.canonical_name,
|
|
description=row.description,
|
|
regulation_anchor=row.regulation_anchor,
|
|
related_quaidal_ids=row.related_quaidal_ids or [],
|
|
external_refs=[ExternalRef(**r) for r in (row.external_refs or [])],
|
|
source=SourceProvenance(
|
|
framework=row.source_framework,
|
|
section=row.source_section,
|
|
url=row.source_url,
|
|
commit_sha=row.source_commit_sha,
|
|
title_original=row.source_title_original,
|
|
license_note=row.source_license_note,
|
|
),
|
|
plagiarism_score=float(row.plagiarism_score_at_generation) if row.plagiarism_score_at_generation is not None else None,
|
|
)
|
|
|
|
|
|
_SELECT_COLUMNS = """
|
|
derived_id, kind, canonical_name, description, regulation_anchor,
|
|
related_quaidal_ids, external_refs,
|
|
source_framework, source_section, source_url, source_commit_sha,
|
|
source_title_original, source_license_note,
|
|
plagiarism_score_at_generation
|
|
"""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/stats", response_model=StatsResponse)
|
|
def get_stats() -> StatsResponse:
|
|
"""Counts by kind + the QUAIDAL source provenance (single source today)."""
|
|
with SessionLocal() as db:
|
|
counts = db.execute(text(
|
|
"SELECT kind, COUNT(*) AS n FROM compliance.derived_controls "
|
|
"WHERE source_framework = :fw GROUP BY kind"
|
|
), {"fw": "BSI QUAIDAL"}).all()
|
|
meta = db.execute(text(
|
|
"SELECT source_commit_sha, source_license_note FROM compliance.derived_controls "
|
|
"WHERE source_framework = :fw LIMIT 1"
|
|
), {"fw": "BSI QUAIDAL"}).first()
|
|
return StatsResponse(
|
|
counts_by_kind={r.kind: r.n for r in counts},
|
|
source_framework="BSI QUAIDAL",
|
|
source_commit_sha=meta.source_commit_sha if meta else None,
|
|
license_note=meta.source_license_note if meta else None,
|
|
)
|
|
|
|
|
|
@router.get("/controls", response_model=ControlsListResponse)
|
|
def list_controls(
|
|
kind: Optional[str] = Query(None, description="criterion | building_block | measure | metric"),
|
|
limit: int = Query(500, ge=1, le=2000),
|
|
offset: int = Query(0, ge=0),
|
|
) -> ControlsListResponse:
|
|
"""List QUAIDAL-derived controls, optionally filtered by kind."""
|
|
where = ["source_framework = :fw"]
|
|
params: dict = {"fw": "BSI QUAIDAL", "limit": limit, "offset": offset}
|
|
if kind:
|
|
where.append("kind = :kind")
|
|
params["kind"] = kind
|
|
|
|
sql = (
|
|
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
|
f"WHERE {' AND '.join(where)} "
|
|
"ORDER BY source_section LIMIT :limit OFFSET :offset"
|
|
)
|
|
count_sql = f"SELECT COUNT(*) FROM compliance.derived_controls WHERE {' AND '.join(where)}"
|
|
|
|
with SessionLocal() as db:
|
|
rows = db.execute(text(sql), params).all()
|
|
total = db.execute(text(count_sql), {k: v for k, v in params.items() if k not in ("limit", "offset")}).scalar() or 0
|
|
return ControlsListResponse(total=int(total), controls=[_row_to_control(r) for r in rows])
|
|
|
|
|
|
@router.get("/controls/{derived_id}", response_model=DerivedControl)
|
|
def get_control(derived_id: str) -> DerivedControl:
|
|
with SessionLocal() as db:
|
|
row = db.execute(text(
|
|
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls WHERE derived_id = :id"
|
|
), {"id": derived_id}).first()
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail=f"Control {derived_id} not found")
|
|
return _row_to_control(row)
|
|
|
|
|
|
@router.get("/criteria", response_model=list[DerivedControl])
|
|
def list_criteria() -> list[DerivedControl]:
|
|
"""Returns the 10 QKB criteria. Use /criteria/{section_id} for the full child tree."""
|
|
with SessionLocal() as db:
|
|
rows = db.execute(text(
|
|
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
|
"WHERE source_framework = :fw AND kind = 'criterion' ORDER BY source_section"
|
|
), {"fw": "BSI QUAIDAL"}).all()
|
|
return [_row_to_control(r) for r in rows]
|
|
|
|
|
|
@router.get("/criteria/{section_id}", response_model=CriterionWithChildren)
|
|
def get_criterion_tree(section_id: str) -> CriterionWithChildren:
|
|
"""Single QKB with the building blocks it references and the measures/metrics those reference.
|
|
|
|
`section_id` is the canonical QUAIDAL ID, e.g. `QKB-01`.
|
|
"""
|
|
section_id_upper = section_id.upper()
|
|
with SessionLocal() as db:
|
|
criterion_row = db.execute(text(
|
|
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
|
"WHERE source_framework = :fw AND source_section = :sid AND kind = 'criterion'"
|
|
), {"fw": "BSI QUAIDAL", "sid": section_id_upper}).first()
|
|
if not criterion_row:
|
|
raise HTTPException(status_code=404, detail=f"Criterion {section_id_upper} not found")
|
|
|
|
building_block_ids = criterion_row.related_quaidal_ids or []
|
|
building_blocks = []
|
|
if building_block_ids:
|
|
qb_rows = db.execute(text(
|
|
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
|
"WHERE source_framework = :fw AND kind = 'building_block' "
|
|
"AND source_section = ANY(:ids) ORDER BY source_section"
|
|
), {"fw": "BSI QUAIDAL", "ids": building_block_ids}).all()
|
|
building_blocks = [_row_to_control(r) for r in qb_rows]
|
|
|
|
# Collect measure IDs from each building block, then fetch them
|
|
measure_ids: list[str] = []
|
|
for qb in building_blocks:
|
|
measure_ids.extend(mid for mid in qb.related_quaidal_ids if mid.startswith("MA-"))
|
|
measures = []
|
|
if measure_ids:
|
|
ma_rows = db.execute(text(
|
|
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
|
"WHERE source_framework = :fw AND kind = 'measure' "
|
|
"AND source_section = ANY(:ids) ORDER BY source_section"
|
|
), {"fw": "BSI QUAIDAL", "ids": list(set(measure_ids))}).all()
|
|
measures = [_row_to_control(r) for r in ma_rows]
|
|
|
|
# Collect metric IDs from each measure
|
|
metric_ids: list[str] = []
|
|
for ma in measures:
|
|
metric_ids.extend(mid for mid in ma.related_quaidal_ids if mid.startswith("QM-"))
|
|
metrics = []
|
|
if metric_ids:
|
|
qm_rows = db.execute(text(
|
|
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
|
"WHERE source_framework = :fw AND kind = 'metric' "
|
|
"AND source_section = ANY(:ids) ORDER BY source_section"
|
|
), {"fw": "BSI QUAIDAL", "ids": list(set(metric_ids))}).all()
|
|
metrics = [_row_to_control(r) for r in qm_rows]
|
|
|
|
return CriterionWithChildren(
|
|
criterion=_row_to_control(criterion_row),
|
|
building_blocks=building_blocks,
|
|
measures=measures,
|
|
metrics=metrics,
|
|
)
|