feat: add verification method, categories, and dedup UI to control library
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 44s
CI/CD / test-python-backend-compliance (push) Successful in 40s
CI/CD / test-python-document-crawler (push) Successful in 22s
CI/CD / test-python-dsms-gateway (push) Successful in 17s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Successful in 4s

- Migration 047: verification_method + category columns, 17 category lookup table
- Backend: new filters, GET /categories, GET /controls/{id}/similar (embedding-based)
- Frontend: filter dropdowns, badges, dedup UI in ControlDetail with merge workflow
- ControlForm: verification method + category selects
- Provenance: verification methods, categories, master library strategy sections
- Fix UUID cast syntax in generator routes (::uuid -> CAST)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-14 07:55:22 +01:00
parent 8a05fcc2f0
commit b6e6ffaaee
10 changed files with 577 additions and 44 deletions

View File

@@ -10,9 +10,11 @@ Endpoints:
GET /v1/canonical/frameworks/{framework_id}/controls — Controls of a framework
GET /v1/canonical/controls — All controls (filterable)
GET /v1/canonical/controls/{control_id} — Single control
GET /v1/canonical/controls/{control_id}/similar — Find similar controls
POST /v1/canonical/controls — Create a control
PUT /v1/canonical/controls/{control_id} — Update a control
DELETE /v1/canonical/controls/{control_id} — Delete a control
GET /v1/canonical/categories — Category list
GET /v1/canonical/sources — Source registry
GET /v1/canonical/licenses — License matrix
POST /v1/canonical/controls/{control_id}/similarity-check — Too-close check
@@ -70,6 +72,13 @@ class ControlResponse(BaseModel):
open_anchors: list
release_state: str
tags: list
license_rule: Optional[int] = None
source_original_text: Optional[str] = None
source_citation: Optional[dict] = None
customer_visible: Optional[bool] = None
verification_method: Optional[str] = None
category: Optional[str] = None
generation_metadata: Optional[dict] = None
created_at: str
updated_at: str
@@ -91,6 +100,13 @@ class ControlCreateRequest(BaseModel):
open_anchors: list = []
release_state: str = "draft"
tags: list = []
license_rule: Optional[int] = None
source_original_text: Optional[str] = None
source_citation: Optional[dict] = None
customer_visible: Optional[bool] = True
verification_method: Optional[str] = None
category: Optional[str] = None
generation_metadata: Optional[dict] = None
class ControlUpdateRequest(BaseModel):
@@ -108,6 +124,13 @@ class ControlUpdateRequest(BaseModel):
open_anchors: Optional[list] = None
release_state: Optional[str] = None
tags: Optional[list] = None
license_rule: Optional[int] = None
source_original_text: Optional[str] = None
source_citation: Optional[dict] = None
customer_visible: Optional[bool] = None
verification_method: Optional[str] = None
category: Optional[str] = None
generation_metadata: Optional[dict] = None
class SimilarityCheckRequest(BaseModel):
@@ -129,6 +152,16 @@ class SimilarityCheckResponse(BaseModel):
# HELPERS
# =============================================================================
_CONTROL_COLS = """id, framework_id, control_id, title, objective, rationale,
scope, requirements, test_procedure, evidence,
severity, risk_score, implementation_effort,
evidence_confidence, open_anchors, release_state, tags,
license_rule, source_original_text, source_citation,
customer_visible, verification_method, category,
generation_metadata,
created_at, updated_at"""
def _row_to_dict(row, columns: list[str]) -> dict[str, Any]:
"""Generic row → dict converter."""
return {col: (getattr(row, col).isoformat() if hasattr(getattr(row, col, None), 'isoformat') else getattr(row, col)) for col in columns}
@@ -206,6 +239,8 @@ async def list_framework_controls(
framework_id: str,
severity: Optional[str] = Query(None),
release_state: Optional[str] = Query(None),
verification_method: Optional[str] = Query(None),
category: Optional[str] = Query(None),
):
"""List controls belonging to a framework."""
with SessionLocal() as db:
@@ -217,12 +252,8 @@ async def list_framework_controls(
if not fw:
raise HTTPException(status_code=404, detail="Framework not found")
query = """
SELECT id, framework_id, control_id, title, objective, rationale,
scope, requirements, test_procedure, evidence,
severity, risk_score, implementation_effort,
evidence_confidence, open_anchors, release_state, tags,
created_at, updated_at
query = f"""
SELECT {_CONTROL_COLS}
FROM canonical_controls
WHERE framework_id = :fw_id
"""
@@ -234,6 +265,12 @@ async def list_framework_controls(
if release_state:
query += " AND release_state = :rs"
params["rs"] = release_state
if verification_method:
query += " AND verification_method = :vm"
params["vm"] = verification_method
if category:
query += " AND category = :cat"
params["cat"] = category
query += " ORDER BY control_id"
rows = db.execute(text(query), params).fetchall()
@@ -250,14 +287,12 @@ async def list_controls(
severity: Optional[str] = Query(None),
domain: Optional[str] = Query(None),
release_state: Optional[str] = Query(None),
verification_method: Optional[str] = Query(None),
category: Optional[str] = Query(None),
):
"""List all canonical controls, with optional filters."""
query = """
SELECT id, framework_id, control_id, title, objective, rationale,
scope, requirements, test_procedure, evidence,
severity, risk_score, implementation_effort,
evidence_confidence, open_anchors, release_state, tags,
created_at, updated_at
query = f"""
SELECT {_CONTROL_COLS}
FROM canonical_controls
WHERE 1=1
"""
@@ -272,6 +307,12 @@ async def list_controls(
if release_state:
query += " AND release_state = :rs"
params["rs"] = release_state
if verification_method:
query += " AND verification_method = :vm"
params["vm"] = verification_method
if category:
query += " AND category = :cat"
params["cat"] = category
query += " ORDER BY control_id"
@@ -286,12 +327,8 @@ async def get_control(control_id: str):
"""Get a single canonical control by its control_id (e.g. AUTH-001)."""
with SessionLocal() as db:
row = db.execute(
text("""
SELECT id, framework_id, control_id, title, objective, rationale,
scope, requirements, test_procedure, evidence,
severity, risk_score, implementation_effort,
evidence_confidence, open_anchors, release_state, tags,
created_at, updated_at
text(f"""
SELECT {_CONTROL_COLS}
FROM canonical_controls
WHERE control_id = :cid
"""),
@@ -339,23 +376,27 @@ async def create_control(body: ControlCreateRequest):
raise HTTPException(status_code=409, detail=f"Control '{body.control_id}' already exists")
row = db.execute(
text("""
text(f"""
INSERT INTO canonical_controls (
framework_id, control_id, title, objective, rationale,
scope, requirements, test_procedure, evidence,
severity, risk_score, implementation_effort, evidence_confidence,
open_anchors, release_state, tags
open_anchors, release_state, tags,
license_rule, source_original_text, source_citation,
customer_visible, verification_method, category,
generation_metadata
) VALUES (
:fw_id, :cid, :title, :objective, :rationale,
:scope::jsonb, :requirements::jsonb, :test_procedure::jsonb, :evidence::jsonb,
CAST(:scope AS jsonb), CAST(:requirements AS jsonb),
CAST(:test_procedure AS jsonb), CAST(:evidence AS jsonb),
:severity, :risk_score, :effort, :confidence,
:anchors::jsonb, :release_state, :tags::jsonb
CAST(:anchors AS jsonb), :release_state, CAST(:tags AS jsonb),
:license_rule, :source_original_text,
CAST(:source_citation AS jsonb),
:customer_visible, :verification_method, :category,
CAST(:generation_metadata AS jsonb)
)
RETURNING id, framework_id, control_id, title, objective, rationale,
scope, requirements, test_procedure, evidence,
severity, risk_score, implementation_effort,
evidence_confidence, open_anchors, release_state, tags,
created_at, updated_at
RETURNING {_CONTROL_COLS}
"""),
{
"fw_id": str(fw.id),
@@ -374,6 +415,13 @@ async def create_control(body: ControlCreateRequest):
"anchors": _json.dumps(body.open_anchors),
"release_state": body.release_state,
"tags": _json.dumps(body.tags),
"license_rule": body.license_rule,
"source_original_text": body.source_original_text,
"source_citation": _json.dumps(body.source_citation) if body.source_citation else None,
"customer_visible": body.customer_visible,
"verification_method": body.verification_method,
"category": body.category,
"generation_metadata": _json.dumps(body.generation_metadata) if body.generation_metadata else None,
},
).fetchone()
db.commit()
@@ -398,13 +446,13 @@ async def update_control(control_id: str, body: ControlUpdateRequest):
# Build dynamic SET clause
set_parts = []
params: dict[str, Any] = {"cid": control_id.upper()}
json_fields = {"scope", "requirements", "test_procedure", "evidence", "open_anchors", "tags"}
json_fields = {"scope", "requirements", "test_procedure", "evidence", "open_anchors", "tags",
"source_citation", "generation_metadata"}
for key, val in updates.items():
col = "implementation_effort" if key == "implementation_effort" else key
col = "evidence_confidence" if key == "evidence_confidence" else col
col = key
if key in json_fields:
set_parts.append(f"{col} = :{key}::jsonb")
set_parts.append(f"{col} = CAST(:{key} AS jsonb)")
params[key] = _json.dumps(val)
else:
set_parts.append(f"{col} = :{key}")
@@ -418,11 +466,7 @@ async def update_control(control_id: str, body: ControlUpdateRequest):
UPDATE canonical_controls
SET {', '.join(set_parts)}
WHERE control_id = :cid
RETURNING id, framework_id, control_id, title, objective, rationale,
scope, requirements, test_procedure, evidence,
severity, risk_score, implementation_effort,
evidence_confidence, open_anchors, release_state, tags,
created_at, updated_at
RETURNING {_CONTROL_COLS}
"""),
params,
).fetchone()
@@ -468,6 +512,94 @@ async def similarity_check(control_id: str, body: SimilarityCheckRequest):
}
# =============================================================================
# CATEGORIES
# =============================================================================
@router.get("/categories")
async def list_categories():
"""List all canonical control categories."""
with SessionLocal() as db:
rows = db.execute(
text("SELECT category_id, label_de, label_en, sort_order FROM canonical_control_categories ORDER BY sort_order")
).fetchall()
return [
{
"category_id": r.category_id,
"label_de": r.label_de,
"label_en": r.label_en,
"sort_order": r.sort_order,
}
for r in rows
]
# =============================================================================
# SIMILAR CONTROLS (Embedding-based dedup)
# =============================================================================
@router.get("/controls/{control_id}/similar")
async def find_similar_controls(
control_id: str,
threshold: float = Query(0.85, ge=0.5, le=1.0),
limit: int = Query(20, ge=1, le=100),
):
"""Find controls similar to the given one using embedding cosine similarity."""
with SessionLocal() as db:
# Get the target control's embedding
target = db.execute(
text("""
SELECT id, control_id, title, objective
FROM canonical_controls
WHERE control_id = :cid
"""),
{"cid": control_id.upper()},
).fetchone()
if not target:
raise HTTPException(status_code=404, detail="Control not found")
# Find similar controls using pg_vector cosine distance if available,
# otherwise fall back to text-based matching via objective similarity
try:
rows = db.execute(
text("""
SELECT c.control_id, c.title, c.severity, c.release_state,
c.tags, c.license_rule, c.verification_method, c.category,
1 - (c.embedding <=> t.embedding) AS similarity
FROM canonical_controls c, canonical_controls t
WHERE t.control_id = :cid
AND c.control_id != :cid
AND c.release_state != 'deprecated'
AND c.embedding IS NOT NULL
AND t.embedding IS NOT NULL
AND 1 - (c.embedding <=> t.embedding) >= :threshold
ORDER BY similarity DESC
LIMIT :lim
"""),
{"cid": control_id.upper(), "threshold": threshold, "lim": limit},
).fetchall()
return [
{
"control_id": r.control_id,
"title": r.title,
"severity": r.severity,
"release_state": r.release_state,
"tags": r.tags or [],
"license_rule": r.license_rule,
"verification_method": r.verification_method,
"category": r.category,
"similarity": round(float(r.similarity), 4),
}
for r in rows
]
except Exception as e:
logger.warning("Embedding similarity query failed (no embedding column?): %s", e)
return []
# =============================================================================
# SOURCES & LICENSES
# =============================================================================
@@ -509,6 +641,13 @@ def _control_row(r) -> dict:
"open_anchors": r.open_anchors,
"release_state": r.release_state,
"tags": r.tags or [],
"license_rule": r.license_rule,
"source_original_text": r.source_original_text,
"source_citation": r.source_citation,
"customer_visible": r.customer_visible,
"verification_method": r.verification_method,
"category": r.category,
"generation_metadata": r.generation_metadata,
"created_at": r.created_at.isoformat() if r.created_at else None,
"updated_at": r.updated_at.isoformat() if r.updated_at else None,
}