feat: control_parent_links population + traceability API + frontend

- _write_atomic_control() now uses RETURNING id and inserts into
  control_parent_links (M:N) with source_regulation, source_article,
  and obligation_candidate_id parsed from parent's source_citation
- New _parse_citation() helper for JSONB source_citation extraction
- New GET /controls/{id}/traceability endpoint returning full chain:
  parent links with obligations, child controls, source_count
- Backend: control_type filter (atomic/rich) for controls + count
- Frontend: Rechtsgrundlagen section in ControlDetail showing all
  parent links per source regulation with obligation text + strength
- Frontend: Atomic/Rich filter dropdown in Control Library list
- Frontend: GenerationStrategyBadge recognizes 'pass0b' strategy
- Tests: 3 new tests for parent_link creation + citation parsing,
  existing batch test mock updated for RETURNING clause

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-23 08:14:29 +01:00
parent 0027f78fc5
commit ac6134ce6d
7 changed files with 511 additions and 43 deletions

View File

@@ -10,6 +10,7 @@ Endpoints:
GET /v1/canonical/frameworks/{framework_id}/controls — Controls of a framework
GET /v1/canonical/controls — All controls (filterable)
GET /v1/canonical/controls/{control_id} — Single control
GET /v1/canonical/controls/{control_id}/traceability — Traceability chain
GET /v1/canonical/controls/{control_id}/similar — Find similar controls
POST /v1/canonical/controls — Create a control
PUT /v1/canonical/controls/{control_id} — Update a control
@@ -314,6 +315,7 @@ async def list_controls(
target_audience: Optional[str] = Query(None),
source: Optional[str] = Query(None, description="Filter by source_citation->source"),
search: Optional[str] = Query(None, description="Full-text search in control_id, title, objective"),
control_type: Optional[str] = Query(None, description="Filter: atomic, rich, or all"),
sort: Optional[str] = Query("control_id", description="Sort field: control_id, created_at, severity"),
order: Optional[str] = Query("asc", description="Sort order: asc or desc"),
limit: Optional[int] = Query(None, ge=1, le=5000, description="Max results"),
@@ -351,6 +353,10 @@ async def list_controls(
else:
query += " AND source_citation->>'source' = :src"
params["src"] = source
if control_type == "atomic":
query += " AND decomposition_method = 'pass0b'"
elif control_type == "rich":
query += " AND (decomposition_method IS NULL OR decomposition_method != 'pass0b')"
if search:
query += " AND (control_id ILIKE :q OR title ILIKE :q OR objective ILIKE :q)"
params["q"] = f"%{search}%"
@@ -391,6 +397,7 @@ async def count_controls(
target_audience: Optional[str] = Query(None),
source: Optional[str] = Query(None),
search: Optional[str] = Query(None),
control_type: Optional[str] = Query(None),
):
"""Count controls matching filters (for pagination)."""
query = "SELECT count(*) FROM canonical_controls WHERE 1=1"
@@ -420,6 +427,10 @@ async def count_controls(
else:
query += " AND source_citation->>'source' = :src"
params["src"] = source
if control_type == "atomic":
query += " AND decomposition_method = 'pass0b'"
elif control_type == "rich":
query += " AND (decomposition_method IS NULL OR decomposition_method != 'pass0b')"
if search:
query += " AND (control_id ILIKE :q OR title ILIKE :q OR objective ILIKE :q)"
params["q"] = f"%{search}%"
@@ -481,6 +492,134 @@ async def get_control(control_id: str):
return _control_row(row)
@router.get("/controls/{control_id}/traceability")
async def get_control_traceability(control_id: str):
"""Get the full traceability chain for a control.
For atomic controls: shows all parent links with source regulations,
articles, and the obligation chain.
For rich controls: shows child atomic controls derived from them.
"""
with SessionLocal() as db:
# Get control UUID
ctrl = db.execute(
text("""
SELECT id, control_id, title, parent_control_uuid,
decomposition_method, source_citation
FROM canonical_controls WHERE control_id = :cid
"""),
{"cid": control_id.upper()},
).fetchone()
if not ctrl:
raise HTTPException(status_code=404, detail="Control not found")
result: dict[str, Any] = {
"control_id": ctrl.control_id,
"title": ctrl.title,
"is_atomic": ctrl.decomposition_method == "pass0b",
}
ctrl_uuid = str(ctrl.id)
# Parent links (M:N) — for atomic controls
parent_links = db.execute(
text("""
SELECT cpl.parent_control_uuid, cpl.link_type,
cpl.confidence, cpl.source_regulation,
cpl.source_article, cpl.obligation_candidate_id,
cc.control_id AS parent_control_id,
cc.title AS parent_title,
cc.source_citation AS parent_citation,
oc.obligation_text, oc.action, oc.object,
oc.normative_strength
FROM control_parent_links cpl
JOIN canonical_controls cc ON cc.id = cpl.parent_control_uuid
LEFT JOIN obligation_candidates oc ON oc.id = cpl.obligation_candidate_id
WHERE cpl.control_uuid = CAST(:uid AS uuid)
ORDER BY cpl.source_regulation, cpl.source_article
"""),
{"uid": ctrl_uuid},
).fetchall()
result["parent_links"] = [
{
"parent_control_id": pl.parent_control_id,
"parent_title": pl.parent_title,
"link_type": pl.link_type,
"confidence": float(pl.confidence) if pl.confidence else 1.0,
"source_regulation": pl.source_regulation,
"source_article": pl.source_article,
"parent_citation": pl.parent_citation,
"obligation": {
"text": pl.obligation_text,
"action": pl.action,
"object": pl.object,
"normative_strength": pl.normative_strength,
} if pl.obligation_text else None,
}
for pl in parent_links
]
# Also include the 1:1 parent (backwards compat) if not already in links
if ctrl.parent_control_uuid:
parent_uuids_in_links = {
str(pl.parent_control_uuid) for pl in parent_links
}
parent_uuid_str = str(ctrl.parent_control_uuid)
if parent_uuid_str not in parent_uuids_in_links:
legacy = db.execute(
text("""
SELECT control_id, title, source_citation
FROM canonical_controls WHERE id = CAST(:uid AS uuid)
"""),
{"uid": parent_uuid_str},
).fetchone()
if legacy:
result["parent_links"].insert(0, {
"parent_control_id": legacy.control_id,
"parent_title": legacy.title,
"link_type": "decomposition",
"confidence": 1.0,
"source_regulation": None,
"source_article": None,
"parent_citation": legacy.source_citation,
"obligation": None,
})
# Child controls — for rich controls
children = db.execute(
text("""
SELECT control_id, title, category, severity,
decomposition_method
FROM canonical_controls
WHERE parent_control_uuid = CAST(:uid AS uuid)
ORDER BY control_id
"""),
{"uid": ctrl_uuid},
).fetchall()
result["children"] = [
{
"control_id": ch.control_id,
"title": ch.title,
"category": ch.category,
"severity": ch.severity,
"decomposition_method": ch.decomposition_method,
}
for ch in children
]
# Unique source regulations count
regs = set()
for pl in result["parent_links"]:
if pl.get("source_regulation"):
regs.add(pl["source_regulation"])
result["source_count"] = len(regs)
return result
# =============================================================================
# CONTROL CRUD (CREATE / UPDATE / DELETE)
# =============================================================================

View File

@@ -955,6 +955,12 @@ class DecompositionPass:
logger.info("Pass 0a: %s", stats)
return stats
_NORMATIVE_STRENGTH_MAP = {
"muss": "must", "must": "must",
"soll": "should", "should": "should",
"kann": "may", "may": "may",
}
def _process_pass0a_obligations(
self,
raw_obligations: list[dict],
@@ -964,6 +970,10 @@ class DecompositionPass:
) -> None:
"""Validate and write obligation candidates from LLM output."""
for idx, raw in enumerate(raw_obligations):
raw_strength = raw.get("normative_strength", "must").lower().strip()
normative_strength = self._NORMATIVE_STRENGTH_MAP.get(
raw_strength, "must"
)
cand = ObligationCandidate(
candidate_id=f"OC-{control_id}-{idx + 1:02d}",
parent_control_uuid=control_uuid,
@@ -971,7 +981,7 @@ class DecompositionPass:
action=raw.get("action", ""),
object_=raw.get("object", ""),
condition=raw.get("condition"),
normative_strength=raw.get("normative_strength", "must"),
normative_strength=normative_strength,
is_test_obligation=bool(raw.get("is_test_obligation", False)),
is_reporting_obligation=bool(raw.get("is_reporting_obligation", False)),
)
@@ -1246,9 +1256,7 @@ class DecompositionPass:
seq = self._next_atomic_seq(obl["parent_control_id"])
atomic.candidate_id = f"{obl['parent_control_id']}-A{seq:02d}"
self._write_atomic_control(
atomic, obl["parent_uuid"], obl["candidate_id"]
)
new_uuid = self._write_atomic_control(atomic, obl)
self.db.execute(
text("""
@@ -1260,7 +1268,7 @@ class DecompositionPass:
)
# Index in Qdrant for future dedup checks
if self._dedup:
if self._dedup and new_uuid:
pattern_id_val = None
pid_row2 = self.db.execute(text(
"SELECT pattern_id FROM canonical_controls WHERE id = CAST(:uid AS uuid)"
@@ -1268,13 +1276,9 @@ class DecompositionPass:
if pid_row2:
pattern_id_val = pid_row2[0]
# Get the UUID of the newly inserted control
new_row = self.db.execute(text(
"SELECT id::text FROM canonical_controls WHERE control_id = :cid ORDER BY created_at DESC LIMIT 1"
), {"cid": atomic.candidate_id}).fetchone()
if new_row and pattern_id_val:
if pattern_id_val:
await self._dedup.index_control(
control_uuid=new_row[0],
control_uuid=new_uuid,
control_id=atomic.candidate_id,
title=atomic.title,
action=obl.get("action", ""),
@@ -1505,43 +1509,88 @@ class DecompositionPass:
)
def _write_atomic_control(
self, atomic: AtomicControlCandidate,
parent_uuid: str, candidate_id: str,
) -> None:
"""Insert an atomic control into canonical_controls."""
self.db.execute(
self, atomic: AtomicControlCandidate, obl: dict,
) -> Optional[str]:
"""Insert an atomic control and create parent link.
Returns the UUID of the newly created control, or None on failure.
"""
parent_uuid = obl["parent_uuid"]
candidate_id = obl["candidate_id"]
result = self.db.execute(
text("""
INSERT INTO canonical_controls (
control_id, title, objective, requirements,
test_procedure, evidence, severity, category,
control_id, title, objective, rationale,
scope, requirements,
test_procedure, evidence, severity,
open_anchors, category,
release_state, parent_control_uuid,
decomposition_method,
generation_metadata
generation_metadata,
framework_id,
generation_strategy, pipeline_version
) VALUES (
:control_id, :title, :objective,
:requirements, :test_procedure, :evidence,
:severity, :category, 'draft',
:control_id, :title, :objective, :rationale,
:scope, :requirements,
:test_procedure, :evidence,
:severity, :open_anchors, :category,
'draft',
CAST(:parent_uuid AS uuid), 'pass0b',
:gen_meta
:gen_meta,
CAST(:framework_id AS uuid),
'pass0b', 2
)
RETURNING id::text
"""),
{
"control_id": atomic.candidate_id,
"title": atomic.title,
"objective": atomic.objective,
"rationale": getattr(atomic, "rationale", None) or "Aus Obligation abgeleitet.",
"scope": json.dumps({}),
"requirements": json.dumps(atomic.requirements),
"test_procedure": json.dumps(atomic.test_procedure),
"evidence": json.dumps(atomic.evidence),
"severity": atomic.severity,
"open_anchors": json.dumps([]),
"category": atomic.category,
"parent_uuid": parent_uuid,
"gen_meta": json.dumps({
"decomposition_source": candidate_id,
"decomposition_method": "pass0b",
}),
"framework_id": "14b1bdd2-abc7-4a43-adae-14471ee5c7cf",
},
)
row = result.fetchone()
new_uuid = row[0] if row else None
# Create M:N parent link (control_parent_links)
if new_uuid:
citation = _parse_citation(obl.get("parent_citation", ""))
self.db.execute(
text("""
INSERT INTO control_parent_links
(control_uuid, parent_control_uuid, link_type, confidence,
source_regulation, source_article, obligation_candidate_id)
VALUES
(CAST(:cu AS uuid), CAST(:pu AS uuid), 'decomposition', 1.0,
:sr, :sa, CAST(:oci AS uuid))
ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
"""),
{
"cu": new_uuid,
"pu": parent_uuid,
"sr": citation.get("source", ""),
"sa": citation.get("article", ""),
"oci": obl["oc_id"],
},
)
return new_uuid
def _next_atomic_seq(self, parent_control_id: str) -> int:
"""Get the next sequence number for atomic controls under a parent."""
result = self.db.execute(
@@ -2004,6 +2053,22 @@ def _format_citation(citation) -> str:
return str(citation)
def _parse_citation(citation) -> dict:
"""Parse source_citation JSONB into a dict with source/article/paragraph."""
if not citation:
return {}
if isinstance(citation, dict):
return citation
if isinstance(citation, str):
try:
c = json.loads(citation)
if isinstance(c, dict):
return c
except (json.JSONDecodeError, TypeError):
pass
return {}
def _compute_extraction_confidence(flags: dict) -> float:
"""Compute confidence score from quality flags."""
score = 0.0