dfac940272
Backend
- backend-compliance/compliance/api/licenses_routes.py: three endpoints
built on the now-complete license_rule classification
- GET /api/compliance/licenses/overview
global aggregation by rule + per-source breakdown (Stufe 1)
- POST /api/compliance/licenses/aggregate
per-control-set aggregation for PDF footer (Stufe 2) and
tech-file appendix (Stufe 4) — consumed later
- GET /api/compliance/licenses/source-info/{control_uuid}
single-control lookup for the inline source badge (Stufe 3)
- registered in api/__init__.py via the existing safe-import loader
Frontend
- app/sdk/licenses/page.tsx (Stufe 1): the /sdk/licenses overview page.
Renders rule legend cards + per-rule source tables. Drives the
/licenses footer link and gives auditors a one-page view of what
licence classes the platform is operating under.
- components/sdk/SourceBadge.tsx (Stufe 3): reusable React component.
Small R1/R2/R3 pill with click-expand tooltip showing source
regulation + attribution string + render-full-text policy. Will be
embedded into IACE hazards/mitigations, VVT items, DSFA controls in
follow-up commits.
Two stages of the four-stage renderer are now ready. Stufe 2 (PDF
auto-footer) + Stufe 4 (tech-file appendix) follow once the existing
PDF generators are extended to call /licenses/aggregate.
307 lines
11 KiB
Python
307 lines
11 KiB
Python
"""License attribution endpoints — Task #23 Stufe 1-4.
|
|
|
|
The audit (Task #22) classified all 314,811 canonical_controls into
|
|
license_rule 1/2/3. The frontend, PDF renderer, and tech-file generator
|
|
now need to surface that classification in the form of:
|
|
|
|
- Stufe 1: a global /licenses overview page
|
|
- Stufe 2: an auto-footer in every exported PDF
|
|
- Stufe 3: an inline source badge on every rendered hazard/measure
|
|
- Stufe 4: a sources appendix in tech-file bundles
|
|
|
|
This module exposes three endpoints that all four stages consume:
|
|
|
|
GET /api/compliance/licenses/overview
|
|
Global aggregation by rule + per-source counts. Drives Stufe 1.
|
|
|
|
POST /api/compliance/licenses/aggregate
|
|
Body: {"control_uuids": ["uuid1", ...]}.
|
|
Returns per-rule grouping with source breakdown. Used by PDF
|
|
footer (Stufe 2) and tech-file appendix (Stufe 4) to build the
|
|
"sources used in this document" list.
|
|
|
|
GET /api/compliance/licenses/source-info/{control_uuid}
|
|
Single-control lookup for the inline source badge tooltip
|
|
(Stufe 3). Returns rule, source regulation, attribution text.
|
|
|
|
Why a new module instead of extending canonical_control_routes:
|
|
- canonical_control_routes serves the legacy SPDX-style license matrix
|
|
(canonical_control_licenses + canonical_control_sources, ~10 rows).
|
|
- This module is built on regulation_registry (252 rows) + the
|
|
license_rule on each control. Both schemas coexist; this module
|
|
doesn't disturb the legacy endpoints.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any, Optional
|
|
from uuid import UUID
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import text
|
|
from sqlalchemy.orm import Session
|
|
|
|
from classroom_engine.database import get_db
|
|
|
|
router = APIRouter(prefix="/licenses", tags=["licenses"])
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================================
|
|
# Rule labels — used by frontend renderer
|
|
# ============================================================================
|
|
|
|
RULE_LABELS = {
|
|
1: {
|
|
"code": "R1",
|
|
"label_de": "Wörtlich übernehmbar",
|
|
"label_en": "Verbatim, no attribution required",
|
|
"render_full_text": True,
|
|
"attribution_required": False,
|
|
},
|
|
2: {
|
|
"code": "R2",
|
|
"label_de": "Wörtlich mit Attribution",
|
|
"label_en": "Verbatim with attribution",
|
|
"render_full_text": True,
|
|
"attribution_required": True,
|
|
},
|
|
3: {
|
|
"code": "R3",
|
|
"label_de": "Nur Identifier zitieren",
|
|
"label_en": "Identifier citation only",
|
|
"render_full_text": False,
|
|
"attribution_required": False,
|
|
},
|
|
}
|
|
|
|
|
|
# ============================================================================
|
|
# Response Schemas
|
|
# ============================================================================
|
|
|
|
|
|
class SourceCount(BaseModel):
|
|
regulation_id: str
|
|
regulation_name_de: Optional[str]
|
|
license_rule: int
|
|
license_type: Optional[str]
|
|
attribution: Optional[str]
|
|
jurisdiction: Optional[str]
|
|
source_type: Optional[str]
|
|
n_controls: int
|
|
|
|
|
|
class RuleBucket(BaseModel):
|
|
rule: int
|
|
label_de: str
|
|
label_en: str
|
|
attribution_required: bool
|
|
render_full_text: bool
|
|
total_controls: int
|
|
distinct_sources: int
|
|
sources: list[SourceCount]
|
|
|
|
|
|
class OverviewResponse(BaseModel):
|
|
total_controls: int
|
|
buckets: list[RuleBucket]
|
|
|
|
|
|
class AggregateRequest(BaseModel):
|
|
control_uuids: list[UUID]
|
|
|
|
|
|
class AggregateResponse(BaseModel):
|
|
total_in_request: int
|
|
matched: int
|
|
buckets: list[RuleBucket]
|
|
|
|
|
|
class SourceInfo(BaseModel):
|
|
control_uuid: UUID
|
|
license_rule: Optional[int]
|
|
license_label_de: Optional[str]
|
|
attribution_required: bool
|
|
render_full_text: bool
|
|
regulation_id: Optional[str]
|
|
regulation_name_de: Optional[str]
|
|
license_type: Optional[str]
|
|
attribution: Optional[str]
|
|
source_url: Optional[str]
|
|
|
|
|
|
# ============================================================================
|
|
# Endpoints
|
|
# ============================================================================
|
|
|
|
|
|
def _bucket(rule: int, sources: list[SourceCount]) -> RuleBucket:
|
|
meta = RULE_LABELS.get(rule, RULE_LABELS[3])
|
|
return RuleBucket(
|
|
rule=rule,
|
|
label_de=meta["label_de"],
|
|
label_en=meta["label_en"],
|
|
attribution_required=meta["attribution_required"],
|
|
render_full_text=meta["render_full_text"],
|
|
total_controls=sum(s.n_controls for s in sources),
|
|
distinct_sources=len(sources),
|
|
sources=sources,
|
|
)
|
|
|
|
|
|
@router.get("/overview", response_model=OverviewResponse)
|
|
def licenses_overview(db: Session = Depends(get_db)) -> OverviewResponse:
|
|
"""Global aggregation: total controls by rule, with per-source breakdown.
|
|
|
|
Drives Stufe 1 (the /licenses page).
|
|
"""
|
|
rows = db.execute(text("""
|
|
SELECT
|
|
COALESCE(cpl.source_regulation, '(no source)') AS regulation_name,
|
|
cc.license_rule,
|
|
COUNT(DISTINCT cc.id) AS n
|
|
FROM compliance.canonical_controls cc
|
|
LEFT JOIN compliance.control_parent_links cpl ON cpl.control_uuid = cc.id
|
|
WHERE cc.license_rule IS NOT NULL
|
|
GROUP BY 1, 2
|
|
""")).fetchall()
|
|
|
|
reg_rows = db.execute(text("""
|
|
SELECT regulation_name_de, regulation_id, license_type, attribution,
|
|
jurisdiction, source_type
|
|
FROM compliance.regulation_registry
|
|
""")).fetchall()
|
|
reg_by_name = {r.regulation_name_de: r for r in reg_rows if r.regulation_name_de}
|
|
|
|
by_rule: dict[int, list[SourceCount]] = {1: [], 2: [], 3: []}
|
|
seen: dict[tuple[int, str], int] = {}
|
|
total = 0
|
|
for row in rows:
|
|
rule = int(row.license_rule)
|
|
name = row.regulation_name
|
|
n = int(row.n)
|
|
key = (rule, name)
|
|
# multiple cpl entries per control deduplicate via DISTINCT, but a
|
|
# control with several source_regulations still gets counted once
|
|
# per regulation — that's the design.
|
|
seen[key] = seen.get(key, 0) + n
|
|
total += n
|
|
|
|
for (rule, name), n in seen.items():
|
|
reg = reg_by_name.get(name)
|
|
by_rule.setdefault(rule, []).append(SourceCount(
|
|
regulation_id=reg.regulation_id if reg else name,
|
|
regulation_name_de=name,
|
|
license_rule=rule,
|
|
license_type=reg.license_type if reg else None,
|
|
attribution=reg.attribution if reg else None,
|
|
jurisdiction=reg.jurisdiction if reg else None,
|
|
source_type=reg.source_type if reg else None,
|
|
n_controls=n,
|
|
))
|
|
|
|
for r in by_rule.values():
|
|
r.sort(key=lambda s: -s.n_controls)
|
|
buckets = [_bucket(rule, sources) for rule, sources in sorted(by_rule.items())]
|
|
return OverviewResponse(total_controls=total, buckets=buckets)
|
|
|
|
|
|
@router.post("/aggregate", response_model=AggregateResponse)
|
|
def aggregate_for_controls(
|
|
body: AggregateRequest,
|
|
db: Session = Depends(get_db),
|
|
) -> AggregateResponse:
|
|
"""Per-control license aggregation for PDF footer (Stufe 2) and
|
|
tech-file sources appendix (Stufe 4).
|
|
|
|
Returns a per-rule breakdown of which sources contributed to the
|
|
supplied control set. The frontend renderer turns this into the
|
|
"Verwendete Quellen" footer.
|
|
"""
|
|
if not body.control_uuids:
|
|
return AggregateResponse(total_in_request=0, matched=0, buckets=[])
|
|
|
|
rows = db.execute(text("""
|
|
SELECT
|
|
COALESCE(cpl.source_regulation, '(unknown)') AS regulation_name,
|
|
cc.license_rule,
|
|
COUNT(DISTINCT cc.id) AS n
|
|
FROM compliance.canonical_controls cc
|
|
LEFT JOIN compliance.control_parent_links cpl ON cpl.control_uuid = cc.id
|
|
WHERE cc.id = ANY(:ids) AND cc.license_rule IS NOT NULL
|
|
GROUP BY 1, 2
|
|
"""), {"ids": [str(u) for u in body.control_uuids]}).fetchall()
|
|
|
|
reg_rows = db.execute(text("""
|
|
SELECT regulation_name_de, regulation_id, license_type, attribution,
|
|
jurisdiction, source_type
|
|
FROM compliance.regulation_registry
|
|
""")).fetchall()
|
|
reg_by_name = {r.regulation_name_de: r for r in reg_rows if r.regulation_name_de}
|
|
|
|
by_rule: dict[int, list[SourceCount]] = {1: [], 2: [], 3: []}
|
|
matched_total = 0
|
|
for row in rows:
|
|
rule = int(row.license_rule)
|
|
n = int(row.n)
|
|
matched_total += n
|
|
reg = reg_by_name.get(row.regulation_name)
|
|
by_rule.setdefault(rule, []).append(SourceCount(
|
|
regulation_id=reg.regulation_id if reg else row.regulation_name,
|
|
regulation_name_de=row.regulation_name,
|
|
license_rule=rule,
|
|
license_type=reg.license_type if reg else None,
|
|
attribution=reg.attribution if reg else None,
|
|
jurisdiction=reg.jurisdiction if reg else None,
|
|
source_type=reg.source_type if reg else None,
|
|
n_controls=n,
|
|
))
|
|
for r in by_rule.values():
|
|
r.sort(key=lambda s: -s.n_controls)
|
|
buckets = [_bucket(rule, sources) for rule, sources in sorted(by_rule.items()) if sources]
|
|
return AggregateResponse(
|
|
total_in_request=len(body.control_uuids),
|
|
matched=matched_total,
|
|
buckets=buckets,
|
|
)
|
|
|
|
|
|
@router.get("/source-info/{control_uuid}", response_model=SourceInfo)
|
|
def source_info_for_control(
|
|
control_uuid: UUID,
|
|
db: Session = Depends(get_db),
|
|
) -> SourceInfo:
|
|
"""Single-control source info for the inline source badge (Stufe 3).
|
|
|
|
Used by the React `<SourceBadge>` component to populate its tooltip.
|
|
"""
|
|
row = db.execute(text("""
|
|
SELECT cc.license_rule, cpl.source_regulation AS regulation_name,
|
|
r.regulation_id, r.license_type, r.attribution, r.url AS source_url
|
|
FROM compliance.canonical_controls cc
|
|
LEFT JOIN compliance.control_parent_links cpl ON cpl.control_uuid = cc.id
|
|
LEFT JOIN compliance.regulation_registry r ON r.regulation_name_de = cpl.source_regulation
|
|
WHERE cc.id = :uuid
|
|
LIMIT 1
|
|
"""), {"uuid": str(control_uuid)}).fetchone()
|
|
if row is None:
|
|
raise HTTPException(status_code=404, detail="control not found")
|
|
|
|
rule = int(row.license_rule) if row.license_rule is not None else None
|
|
meta = RULE_LABELS.get(rule, {}) if rule else {}
|
|
return SourceInfo(
|
|
control_uuid=control_uuid,
|
|
license_rule=rule,
|
|
license_label_de=meta.get("label_de"),
|
|
attribution_required=meta.get("attribution_required", False),
|
|
render_full_text=meta.get("render_full_text", False),
|
|
regulation_id=row.regulation_id,
|
|
regulation_name_de=row.regulation_name,
|
|
license_type=row.license_type,
|
|
attribution=row.attribution,
|
|
source_url=row.source_url,
|
|
)
|