feat: Control-Detail Provenance + Atomare Controls Seite
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 41s
CI/CD / test-python-backend-compliance (push) Successful in 40s
CI/CD / test-python-document-crawler (push) Successful in 23s
CI/CD / test-python-dsms-gateway (push) Successful in 18s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Successful in 4s

Backend: provenance endpoint (obligations, doc refs, merged duplicates,
regulations summary) + atomic-stats aggregation endpoint.
Frontend: ControlDetail mit Provenance-Sektionen, klickbare Navigation,
neue /sdk/atomic-controls Seite mit Stats-Bar und gefilterter Liste.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-24 10:38:34 +01:00
parent 200facda6a
commit 6d3bdf8e74
8 changed files with 1210 additions and 5 deletions

View File

@@ -473,6 +473,61 @@ async def controls_meta():
}
# NOTE(review): FastAPI matches routes in declaration order, so this literal
# path has to stay registered ahead of "/controls/{control_id}".
@router.get("/controls/atomic-stats")
async def atomic_stats():
    """Summarise the atomic (``decomposition_method = 'pass0b'``) controls.

    Five read-only queries are issued in a fixed order:

    1. number of controls whose release state is not duplicate, deprecated
       or rejected,
    2. number of controls in release state ``duplicate``,
    3. non-excluded controls grouped by the upper-cased ``control_id`` prefix
       in front of the first ``-``,
    4. non-excluded controls grouped by the ``source_regulation`` of their
       parent links (each control counted once per regulation),
    5. the mean number of distinct source regulations per non-excluded
       control.

    Returns:
        A dict with ``total_active``, ``total_duplicate``, ``by_domain``,
        ``by_regulation`` and ``avg_regulation_coverage`` (rounded to one
        decimal place).
    """
    with SessionLocal() as session:
        active_sql = text("""
            SELECT count(*) FROM canonical_controls
            WHERE decomposition_method = 'pass0b'
            AND release_state NOT IN ('duplicate', 'deprecated', 'rejected')
        """)
        # A missing scalar (None) is reported as zero.
        active_total = session.execute(active_sql).scalar() or 0

        duplicate_sql = text("""
            SELECT count(*) FROM canonical_controls
            WHERE decomposition_method = 'pass0b'
            AND release_state = 'duplicate'
        """)
        duplicate_total = session.execute(duplicate_sql).scalar() or 0

        domain_sql = text("""
            SELECT UPPER(SPLIT_PART(control_id, '-', 1)) AS domain, count(*) AS cnt
            FROM canonical_controls
            WHERE decomposition_method = 'pass0b'
            AND release_state NOT IN ('duplicate', 'deprecated', 'rejected')
            GROUP BY domain ORDER BY cnt DESC
        """)
        domain_rows = session.execute(domain_sql).fetchall()

        regulation_sql = text("""
            SELECT cpl.source_regulation AS regulation, count(DISTINCT cc.id) AS cnt
            FROM canonical_controls cc
            JOIN control_parent_links cpl ON cpl.control_uuid = cc.id
            WHERE cc.decomposition_method = 'pass0b'
            AND cc.release_state NOT IN ('duplicate', 'deprecated', 'rejected')
            AND cpl.source_regulation IS NOT NULL
            GROUP BY cpl.source_regulation ORDER BY cnt DESC
        """)
        regulation_rows = session.execute(regulation_sql).fetchall()

        coverage_sql = text("""
            SELECT COALESCE(AVG(reg_count), 0)
            FROM (
            SELECT cc.id, count(DISTINCT cpl.source_regulation) AS reg_count
            FROM canonical_controls cc
            LEFT JOIN control_parent_links cpl ON cpl.control_uuid = cc.id
            WHERE cc.decomposition_method = 'pass0b'
            AND cc.release_state NOT IN ('duplicate', 'deprecated', 'rejected')
            GROUP BY cc.id
            ) sub
        """)
        mean_coverage = session.execute(coverage_sql).scalar() or 0

        # Rows are read positionally: column 0 is the label, column 1 the count.
        domain_breakdown = []
        for row in domain_rows:
            domain_breakdown.append({"domain": row[0], "count": row[1]})

        regulation_breakdown = []
        for row in regulation_rows:
            regulation_breakdown.append({"regulation": row[0], "count": row[1]})

        payload = {
            "total_active": active_total,
            "total_duplicate": duplicate_total,
            "by_domain": domain_breakdown,
            "by_regulation": regulation_breakdown,
            "avg_regulation_coverage": round(float(mean_coverage), 1),
        }
    return payload
