feat: evidence_type Feld (code/process/hybrid) fuer Controls
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 38s
CI/CD / test-python-backend-compliance (push) Successful in 31s
CI/CD / test-python-document-crawler (push) Successful in 19s
CI/CD / test-python-dsms-gateway (push) Successful in 17s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Successful in 4s

Neues Feld auf canonical_controls klassifiziert, ob ein Control
technisch im Source Code (code), organisatorisch via Dokumente (process)
oder beides (hybrid) nachgewiesen wird. Inklusive Backfill-Endpoint,
Frontend-Badge/Filter und MkDocs-Dokumentation.

- Migration 079: evidence_type VARCHAR(20) + Index
- Backend: Filter, Backfill-Endpoint mit Domain-Heuristik, CRUD
- Frontend: EvidenceTypeBadge (sky/amber/violet), Nachweisart-Dropdown
- Proxy: evidence_type Passthrough fuer controls + controls-count
- Tests: 22 Tests fuer Klassifikations-Heuristik
- Docs: Eigenes MkDocs-Kapitel mit Mermaid-Diagramm

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-25 21:53:40 +01:00
parent a29bfdd588
commit 5e9cab6ab5
9 changed files with 390 additions and 11 deletions

View File

@@ -80,6 +80,7 @@ class ControlResponse(BaseModel):
customer_visible: Optional[bool] = None
verification_method: Optional[str] = None
category: Optional[str] = None
evidence_type: Optional[str] = None
target_audience: Optional[str] = None
generation_metadata: Optional[dict] = None
generation_strategy: Optional[str] = "ungrouped"
@@ -113,6 +114,7 @@ class ControlCreateRequest(BaseModel):
customer_visible: Optional[bool] = True
verification_method: Optional[str] = None
category: Optional[str] = None
evidence_type: Optional[str] = None
target_audience: Optional[str] = None
generation_metadata: Optional[dict] = None
applicable_industries: Optional[list] = None
@@ -141,6 +143,7 @@ class ControlUpdateRequest(BaseModel):
customer_visible: Optional[bool] = None
verification_method: Optional[str] = None
category: Optional[str] = None
evidence_type: Optional[str] = None
target_audience: Optional[str] = None
generation_metadata: Optional[dict] = None
applicable_industries: Optional[list] = None
@@ -172,7 +175,7 @@ _CONTROL_COLS = """id, framework_id, control_id, title, objective, rationale,
severity, risk_score, implementation_effort,
evidence_confidence, open_anchors, release_state, tags,
license_rule, source_original_text, source_citation,
customer_visible, verification_method, category,
customer_visible, verification_method, category, evidence_type,
target_audience, generation_metadata, generation_strategy,
applicable_industries, applicable_company_size, scope_conditions,
parent_control_uuid, decomposition_method, pipeline_version,
@@ -312,6 +315,7 @@ async def list_controls(
release_state: Optional[str] = Query(None),
verification_method: Optional[str] = Query(None),
category: Optional[str] = Query(None),
evidence_type: Optional[str] = Query(None, description="Filter: code, process, hybrid"),
target_audience: Optional[str] = Query(None),
source: Optional[str] = Query(None, description="Filter by source_citation->source"),
search: Optional[str] = Query(None, description="Full-text search in control_id, title, objective"),
@@ -348,6 +352,9 @@ async def list_controls(
if category:
query += " AND category = :cat"
params["cat"] = category
if evidence_type:
query += " AND evidence_type = :et"
params["et"] = evidence_type
if target_audience:
query += " AND target_audience LIKE :ta_pattern"
params["ta_pattern"] = f'%"{target_audience}"%'
@@ -398,6 +405,7 @@ async def count_controls(
release_state: Optional[str] = Query(None),
verification_method: Optional[str] = Query(None),
category: Optional[str] = Query(None),
evidence_type: Optional[str] = Query(None),
target_audience: Optional[str] = Query(None),
source: Optional[str] = Query(None),
search: Optional[str] = Query(None),
@@ -426,6 +434,9 @@ async def count_controls(
if category:
query += " AND category = :cat"
params["cat"] = category
if evidence_type:
query += " AND evidence_type = :et"
params["et"] = evidence_type
if target_audience:
query += " AND target_audience LIKE :ta_pattern"
params["ta_pattern"] = f'%"{target_audience}"%'
@@ -998,6 +1009,109 @@ async def backfill_normative_strength(
}
# =============================================================================
# EVIDENCE TYPE BACKFILL
# =============================================================================
# Domains that are primarily technical (code-verifiable)
_CODE_DOMAINS = frozenset({
"SEC", "AUTH", "CRYPT", "CRYP", "CRY", "NET", "LOG", "ACC", "APP", "SYS",
"CI", "CONT", "API", "CLOUD", "IAC", "SAST", "DAST", "DEP", "SBOM",
"WEB", "DEV", "SDL", "PKI", "HSM", "TEE", "TPM", "CRX", "CRF",
"FWU", "STO", "RUN", "VUL", "MAL", "PLT", "AUT",
})
# Domains that are primarily process-based (document-verifiable)
_PROCESS_DOMAINS = frozenset({
"GOV", "ORG", "COMP", "LEGAL", "HR", "TRAIN", "AML", "FIN",
"RISK", "AUDIT", "AUD", "PROC", "DOC", "PHYS", "PHY", "PRIV", "DPO",
"BCDR", "BCP", "VENDOR", "SUPPLY", "SUP", "CERT", "POLICY",
"ENV", "HLT", "TRD", "LAB", "PER", "REL", "ISM", "COM",
"GAM", "RIS", "PCA", "GNT", "HCA", "RES", "ISS",
})
# Domains that are typically hybrid
_HYBRID_DOMAINS = frozenset({
"DATA", "AI", "INC", "ID", "IAM", "IDF", "IDP", "IDA", "IDN",
"OPS", "MNT", "INT", "BCK",
})
def _classify_evidence_type(control_id: str, category: str | None) -> str:
"""Heuristic: classify a control as code/process/hybrid based on domain prefix."""
domain = control_id.split("-")[0].upper() if control_id else ""
if domain in _CODE_DOMAINS:
return "code"
if domain in _PROCESS_DOMAINS:
return "process"
if domain in _HYBRID_DOMAINS:
return "hybrid"
# Fallback: use category if available
code_categories = {"encryption", "authentication", "network", "application", "system", "identity"}
process_categories = {"compliance", "personnel", "physical", "governance", "risk"}
if category in code_categories:
return "code"
if category in process_categories:
return "process"
return "process" # Conservative default
@router.post("/controls/backfill-evidence-type")
async def backfill_evidence_type(
dry_run: bool = Query(True, description="Nur zaehlen, nicht aendern"),
):
"""
Klassifiziert Controls als code/process/hybrid basierend auf Domain-Prefix.
Heuristik:
- SEC, AUTH, CRYPT, NET, LOG, ... → code
- GOV, ORG, COMP, LEGAL, HR, ... → process
- DATA, AI, INC → hybrid
"""
with SessionLocal() as db:
rows = db.execute(text("""
SELECT id, control_id, category, evidence_type
FROM canonical_controls
WHERE release_state NOT IN ('rejected', 'merged')
ORDER BY control_id
""")).fetchall()
changes = []
stats = {"total": len(rows), "already_set": 0, "code": 0, "process": 0, "hybrid": 0}
for row in rows:
if row.evidence_type is not None:
stats["already_set"] += 1
continue
new_type = _classify_evidence_type(row.control_id, row.category)
stats[new_type] += 1
changes.append({
"id": str(row.id),
"control_id": row.control_id,
"evidence_type": new_type,
})
if not dry_run and changes:
for change in changes:
db.execute(text("""
UPDATE canonical_controls
SET evidence_type = :et
WHERE id = CAST(:cid AS uuid)
"""), {"et": change["evidence_type"], "cid": change["id"]})
db.commit()
return {
"dry_run": dry_run,
"stats": stats,
"total_changes": len(changes),
"sample_changes": changes[:20],
}
# =============================================================================
# CONTROL CRUD (CREATE / UPDATE / DELETE)
# =============================================================================
@@ -1040,7 +1154,7 @@ async def create_control(body: ControlCreateRequest):
severity, risk_score, implementation_effort, evidence_confidence,
open_anchors, release_state, tags,
license_rule, source_original_text, source_citation,
customer_visible, verification_method, category,
customer_visible, verification_method, category, evidence_type,
target_audience, generation_metadata,
applicable_industries, applicable_company_size, scope_conditions
) VALUES (
@@ -1051,7 +1165,7 @@ async def create_control(body: ControlCreateRequest):
CAST(:anchors AS jsonb), :release_state, CAST(:tags AS jsonb),
:license_rule, :source_original_text,
CAST(:source_citation AS jsonb),
:customer_visible, :verification_method, :category,
:customer_visible, :verification_method, :category, :evidence_type,
:target_audience, CAST(:generation_metadata AS jsonb),
CAST(:applicable_industries AS jsonb),
CAST(:applicable_company_size AS jsonb),
@@ -1082,6 +1196,7 @@ async def create_control(body: ControlCreateRequest):
"customer_visible": body.customer_visible,
"verification_method": body.verification_method,
"category": body.category,
"evidence_type": body.evidence_type,
"target_audience": body.target_audience,
"generation_metadata": _json.dumps(body.generation_metadata) if body.generation_metadata else None,
"applicable_industries": _json.dumps(body.applicable_industries) if body.applicable_industries else None,
@@ -1312,6 +1427,7 @@ def _control_row(r) -> dict:
"customer_visible": r.customer_visible,
"verification_method": r.verification_method,
"category": r.category,
"evidence_type": getattr(r, "evidence_type", None),
"target_audience": r.target_audience,
"generation_metadata": r.generation_metadata,
"generation_strategy": getattr(r, "generation_strategy", "ungrouped"),

View File

@@ -0,0 +1,16 @@
-- Migration 079: Add evidence_type to canonical_controls
-- Classifies HOW a control is evidenced:
-- code = Technical control, verifiable in source code / IaC / CI-CD
-- process = Organizational / governance control, verified via documents / policies
-- hybrid = Both code and process evidence required
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables
WHERE table_schema = 'compliance' AND table_name = 'canonical_controls') THEN
ALTER TABLE canonical_controls ADD COLUMN IF NOT EXISTS
evidence_type VARCHAR(20) DEFAULT NULL
CHECK (evidence_type IN ('code', 'process', 'hybrid'));
CREATE INDEX IF NOT EXISTS idx_cc_evidence_type ON canonical_controls(evidence_type);
END IF;
END $$;

View File

@@ -0,0 +1,79 @@
"""Tests for evidence_type classification heuristic."""
import sys
sys.path.insert(0, ".")
from compliance.api.canonical_control_routes import _classify_evidence_type
class TestClassifyEvidenceType:
"""Tests for _classify_evidence_type()."""
# --- Code domains ---
def test_sec_is_code(self):
assert _classify_evidence_type("SEC-042", None) == "code"
def test_auth_is_code(self):
assert _classify_evidence_type("AUTH-001", None) == "code"
def test_crypt_is_code(self):
assert _classify_evidence_type("CRYPT-003", None) == "code"
def test_cryp_is_code(self):
assert _classify_evidence_type("CRYP-010", None) == "code"
def test_net_is_code(self):
assert _classify_evidence_type("NET-015", None) == "code"
def test_log_is_code(self):
assert _classify_evidence_type("LOG-007", None) == "code"
def test_acc_is_code(self):
assert _classify_evidence_type("ACC-012", None) == "code"
def test_api_is_code(self):
assert _classify_evidence_type("API-001", None) == "code"
# --- Process domains ---
def test_gov_is_process(self):
assert _classify_evidence_type("GOV-001", None) == "process"
def test_comp_is_process(self):
assert _classify_evidence_type("COMP-001", None) == "process"
def test_fin_is_process(self):
assert _classify_evidence_type("FIN-001", None) == "process"
def test_hr_is_process(self):
assert _classify_evidence_type("HR-001", None) == "process"
def test_org_is_process(self):
assert _classify_evidence_type("ORG-001", None) == "process"
def test_env_is_process(self):
assert _classify_evidence_type("ENV-001", None) == "process"
# --- Hybrid domains ---
def test_data_is_hybrid(self):
assert _classify_evidence_type("DATA-005", None) == "hybrid"
def test_ai_is_hybrid(self):
assert _classify_evidence_type("AI-001", None) == "hybrid"
def test_inc_is_hybrid(self):
assert _classify_evidence_type("INC-003", None) == "hybrid"
def test_iam_is_hybrid(self):
assert _classify_evidence_type("IAM-001", None) == "hybrid"
# --- Category fallback ---
def test_unknown_domain_encryption_category(self):
assert _classify_evidence_type("XYZ-001", "encryption") == "code"
def test_unknown_domain_governance_category(self):
assert _classify_evidence_type("XYZ-001", "governance") == "process"
def test_unknown_domain_no_category(self):
assert _classify_evidence_type("XYZ-001", None) == "process"
def test_empty_control_id(self):
assert _classify_evidence_type("", None) == "process"