feat(quaidal): backend API + frontend tab for BSI QUAIDAL data-quality controls
Wire the 195 Clean-Room QUAIDAL controls (from breakpilot-core migration 011)
into the compliance SaaS UI.
Backend:
- GET /api/v1/quaidal/stats - counts by kind + source provenance
- GET /api/v1/quaidal/controls - list, optional kind= filter
- GET /api/v1/quaidal/controls/{id} - single derived control
- GET /api/v1/quaidal/criteria - 10 QKB criteria
- GET /api/v1/quaidal/criteria/{id} - QKB with QB/MA/QM tree
Frontend:
- /sdk/quality: new "Trainingsdaten-Qualität (BSI QUAIDAL)" tab with
10 QKB cards and a drill-down modal showing the full QB→MA→QM tree
plus original BSI source link and license note.
- /sdk/ai-act: Art. 10 tile on each high-risk/unacceptable result,
linking to /sdk/quality?category=data_quality.
Pattern matches existing IACE module DIN-reference handling:
own wording, source section + URL preserved for due diligence.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,244 @@
|
||||
"""FastAPI routes for QUAIDAL-derived Controls (AI Trainingsdaten-Qualität).
|
||||
|
||||
Endpoints:
|
||||
- GET /v1/quaidal/stats - Counts by kind + source provenance
|
||||
- GET /v1/quaidal/controls - List all controls, optional kind= filter
|
||||
- GET /v1/quaidal/controls/{id} - Single derived control by derived_id
|
||||
- GET /v1/quaidal/criteria - The 10 QKB criteria with linked QB/MA IDs
|
||||
- GET /v1/quaidal/criteria/{id} - Single QKB with full child tree (QB → MA → QM)
|
||||
|
||||
The controls are Clean-Room derived from BSI QUAIDAL. See
|
||||
control-pipeline/scripts/derive_quaidal_mcs.py and migration 011.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Query
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import text
|
||||
|
||||
from database import SessionLocal
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/v1/quaidal", tags=["quaidal"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Response shapes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class ExternalRef(BaseModel):
|
||||
framework: str
|
||||
citation: Optional[str] = None
|
||||
|
||||
|
||||
class SourceProvenance(BaseModel):
|
||||
framework: str
|
||||
section: str
|
||||
url: Optional[str] = None
|
||||
commit_sha: Optional[str] = None
|
||||
title_original: Optional[str] = None
|
||||
license_note: Optional[str] = None
|
||||
|
||||
|
||||
class DerivedControl(BaseModel):
|
||||
derived_id: str
|
||||
kind: str
|
||||
canonical_name: str
|
||||
description: str
|
||||
regulation_anchor: Optional[str] = None
|
||||
related_quaidal_ids: list[str]
|
||||
external_refs: list[ExternalRef]
|
||||
source: SourceProvenance
|
||||
plagiarism_score: Optional[float] = None
|
||||
|
||||
|
||||
class ControlsListResponse(BaseModel):
|
||||
total: int
|
||||
controls: list[DerivedControl]
|
||||
|
||||
|
||||
class CriterionWithChildren(BaseModel):
|
||||
"""A QKB criterion with the IDs of its linked building blocks, measures and metrics."""
|
||||
criterion: DerivedControl
|
||||
building_blocks: list[DerivedControl]
|
||||
measures: list[DerivedControl]
|
||||
metrics: list[DerivedControl]
|
||||
|
||||
|
||||
class StatsResponse(BaseModel):
|
||||
counts_by_kind: dict[str, int]
|
||||
source_framework: str
|
||||
source_commit_sha: Optional[str]
|
||||
license_note: Optional[str]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DB helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _row_to_control(row) -> DerivedControl:
|
||||
return DerivedControl(
|
||||
derived_id=row.derived_id,
|
||||
kind=row.kind,
|
||||
canonical_name=row.canonical_name,
|
||||
description=row.description,
|
||||
regulation_anchor=row.regulation_anchor,
|
||||
related_quaidal_ids=row.related_quaidal_ids or [],
|
||||
external_refs=[ExternalRef(**r) for r in (row.external_refs or [])],
|
||||
source=SourceProvenance(
|
||||
framework=row.source_framework,
|
||||
section=row.source_section,
|
||||
url=row.source_url,
|
||||
commit_sha=row.source_commit_sha,
|
||||
title_original=row.source_title_original,
|
||||
license_note=row.source_license_note,
|
||||
),
|
||||
plagiarism_score=float(row.plagiarism_score_at_generation) if row.plagiarism_score_at_generation is not None else None,
|
||||
)
|
||||
|
||||
|
||||
_SELECT_COLUMNS = """
|
||||
derived_id, kind, canonical_name, description, regulation_anchor,
|
||||
related_quaidal_ids, external_refs,
|
||||
source_framework, source_section, source_url, source_commit_sha,
|
||||
source_title_original, source_license_note,
|
||||
plagiarism_score_at_generation
|
||||
"""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("/stats", response_model=StatsResponse)
|
||||
def get_stats() -> StatsResponse:
|
||||
"""Counts by kind + the QUAIDAL source provenance (single source today)."""
|
||||
with SessionLocal() as db:
|
||||
counts = db.execute(text(
|
||||
"SELECT kind, COUNT(*) AS n FROM compliance.derived_controls "
|
||||
"WHERE source_framework = :fw GROUP BY kind"
|
||||
), {"fw": "BSI QUAIDAL"}).all()
|
||||
meta = db.execute(text(
|
||||
"SELECT source_commit_sha, source_license_note FROM compliance.derived_controls "
|
||||
"WHERE source_framework = :fw LIMIT 1"
|
||||
), {"fw": "BSI QUAIDAL"}).first()
|
||||
return StatsResponse(
|
||||
counts_by_kind={r.kind: r.n for r in counts},
|
||||
source_framework="BSI QUAIDAL",
|
||||
source_commit_sha=meta.source_commit_sha if meta else None,
|
||||
license_note=meta.source_license_note if meta else None,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/controls", response_model=ControlsListResponse)
|
||||
def list_controls(
|
||||
kind: Optional[str] = Query(None, description="criterion | building_block | measure | metric"),
|
||||
limit: int = Query(500, ge=1, le=2000),
|
||||
offset: int = Query(0, ge=0),
|
||||
) -> ControlsListResponse:
|
||||
"""List QUAIDAL-derived controls, optionally filtered by kind."""
|
||||
where = ["source_framework = :fw"]
|
||||
params: dict = {"fw": "BSI QUAIDAL", "limit": limit, "offset": offset}
|
||||
if kind:
|
||||
where.append("kind = :kind")
|
||||
params["kind"] = kind
|
||||
|
||||
sql = (
|
||||
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
||||
f"WHERE {' AND '.join(where)} "
|
||||
"ORDER BY source_section LIMIT :limit OFFSET :offset"
|
||||
)
|
||||
count_sql = f"SELECT COUNT(*) FROM compliance.derived_controls WHERE {' AND '.join(where)}"
|
||||
|
||||
with SessionLocal() as db:
|
||||
rows = db.execute(text(sql), params).all()
|
||||
total = db.execute(text(count_sql), {k: v for k, v in params.items() if k not in ("limit", "offset")}).scalar() or 0
|
||||
return ControlsListResponse(total=int(total), controls=[_row_to_control(r) for r in rows])
|
||||
|
||||
|
||||
@router.get("/controls/{derived_id}", response_model=DerivedControl)
|
||||
def get_control(derived_id: str) -> DerivedControl:
|
||||
with SessionLocal() as db:
|
||||
row = db.execute(text(
|
||||
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls WHERE derived_id = :id"
|
||||
), {"id": derived_id}).first()
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail=f"Control {derived_id} not found")
|
||||
return _row_to_control(row)
|
||||
|
||||
|
||||
@router.get("/criteria", response_model=list[DerivedControl])
|
||||
def list_criteria() -> list[DerivedControl]:
|
||||
"""Returns the 10 QKB criteria. Use /criteria/{section_id} for the full child tree."""
|
||||
with SessionLocal() as db:
|
||||
rows = db.execute(text(
|
||||
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
||||
"WHERE source_framework = :fw AND kind = 'criterion' ORDER BY source_section"
|
||||
), {"fw": "BSI QUAIDAL"}).all()
|
||||
return [_row_to_control(r) for r in rows]
|
||||
|
||||
|
||||
@router.get("/criteria/{section_id}", response_model=CriterionWithChildren)
|
||||
def get_criterion_tree(section_id: str) -> CriterionWithChildren:
|
||||
"""Single QKB with the building blocks it references and the measures/metrics those reference.
|
||||
|
||||
`section_id` is the canonical QUAIDAL ID, e.g. `QKB-01`.
|
||||
"""
|
||||
section_id_upper = section_id.upper()
|
||||
with SessionLocal() as db:
|
||||
criterion_row = db.execute(text(
|
||||
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
||||
"WHERE source_framework = :fw AND source_section = :sid AND kind = 'criterion'"
|
||||
), {"fw": "BSI QUAIDAL", "sid": section_id_upper}).first()
|
||||
if not criterion_row:
|
||||
raise HTTPException(status_code=404, detail=f"Criterion {section_id_upper} not found")
|
||||
|
||||
building_block_ids = criterion_row.related_quaidal_ids or []
|
||||
building_blocks = []
|
||||
if building_block_ids:
|
||||
qb_rows = db.execute(text(
|
||||
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
||||
"WHERE source_framework = :fw AND kind = 'building_block' "
|
||||
"AND source_section = ANY(:ids) ORDER BY source_section"
|
||||
), {"fw": "BSI QUAIDAL", "ids": building_block_ids}).all()
|
||||
building_blocks = [_row_to_control(r) for r in qb_rows]
|
||||
|
||||
# Collect measure IDs from each building block, then fetch them
|
||||
measure_ids: list[str] = []
|
||||
for qb in building_blocks:
|
||||
measure_ids.extend(mid for mid in qb.related_quaidal_ids if mid.startswith("MA-"))
|
||||
measures = []
|
||||
if measure_ids:
|
||||
ma_rows = db.execute(text(
|
||||
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
||||
"WHERE source_framework = :fw AND kind = 'measure' "
|
||||
"AND source_section = ANY(:ids) ORDER BY source_section"
|
||||
), {"fw": "BSI QUAIDAL", "ids": list(set(measure_ids))}).all()
|
||||
measures = [_row_to_control(r) for r in ma_rows]
|
||||
|
||||
# Collect metric IDs from each measure
|
||||
metric_ids: list[str] = []
|
||||
for ma in measures:
|
||||
metric_ids.extend(mid for mid in ma.related_quaidal_ids if mid.startswith("QM-"))
|
||||
metrics = []
|
||||
if metric_ids:
|
||||
qm_rows = db.execute(text(
|
||||
f"SELECT {_SELECT_COLUMNS} FROM compliance.derived_controls "
|
||||
"WHERE source_framework = :fw AND kind = 'metric' "
|
||||
"AND source_section = ANY(:ids) ORDER BY source_section"
|
||||
), {"fw": "BSI QUAIDAL", "ids": list(set(metric_ids))}).all()
|
||||
metrics = [_row_to_control(r) for r in qm_rows]
|
||||
|
||||
return CriterionWithChildren(
|
||||
criterion=_row_to_control(criterion_row),
|
||||
building_blocks=building_blocks,
|
||||
measures=measures,
|
||||
metrics=metrics,
|
||||
)
|
||||
@@ -55,6 +55,7 @@ from compliance.api.saving_scan_routes import router as saving_scan_router
|
||||
from compliance.api.agent_migration_routes import router as agent_migration_router
|
||||
from compliance.api.vendor_assessment_routes import router as vendor_assessment_router
|
||||
from compliance.api.cra_routes import router as cra_router
|
||||
from compliance.api.quaidal_routes import router as quaidal_router
|
||||
|
||||
# Middleware
|
||||
from middleware import (
|
||||
@@ -168,6 +169,7 @@ app.include_router(vendor_assessment_router, prefix="/api")
|
||||
|
||||
# CRA (Cyber Resilience Act) Compliance
|
||||
app.include_router(cra_router, prefix="/api")
|
||||
app.include_router(quaidal_router, prefix="/api")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user