@router.get("/controls/{control_id}")
async def get_control(control_id: str):
"""Get a single canonical control by its control_id (e.g. AUTH-001)."""
@@ -620,6 +675,239 @@ async def get_control_traceability(control_id: str):
return result
@router.get("/controls/{control_id}/provenance")
async def get_control_provenance(control_id: str):
    """Get full provenance chain for a control — extends traceability with
    obligations, document references, merged duplicates, and regulations summary.

    Args:
        control_id: Human-readable control identifier (e.g. ``AUTH-001``). It
            is upper-cased before the lookup.

    Returns:
        A dict with the keys ``control_id``, ``title``, ``is_atomic``,
        ``parent_links``, ``children``, ``source_count``, ``obligations``,
        ``obligation_count``, ``document_references``, ``merged_duplicates``,
        ``merged_duplicates_count`` and ``regulations_summary``.

    Raises:
        HTTPException: 404 when no control has the given ``control_id``.
    """
    with SessionLocal() as db:
        ctrl = db.execute(
            text("""
                SELECT id, control_id, title, parent_control_uuid,
                decomposition_method, source_citation
                FROM canonical_controls WHERE control_id = :cid
            """),
            {"cid": control_id.upper()},
        ).fetchone()
        if not ctrl:
            raise HTTPException(status_code=404, detail="Control not found")

        ctrl_uuid = str(ctrl.id)
        is_atomic = ctrl.decomposition_method == "pass0b"
        result: dict[str, Any] = {
            "control_id": ctrl.control_id,
            "title": ctrl.title,
            "is_atomic": is_atomic,
        }

        # --- Parent links (same as traceability) ---
        parent_links = db.execute(
            text("""
                SELECT cpl.parent_control_uuid, cpl.link_type,
                cpl.confidence, cpl.source_regulation,
                cpl.source_article, cpl.obligation_candidate_id,
                cc.control_id AS parent_control_id,
                cc.title AS parent_title,
                cc.source_citation AS parent_citation,
                oc.obligation_text, oc.action, oc.object,
                oc.normative_strength
                FROM control_parent_links cpl
                JOIN canonical_controls cc ON cc.id = cpl.parent_control_uuid
                LEFT JOIN obligation_candidates oc ON oc.id = cpl.obligation_candidate_id
                WHERE cpl.control_uuid = CAST(:uid AS uuid)
                ORDER BY cpl.source_regulation, cpl.source_article
            """),
            {"uid": ctrl_uuid},
        ).fetchall()
        result["parent_links"] = [
            {
                "parent_control_id": pl.parent_control_id,
                "parent_title": pl.parent_title,
                "link_type": pl.link_type,
                # Only a missing confidence defaults to 1.0; an explicit 0 is
                # a real value and must not be reported as full confidence.
                "confidence": (
                    float(pl.confidence) if pl.confidence is not None else 1.0
                ),
                "source_regulation": pl.source_regulation,
                "source_article": pl.source_article,
                "parent_citation": pl.parent_citation,
                # The obligation columns come from a LEFT JOIN, so they are
                # all NULL when the link carries no obligation candidate.
                "obligation": {
                    "text": pl.obligation_text,
                    "action": pl.action,
                    "object": pl.object,
                    "normative_strength": pl.normative_strength,
                } if pl.obligation_text else None,
            }
            for pl in parent_links
        ]

        # Legacy 1:1 parent (backwards compat): only added when the parent
        # referenced on the control row is not already among the link rows.
        if ctrl.parent_control_uuid:
            parent_uuids_in_links = {
                str(pl.parent_control_uuid) for pl in parent_links
            }
            parent_uuid_str = str(ctrl.parent_control_uuid)
            if parent_uuid_str not in parent_uuids_in_links:
                legacy = db.execute(
                    text("""
                        SELECT control_id, title, source_citation
                        FROM canonical_controls WHERE id = CAST(:uid AS uuid)
                    """),
                    {"uid": parent_uuid_str},
                ).fetchone()
                if legacy:
                    result["parent_links"].insert(0, {
                        "parent_control_id": legacy.control_id,
                        "parent_title": legacy.title,
                        "link_type": "decomposition",
                        "confidence": 1.0,
                        "source_regulation": None,
                        "source_article": None,
                        "parent_citation": legacy.source_citation,
                        "obligation": None,
                    })

        # --- Children ---
        children = db.execute(
            text("""
                SELECT control_id, title, category, severity,
                decomposition_method
                FROM canonical_controls
                WHERE parent_control_uuid = CAST(:uid AS uuid)
                ORDER BY control_id
            """),
            {"uid": ctrl_uuid},
        ).fetchall()
        result["children"] = [
            {
                "control_id": ch.control_id,
                "title": ch.title,
                "category": ch.category,
                "severity": ch.severity,
                "decomposition_method": ch.decomposition_method,
            }
            for ch in children
        ]

        # Source count: number of distinct regulations across the parent links.
        result["source_count"] = len({
            pl["source_regulation"]
            for pl in result["parent_links"]
            if pl.get("source_regulation")
        })

        # --- Obligations (for Rich Controls) ---
        obligations = db.execute(
            text("""
                SELECT candidate_id, obligation_text, action, object,
                normative_strength, release_state
                FROM obligation_candidates
                WHERE parent_control_uuid = CAST(:uid AS uuid)
                AND release_state NOT IN ('rejected', 'merged')
                ORDER BY candidate_id
            """),
            {"uid": ctrl_uuid},
        ).fetchall()
        result["obligations"] = [
            {
                "candidate_id": ob.candidate_id,
                "obligation_text": ob.obligation_text,
                "action": ob.action,
                "object": ob.object,
                "normative_strength": ob.normative_strength,
                "release_state": ob.release_state,
            }
            for ob in obligations
        ]
        result["obligation_count"] = len(obligations)

        # --- Document References ---
        # Extractions attached to the control itself, plus those attached to
        # obligation candidates reachable through this control's parent links.
        doc_refs = db.execute(
            text("""
                SELECT DISTINCT oe.regulation_code, oe.article, oe.paragraph,
                oe.extraction_method, oe.confidence
                FROM obligation_extractions oe
                WHERE oe.control_uuid = CAST(:uid AS uuid)
                OR oe.obligation_id IN (
                SELECT oc.candidate_id FROM obligation_candidates oc
                JOIN control_parent_links cpl ON cpl.obligation_candidate_id = oc.id
                WHERE cpl.control_uuid = CAST(:uid AS uuid)
                )
                ORDER BY oe.regulation_code, oe.article
            """),
            {"uid": ctrl_uuid},
        ).fetchall()
        result["document_references"] = [
            {
                "regulation_code": dr.regulation_code,
                "article": dr.article,
                "paragraph": dr.paragraph,
                "extraction_method": dr.extraction_method,
                # Keep an explicit 0 confidence; only NULL maps to None.
                "confidence": (
                    float(dr.confidence) if dr.confidence is not None else None
                ),
            }
            for dr in doc_refs
        ]

        # --- Merged Duplicates ---
        # NOTE(review): the sub-select uses LIMIT 1 without ORDER BY, so the
        # regulation shown for a duplicate with several parent links is
        # whichever row the database returns first.
        merged = db.execute(
            text("""
                SELECT cc.control_id, cc.title,
                (SELECT cpl.source_regulation FROM control_parent_links cpl
                WHERE cpl.control_uuid = cc.id LIMIT 1) AS source_regulation
                FROM canonical_controls cc
                WHERE cc.merged_into_uuid = CAST(:uid AS uuid)
                AND cc.release_state = 'duplicate'
                ORDER BY cc.control_id
            """),
            {"uid": ctrl_uuid},
        ).fetchall()
        result["merged_duplicates"] = [
            {
                "control_id": m.control_id,
                "title": m.title,
                "source_regulation": m.source_regulation,
            }
            for m in merged
        ]
        result["merged_duplicates_count"] = len(merged)

        # --- Regulations Summary (aggregated from parent_links + doc_refs) ---
        reg_map: dict[str, dict[str, Any]] = {}
        for pl in result["parent_links"]:
            reg = pl.get("source_regulation")
            if not reg:
                continue
            entry = reg_map.setdefault(
                reg, {"articles": set(), "link_types": set()}
            )
            if pl.get("source_article"):
                entry["articles"].add(pl["source_article"])
            # The key is always present, so a dict default would never apply;
            # fall back explicitly so a NULL link type cannot put None into a
            # set that is sorted together with strings below.
            entry["link_types"].add(pl.get("link_type") or "decomposition")
        for dr in result["document_references"]:
            reg = dr.get("regulation_code")
            if not reg:
                continue
            entry = reg_map.setdefault(
                reg, {"articles": set(), "link_types": set()}
            )
            if dr.get("article"):
                entry["articles"].add(dr["article"])
        result["regulations_summary"] = [
            {
                "regulation_code": reg,
                "articles": sorted(info["articles"]),
                "link_types": sorted(info["link_types"]),
            }
            for reg, info in sorted(reg_map.items())
        ]
    return result
# =============================================================================
# CONTROL CRUD (CREATE / UPDATE / DELETE)
# =============================================================================

View File

@@ -0,0 +1,277 @@
"""Tests for provenance and atomic-stats endpoints.
Covers:
- GET /v1/canonical/controls/{control_id}/provenance
- GET /v1/canonical/controls/atomic-stats
"""
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime
from compliance.api.canonical_control_routes import (
get_control_provenance,
atomic_stats,
)
# =============================================================================
# HELPERS
# =============================================================================
def _mock_row(**kwargs):
"""Create a mock DB row with attribute access."""
obj = MagicMock()
for k, v in kwargs.items():
setattr(obj, k, v)
return obj
def _mock_db_execute(return_values):
"""Return a mock that cycles through return values for sequential .execute() calls."""
mock_db = MagicMock()
results = iter(return_values)
def execute_side_effect(*args, **kwargs):
result = next(results)
mock_result = MagicMock()
if isinstance(result, list):
mock_result.fetchall.return_value = result
mock_result.fetchone.return_value = result[0] if result else None
elif isinstance(result, int):
mock_result.scalar.return_value = result
elif result is None:
mock_result.fetchone.return_value = None
mock_result.fetchall.return_value = []
mock_result.scalar.return_value = 0
else:
mock_result.fetchone.return_value = result
mock_result.fetchall.return_value = [result]
return mock_result
mock_db.execute.side_effect = execute_side_effect
return mock_db
# =============================================================================
# PROVENANCE ENDPOINT
# =============================================================================
class TestProvenanceEndpoint:
    """Tests for GET /controls/{control_id}/provenance."""

    @staticmethod
    async def _call_provenance(mock_db, control_id):
        """Invoke the endpoint with ``SessionLocal`` patched so that its
        context manager yields *mock_db*."""
        with patch("compliance.api.canonical_control_routes.SessionLocal") as mock_session:
            mock_session.return_value.__enter__ = MagicMock(return_value=mock_db)
            mock_session.return_value.__exit__ = MagicMock(return_value=False)
            return await get_control_provenance(control_id)

    @pytest.mark.asyncio
    async def test_provenance_not_found(self):
        """404 when control doesn't exist."""
        from fastapi import HTTPException

        mock_db = _mock_db_execute([None])

        with pytest.raises(HTTPException) as exc_info:
            await self._call_provenance(mock_db, "NONEXISTENT-999")
        assert exc_info.value.status_code == 404

    @pytest.mark.asyncio
    async def test_provenance_atomic_control(self):
        """Atomic control returns document_references, parent_links, merged_duplicates."""
        import uuid

        ctrl_row = _mock_row(
            id=uuid.uuid4(),
            control_id="SEC-042",
            title="Test Atomic Control",
            parent_control_uuid=None,
            decomposition_method="pass0b",
            source_citation=None,
        )
        parent_link = _mock_row(
            parent_control_uuid=uuid.uuid4(),
            parent_control_id="DATA-005",
            parent_title="Parent Control",
            link_type="decomposition",
            confidence=0.95,
            source_regulation="DSGVO",
            source_article="Art. 32",
            parent_citation=None,
            obligation_text="Must encrypt",
            action="encrypt",
            object="personal data",
            normative_strength="must",
            obligation_candidate_id=None,
        )
        obligation_row = _mock_row(
            candidate_id="OBL-SEC-042-001",
            obligation_text="Test obligation",
            action="encrypt",
            object="data at rest",
            normative_strength="must",
            release_state="composed",
        )
        doc_ref = _mock_row(
            regulation_code="DSGVO",
            article="Art. 32",
            paragraph="Abs. 1 lit. a",
            extraction_method="llm_extracted",
            confidence=0.92,
        )
        merged = _mock_row(
            control_id="SEC-099",
            title="Encryption at rest (NIS2)",
            source_regulation="NIS2",
        )
        # One entry per db.execute() call, in the order the endpoint issues
        # them. No legacy-parent lookup happens: parent_control_uuid is None.
        mock_db = _mock_db_execute([
            ctrl_row,          # control lookup
            [parent_link],     # parent_links
            [],                # children
            [obligation_row],  # obligations
            [doc_ref],         # document_references
            [merged],          # merged_duplicates
        ])

        result = await self._call_provenance(mock_db, "SEC-042")

        assert result["control_id"] == "SEC-042"
        assert result["is_atomic"] is True
        assert len(result["parent_links"]) == 1
        assert result["parent_links"][0]["parent_control_id"] == "DATA-005"
        assert result["parent_links"][0]["confidence"] == 0.95
        assert result["parent_links"][0]["obligation"]["text"] == "Must encrypt"
        assert result["children"] == []
        assert result["source_count"] == 1
        assert result["obligation_count"] == 1
        assert len(result["document_references"]) == 1
        assert result["document_references"][0]["regulation_code"] == "DSGVO"
        assert len(result["merged_duplicates"]) == 1
        assert result["merged_duplicates"][0]["control_id"] == "SEC-099"
        assert result["merged_duplicates_count"] == 1
        # Parent link and document reference name the same regulation and
        # article, so they collapse into a single summary entry.
        assert result["regulations_summary"] == [
            {
                "regulation_code": "DSGVO",
                "articles": ["Art. 32"],
                "link_types": ["decomposition"],
            }
        ]

    @pytest.mark.asyncio
    async def test_provenance_rich_control(self):
        """Rich control returns obligations list and children."""
        import uuid

        ctrl_row = _mock_row(
            id=uuid.uuid4(),
            control_id="DATA-005",
            title="Rich Control",
            parent_control_uuid=None,
            decomposition_method=None,
            source_citation={"source": "DSGVO"},
        )
        obligation_row = _mock_row(
            candidate_id="OBL-DATA-005-001",
            obligation_text="Encrypt personal data",
            action="encrypt",
            object="personal data",
            normative_strength="must",
            release_state="composed",
        )
        child_row = _mock_row(
            control_id="SEC-042",
            title="Child Atomic",
            category="encryption",
            severity="high",
            decomposition_method="pass0b",
        )
        mock_db = _mock_db_execute([
            ctrl_row,          # control lookup
            [],                # parent_links
            [child_row],       # children
            [obligation_row],  # obligations
            [],                # document_references
            [],                # merged_duplicates
        ])

        result = await self._call_provenance(mock_db, "DATA-005")

        assert result["control_id"] == "DATA-005"
        assert result["is_atomic"] is False
        assert result["parent_links"] == []
        assert result["source_count"] == 0
        assert result["obligation_count"] == 1
        assert result["obligations"][0]["candidate_id"] == "OBL-DATA-005-001"
        assert len(result["children"]) == 1
        assert result["children"][0]["control_id"] == "SEC-042"
        assert result["document_references"] == []
        assert result["merged_duplicates_count"] == 0
        assert result["regulations_summary"] == []
# =============================================================================
# ATOMIC STATS ENDPOINT
# =============================================================================
class TestAtomicStatsEndpoint:
    """Tests for GET /controls/atomic-stats."""

    @staticmethod
    def _stats_db(responses):
        """Return a mock session whose successive ``execute()`` calls yield
        *responses* in order.

        A list is served through ``fetchall()``; any other value (including
        ``None`` and floats) through ``scalar()``.
        """
        results = []
        for value in responses:
            query_result = MagicMock()
            if isinstance(value, list):
                query_result.fetchall.return_value = value
            else:
                query_result.scalar.return_value = value
            results.append(query_result)
        mock_db = MagicMock()
        # An iterable side_effect hands out one prepared result per call.
        mock_db.execute.side_effect = results
        return mock_db

    @staticmethod
    async def _call_stats(mock_db):
        """Invoke the endpoint with ``SessionLocal`` patched so that its
        context manager yields *mock_db*."""
        with patch("compliance.api.canonical_control_routes.SessionLocal") as mock_session:
            mock_session.return_value.__enter__ = MagicMock(return_value=mock_db)
            mock_session.return_value.__exit__ = MagicMock(return_value=False)
            return await atomic_stats()

    @pytest.mark.asyncio
    async def test_atomic_stats_response_shape(self):
        """Stats endpoint returns expected aggregation fields."""
        # The endpoint reads rows positionally (row[0], row[1]), so plain
        # tuples are sufficient stand-ins for DB rows.
        mock_db = self._stats_db([
            18234,              # total_active
            67000,              # total_duplicate
            [("SEC", 4200)],    # by_domain
            [("DSGVO", 1200)],  # by_regulation
            2.3,                # avg_coverage
        ])

        result = await self._call_stats(mock_db)

        assert result["total_active"] == 18234
        assert result["total_duplicate"] == 67000
        assert len(result["by_domain"]) == 1
        assert result["by_domain"][0]["domain"] == "SEC"
        assert result["by_domain"][0]["count"] == 4200
        assert len(result["by_regulation"]) == 1
        assert result["by_regulation"][0]["regulation"] == "DSGVO"
        assert result["by_regulation"][0]["count"] == 1200
        assert result["avg_regulation_coverage"] == 2.3

    @pytest.mark.asyncio
    async def test_atomic_stats_empty_database(self):
        """Missing scalars fall back to zero and the breakdowns stay empty."""
        mock_db = self._stats_db([None, None, [], [], None])

        result = await self._call_stats(mock_db)

        assert result["total_active"] == 0
        assert result["total_duplicate"] == 0
        assert result["by_domain"] == []
        assert result["by_regulation"] == []
        assert result["avg_regulation_coverage"] == 0.0