"""
FastAPI routes for the Canonical Control Library.

Independently authored security controls anchored in open-source frameworks
(OWASP, NIST, ENISA). No proprietary nomenclature.

Endpoints:
  GET    /v1/canonical/frameworks — All frameworks
  GET    /v1/canonical/frameworks/{framework_id} — Framework details
  GET    /v1/canonical/frameworks/{framework_id}/controls — Controls of a framework
  GET    /v1/canonical/controls — All controls (filterable)
  GET    /v1/canonical/controls-count — Number of controls matching the filters
  GET    /v1/canonical/controls-meta — Facet counts for filter dropdowns
  GET    /v1/canonical/controls/atomic-stats — Statistics for atomic controls
  GET    /v1/canonical/controls/v1-enrichment-stats — v1 enrichment statistics
  GET    /v1/canonical/controls/{control_id} — Single control
  GET    /v1/canonical/controls/{control_id}/traceability — Traceability chain
  GET    /v1/canonical/controls/{control_id}/provenance — Provenance chain
  GET    /v1/canonical/controls/{control_id}/similar — Find similar controls
  POST   /v1/canonical/controls — Create a control
  POST   /v1/canonical/controls/backfill-normative-strength — Backfill normative strength
  PUT    /v1/canonical/controls/{control_id} — Update a control
  DELETE /v1/canonical/controls/{control_id} — Delete a control
  GET    /v1/canonical/categories — Category list
  GET    /v1/canonical/sources — Source registry
  GET    /v1/canonical/licenses — License matrix
  POST   /v1/canonical/controls/{control_id}/similarity-check — Too-close check
"""

from __future__ import annotations

import json
import logging
from typing import Any, Optional

from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text

from database import SessionLocal
from compliance.services.license_gate import get_license_matrix, get_source_permissions
from compliance.services.similarity_detector import check_similarity

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/canonical", tags=["canonical-controls"])


# =============================================================================
# RESPONSE MODELS
# =============================================================================

class FrameworkResponse(BaseModel):
    """Serialised shape of a row from canonical_control_frameworks."""

    id: str
    framework_id: str
    name: str
    version: str
    description: Optional[str] = None
    owner: Optional[str] = None
    policy_version: Optional[str] = None
    release_state: str
    created_at: str
    updated_at: str


class ControlResponse(BaseModel):
    """Serialised shape of a canonical control."""

    id: str
    framework_id: str
    control_id: str
    title: str
    objective: str
    rationale: str
    scope: dict
    requirements: list
    test_procedure: list
    evidence: list
    severity: str
    risk_score: Optional[float] = None
    implementation_effort: Optional[str] = None
    evidence_confidence: Optional[float] = None
    open_anchors: list
    release_state: str
    tags: list
    license_rule: Optional[int] = None
    source_original_text: Optional[str] = None
    source_citation: Optional[dict] = None
    customer_visible: Optional[bool] = None
    verification_method: Optional[str] = None
    category: Optional[str] = None
    evidence_type: Optional[str] = None
    target_audience: Optional[str] = None
    generation_metadata: Optional[dict] = None
    generation_strategy: Optional[str] = "ungrouped"
    applicable_industries: Optional[list] = None
    applicable_company_size: Optional[list] = None
    scope_conditions: Optional[dict] = None
    created_at: str
    updated_at: str


class ControlCreateRequest(BaseModel):
    """Payload for creating a control; only the first five fields are required."""

    framework_id: str  # e.g. 'bp_security_v1'
    control_id: str  # e.g. 'AUTH-003'
    title: str
    objective: str
    rationale: str
    scope: dict = {}
    requirements: list = []
    test_procedure: list = []
    evidence: list = []
    severity: str = "medium"
    risk_score: Optional[float] = None
    implementation_effort: Optional[str] = None
    evidence_confidence: Optional[float] = None
    open_anchors: list = []
    release_state: str = "draft"
    tags: list = []
    license_rule: Optional[int] = None
    source_original_text: Optional[str] = None
    source_citation: Optional[dict] = None
    customer_visible: Optional[bool] = True
    verification_method: Optional[str] = None
    category: Optional[str] = None
    evidence_type: Optional[str] = None
    target_audience: Optional[str] = None
    generation_metadata: Optional[dict] = None
    applicable_industries: Optional[list] = None
    applicable_company_size: Optional[list] = None
    scope_conditions: Optional[dict] = None


class ControlUpdateRequest(BaseModel):
    """Payload for updating a control; every field is optional and defaults to None."""

    title: Optional[str] = None
    objective: Optional[str] = None
    rationale: Optional[str] = None
    scope: Optional[dict] = None
    requirements: Optional[list] = None
    test_procedure: Optional[list] = None
    evidence: Optional[list] = None
    severity: Optional[str] = None
    risk_score: Optional[float] = None
    implementation_effort: Optional[str] = None
    evidence_confidence: Optional[float] = None
    open_anchors: Optional[list] = None
    release_state: Optional[str] = None
    tags: Optional[list] = None
    license_rule: Optional[int] = None
    source_original_text: Optional[str] = None
    source_citation: Optional[dict] = None
    customer_visible: Optional[bool] = None
    verification_method: Optional[str] = None
    category: Optional[str] = None
    evidence_type: Optional[str] = None
    target_audience: Optional[str] = None
    generation_metadata: Optional[dict] = None
    applicable_industries: Optional[list] = None
    applicable_company_size: Optional[list] = None
    scope_conditions: Optional[dict] = None


class SimilarityCheckRequest(BaseModel):
    """Two texts to compare in the too-close check."""

    source_text: str
    candidate_text: str


class SimilarityCheckResponse(BaseModel):
    """Similarity metrics plus an overall status for a too-close check."""

    max_exact_run: int
    token_overlap: float
    ngram_jaccard: float
    embedding_cosine: float
    lcs_ratio: float
    status: str
    details: dict


# =============================================================================
# HELPERS
# =============================================================================

# Column list shared by every SELECT that returns full control rows. The two
# correlated sub-selects resolve the parent control's public id and title.
_CONTROL_COLS = """id, framework_id, control_id, title, objective, rationale,
    scope, requirements, test_procedure, evidence, severity, risk_score,
    implementation_effort, evidence_confidence, open_anchors, release_state,
    tags, license_rule, source_original_text, source_citation,
    customer_visible, verification_method, category, evidence_type,
    target_audience, generation_metadata, generation_strategy,
    applicable_industries, applicable_company_size, scope_conditions,
    parent_control_uuid, decomposition_method, pipeline_version,
    (SELECT p.control_id FROM canonical_controls p
     WHERE p.id = canonical_controls.parent_control_uuid) AS parent_control_id,
    (SELECT p.title FROM canonical_controls p
     WHERE p.id = canonical_controls.parent_control_uuid) AS parent_control_title,
    created_at, updated_at"""

# Column list shared by the framework endpoints.
_FRAMEWORK_COLS = """id, framework_id, name, version, description,
    owner, policy_version, release_state, created_at, updated_at"""


def _row_to_dict(row, columns: list[str]) -> dict[str, Any]:
    """Convert a result row into a dict restricted to ``columns``.

    Values that expose an ``isoformat`` method (dates, datetimes) are replaced
    by their ISO-8601 string; every other value is passed through unchanged.

    Raises:
        AttributeError: if ``row`` lacks one of the requested columns.
    """
    converted: dict[str, Any] = {}
    for col in columns:
        value = getattr(row, col)  # single lookup instead of up to three
        converted[col] = value.isoformat() if hasattr(value, "isoformat") else value
    return converted


def _framework_row(row) -> dict[str, Any]:
    """Serialise one canonical_control_frameworks row for the API response."""
    return {
        "id": str(row.id),
        "framework_id": row.framework_id,
        "name": row.name,
        "version": row.version,
        "description": row.description,
        "owner": row.owner,
        "policy_version": row.policy_version,
        "release_state": row.release_state,
        # Timestamps may be NULL; only format them when present.
        "created_at": row.created_at.isoformat() if row.created_at else None,
        "updated_at": row.updated_at.isoformat() if row.updated_at else None,
    }


# =============================================================================
# FRAMEWORKS
# =============================================================================

@router.get("/frameworks")
async def list_frameworks():
    """List all registered control frameworks, ordered by name."""
    with SessionLocal() as db:
        rows = db.execute(
            text(f"""
                SELECT {_FRAMEWORK_COLS}
                FROM canonical_control_frameworks
                ORDER BY name
            """)
        ).fetchall()

    return [_framework_row(r) for r in rows]


@router.get("/frameworks/{framework_id}")
async def get_framework(framework_id: str):
    """Get a single framework by its framework_id.

    Raises:
        HTTPException: 404 when no framework with that framework_id exists.
    """
    with SessionLocal() as db:
        row = db.execute(
            text(f"""
                SELECT {_FRAMEWORK_COLS}
                FROM canonical_control_frameworks
                WHERE framework_id = :fid
            """),
            {"fid": framework_id},
        ).fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="Framework not found")

    return _framework_row(row)
@router.get("/frameworks/{framework_id}/controls")
async def list_framework_controls(
    framework_id: str,
    severity: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
    verification_method: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    target_audience: Optional[str] = Query(None),
):
    """List controls belonging to a framework, ordered by control_id.

    Every optional query parameter adds an equality filter on the column of
    the same name, except ``target_audience``, which is matched with a JSONB
    containment test.

    Raises:
        HTTPException: 404 when the framework_id is unknown.
    """
    with SessionLocal() as db:
        # Resolve the public framework_id to the framework's primary key.
        fw = db.execute(
            text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"),
            {"fid": framework_id},
        ).fetchone()
        if not fw:
            raise HTTPException(status_code=404, detail="Framework not found")

        query = f"""
            SELECT {_CONTROL_COLS}
            FROM canonical_controls
            WHERE framework_id = :fw_id
        """
        params: dict[str, Any] = {"fw_id": str(fw.id)}

        if severity:
            query += " AND severity = :sev"
            params["sev"] = severity
        if release_state:
            query += " AND release_state = :rs"
            params["rs"] = release_state
        if verification_method:
            query += " AND verification_method = :vm"
            params["vm"] = verification_method
        if category:
            query += " AND category = :cat"
            params["cat"] = category
        if target_audience:
            query += " AND target_audience::jsonb @> (:ta)::jsonb"
            params["ta"] = json.dumps([target_audience])

        query += " ORDER BY control_id"
        rows = db.execute(text(query), params).fetchall()

    return [_control_row(r) for r in rows]


# =============================================================================
# CONTROLS
# =============================================================================

# SQL predicate per supported ``control_type`` value. Any other value applies
# no type filter.
_CONTROL_TYPE_SQL = {
    "atomic": "decomposition_method = 'pass0b'",
    "rich": "(decomposition_method IS NULL OR decomposition_method != 'pass0b')",
    "eigenentwicklung": (
        "(generation_strategy = 'ungrouped'"
        " AND (pipeline_version = '1' OR pipeline_version IS NULL)"
        " AND source_citation IS NULL"
        " AND parent_control_uuid IS NULL)"
    ),
}

# Predicate selecting controls without a usable source citation.
_NO_SOURCE_SQL = (
    "(source_citation IS NULL"
    " OR source_citation->>'source' IS NULL"
    " OR source_citation->>'source' = '')"
)


def _build_control_where(
    *,
    severity: Optional[str] = None,
    domain: Optional[str] = None,
    release_state: Optional[str] = None,
    verification_method: Optional[str] = None,
    category: Optional[str] = None,
    evidence_type: Optional[str] = None,
    target_audience: Optional[str] = None,
    source: Optional[str] = None,
    search: Optional[str] = None,
    control_type: Optional[str] = None,
    exclude_duplicates: bool = False,
    skip: Optional[str] = None,
) -> tuple[str, dict[str, Any]]:
    """Build the WHERE clause shared by the list, count and meta endpoints.

    Args:
        skip: Name of one filter to leave out (used for facet counts, where a
            facet must ignore its own filter). ``exclude_duplicates`` is never
            skipped.

    Returns:
        ``(where_sql, params)``. ``where_sql`` always starts with ``1=1`` so
        callers can append further ``AND ...`` fragments unconditionally. All
        user-supplied values travel as bind parameters.
    """
    clauses = ["1=1"]
    params: dict[str, Any] = {}

    def wanted(name: str, value: Any) -> bool:
        return bool(value) and skip != name

    if exclude_duplicates:
        clauses.append("release_state != 'duplicate'")

    # Plain "column = value" filters: (column / filter name, value, bind name).
    equality_filters = (
        ("severity", severity, "sev"),
        ("release_state", release_state, "rs"),
        ("verification_method", verification_method, "vm"),
        ("category", category, "cat"),
        ("evidence_type", evidence_type, "et"),
    )
    for column, value, bind in equality_filters:
        if wanted(column, value):
            clauses.append(f"{column} = :{bind}")
            params[bind] = value

    if wanted("domain", domain):
        # Prefix match: the control_id must start with the upper-cased domain.
        clauses.append("LEFT(control_id, LENGTH(:dom)) = :dom")
        params["dom"] = domain.upper()

    if wanted("target_audience", target_audience):
        # Matches the quoted value anywhere in the column's text.
        clauses.append("target_audience LIKE :ta_pattern")
        params["ta_pattern"] = f'%"{target_audience}"%'

    if wanted("source", source):
        if source == "__none__":
            clauses.append(_NO_SOURCE_SQL)
        else:
            clauses.append("source_citation->>'source' = :src")
            params["src"] = source

    if wanted("control_type", control_type):
        type_clause = _CONTROL_TYPE_SQL.get(control_type)
        if type_clause:
            clauses.append(type_clause)

    if wanted("search", search):
        clauses.append("(control_id ILIKE :q OR title ILIKE :q OR objective ILIKE :q)")
        params["q"] = f"%{search}%"

    return " AND ".join(clauses), params


def _float_or_default(value: Any, default: Optional[float]) -> Optional[float]:
    """Return ``float(value)``, or ``default`` only when ``value`` is None.

    A stored value of 0 is a legitimate number and must not be replaced by
    the default.
    """
    return default if value is None else float(value)


@router.get("/controls")
async def list_controls(
    severity: Optional[str] = Query(None),
    domain: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
    verification_method: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    evidence_type: Optional[str] = Query(None, description="Filter: code, process, hybrid"),
    target_audience: Optional[str] = Query(None),
    source: Optional[str] = Query(None, description="Filter by source_citation->source"),
    search: Optional[str] = Query(None, description="Full-text search in control_id, title, objective"),
    control_type: Optional[str] = Query(None, description="Filter: atomic, rich, or eigenentwicklung; omit for all"),
    exclude_duplicates: bool = Query(False, description="Exclude controls with release_state='duplicate'"),
    sort: Optional[str] = Query("control_id", description="Sort field: control_id, created_at, updated_at, severity, source"),
    order: Optional[str] = Query("asc", description="Sort order: asc or desc"),
    limit: Optional[int] = Query(None, ge=1, le=5000, description="Max results"),
    offset: Optional[int] = Query(None, ge=0, description="Offset for pagination"),
):
    """List canonical controls with filters, search, sorting and pagination.

    Unknown ``sort`` values fall back to ``control_id``. Sorting by any other
    column uses ``control_id`` as a tie-breaker so that LIMIT/OFFSET pages are
    stable.
    """
    where, params = _build_control_where(
        severity=severity,
        domain=domain,
        release_state=release_state,
        verification_method=verification_method,
        category=category,
        evidence_type=evidence_type,
        target_audience=target_audience,
        source=source,
        search=search,
        control_type=control_type,
        exclude_duplicates=exclude_duplicates,
    )
    query = f"SELECT {_CONTROL_COLS} FROM canonical_controls WHERE {where}"

    # Sorting: only whitelisted column names are ever interpolated into SQL.
    sort_dir = "DESC" if order and order.lower() == "desc" else "ASC"
    if sort == "source":
        # Group by source first, then by control_id within each source.
        query += f" ORDER BY source_citation->>'source' {sort_dir} NULLS LAST, control_id ASC"
    elif sort in ("created_at", "updated_at", "severity"):
        query += f" ORDER BY {sort} {sort_dir}, control_id ASC"
    else:
        query += f" ORDER BY control_id {sort_dir}"

    if limit is not None:
        query += " LIMIT :lim"
        params["lim"] = limit
    if offset is not None:
        query += " OFFSET :off"
        params["off"] = offset

    with SessionLocal() as db:
        rows = db.execute(text(query), params).fetchall()

    return [_control_row(r) for r in rows]


@router.get("/controls-count")
async def count_controls(
    severity: Optional[str] = Query(None),
    domain: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
    verification_method: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    evidence_type: Optional[str] = Query(None),
    target_audience: Optional[str] = Query(None),
    source: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    control_type: Optional[str] = Query(None),
    exclude_duplicates: bool = Query(False, description="Exclude controls with release_state='duplicate'"),
):
    """Count controls matching the same filters as the list endpoint (for pagination)."""
    where, params = _build_control_where(
        severity=severity,
        domain=domain,
        release_state=release_state,
        verification_method=verification_method,
        category=category,
        evidence_type=evidence_type,
        target_audience=target_audience,
        source=source,
        search=search,
        control_type=control_type,
        exclude_duplicates=exclude_duplicates,
    )

    with SessionLocal() as db:
        total = db.execute(
            text(f"SELECT count(*) FROM canonical_controls WHERE {where}"), params
        ).scalar()

    return {"total": total}


@router.get("/controls-meta")
async def controls_meta(
    severity: Optional[str] = Query(None),
    domain: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
    verification_method: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    evidence_type: Optional[str] = Query(None),
    target_audience: Optional[str] = Query(None),
    source: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    control_type: Optional[str] = Query(None),
    exclude_duplicates: bool = Query(False),
):
    """Return faceted metadata for filter dropdowns.

    Each facet's counts respect ALL active filters EXCEPT the facet's own,
    so dropdowns always show how many items each option would yield.
    """
    filters: dict[str, Any] = {
        "severity": severity,
        "domain": domain,
        "release_state": release_state,
        "verification_method": verification_method,
        "category": category,
        "evidence_type": evidence_type,
        "target_audience": target_audience,
        "source": source,
        "search": search,
        "control_type": control_type,
        "exclude_duplicates": exclude_duplicates,
    }

    with SessionLocal() as db:

        def count(extra: str = "", skip: Optional[str] = None):
            """Count rows for the active filters (minus ``skip``) plus ``extra``."""
            where, params = _build_control_where(**filters, skip=skip)
            return db.execute(
                text(f"SELECT count(*) FROM canonical_controls WHERE {where}{extra}"),
                params,
            ).scalar()

        def grouped(select: str, group_by: str, order_by: str, skip: str, extra: str = ""):
            """Return (value, count) rows for one facet; all SQL pieces are constants."""
            where, params = _build_control_where(**filters, skip=skip)
            return db.execute(
                text(f"""
                    SELECT {select}, count(*) AS cnt
                    FROM canonical_controls
                    WHERE {where}{extra}
                    GROUP BY {group_by}
                    ORDER BY {order_by}
                """),
                params,
            ).fetchall()

        # Total with ALL filters
        total = count()

        # Domain facet (skip domain filter so the user sees all domains)
        domains = grouped(
            "UPPER(SPLIT_PART(control_id, '-', 1)) AS domain",
            "domain", "domain", skip="domain",
        )

        # Source facet (skip source filter)
        sources = grouped(
            "source_citation->>'source' AS src",
            "src", "cnt DESC", skip="source",
            extra=(
                " AND source_citation->>'source' IS NOT NULL"
                " AND source_citation->>'source' != ''"
            ),
        )
        no_source = count(f" AND {_NO_SOURCE_SQL}", skip="source")

        # Type facet (skip control_type filter)
        atomic_count = count(f" AND {_CONTROL_TYPE_SQL['atomic']}", skip="control_type") or 0
        eigenentwicklung_count = (
            count(f" AND {_CONTROL_TYPE_SQL['eigenentwicklung']}", skip="control_type") or 0
        )
        rich_count = count(f" AND {_CONTROL_TYPE_SQL['rich']}", skip="control_type") or 0

        # Simple column facets; each skips its own filter.
        severity_counts = grouped("severity", "severity", "severity", skip="severity")
        vm_counts = grouped(
            "verification_method", "verification_method", "verification_method",
            skip="verification_method", extra=" AND verification_method IS NOT NULL",
        )
        cat_counts = grouped(
            "category", "category", "cnt DESC",
            skip="category", extra=" AND category IS NOT NULL",
        )
        et_counts = grouped(
            "evidence_type", "evidence_type", "evidence_type",
            skip="evidence_type", extra=" AND evidence_type IS NOT NULL",
        )
        rs_counts = grouped(
            "release_state", "release_state", "release_state", skip="release_state",
        )

    return {
        "total": total,
        "domains": [{"domain": r[0], "count": r[1]} for r in domains],
        "sources": [{"source": r[0], "count": r[1]} for r in sources],
        "no_source_count": no_source,
        "type_counts": {
            "rich": rich_count,
            "atomic": atomic_count,
            "eigenentwicklung": eigenentwicklung_count,
        },
        "severity_counts": {r[0]: r[1] for r in severity_counts},
        "verification_method_counts": {r[0]: r[1] for r in vm_counts},
        "category_counts": {r[0]: r[1] for r in cat_counts},
        "evidence_type_counts": {r[0]: r[1] for r in et_counts},
        "release_state_counts": {r[0]: r[1] for r in rs_counts},
    }


@router.get("/controls/atomic-stats")
async def atomic_stats():
    """Return aggregated statistics for atomic controls (masters only).

    "Active" means decomposition_method 'pass0b' and a release_state other
    than duplicate, deprecated or rejected.
    """
    with SessionLocal() as db:
        total_active = db.execute(text("""
            SELECT count(*) FROM canonical_controls
            WHERE decomposition_method = 'pass0b'
              AND release_state NOT IN ('duplicate', 'deprecated', 'rejected')
        """)).scalar() or 0

        total_duplicate = db.execute(text("""
            SELECT count(*) FROM canonical_controls
            WHERE decomposition_method = 'pass0b'
              AND release_state = 'duplicate'
        """)).scalar() or 0

        by_domain = db.execute(text("""
            SELECT UPPER(SPLIT_PART(control_id, '-', 1)) AS domain, count(*) AS cnt
            FROM canonical_controls
            WHERE decomposition_method = 'pass0b'
              AND release_state NOT IN ('duplicate', 'deprecated', 'rejected')
            GROUP BY domain
            ORDER BY cnt DESC
        """)).fetchall()

        by_regulation = db.execute(text("""
            SELECT cpl.source_regulation AS regulation,
                   count(DISTINCT cc.id) AS cnt
            FROM canonical_controls cc
            JOIN control_parent_links cpl ON cpl.control_uuid = cc.id
            WHERE cc.decomposition_method = 'pass0b'
              AND cc.release_state NOT IN ('duplicate', 'deprecated', 'rejected')
              AND cpl.source_regulation IS NOT NULL
            GROUP BY cpl.source_regulation
            ORDER BY cnt DESC
        """)).fetchall()

        # Average number of distinct regulations linked to each active atomic control.
        avg_coverage = db.execute(text("""
            SELECT COALESCE(AVG(reg_count), 0)
            FROM (
                SELECT cc.id, count(DISTINCT cpl.source_regulation) AS reg_count
                FROM canonical_controls cc
                LEFT JOIN control_parent_links cpl ON cpl.control_uuid = cc.id
                WHERE cc.decomposition_method = 'pass0b'
                  AND cc.release_state NOT IN ('duplicate', 'deprecated', 'rejected')
                GROUP BY cc.id
            ) sub
        """)).scalar() or 0

    return {
        "total_active": total_active,
        "total_duplicate": total_duplicate,
        "by_domain": [{"domain": r[0], "count": r[1]} for r in by_domain],
        "by_regulation": [{"regulation": r[0], "count": r[1]} for r in by_regulation],
        "avg_regulation_coverage": round(float(avg_coverage), 1),
    }


@router.get("/controls/v1-enrichment-stats")
async def v1_enrichment_stats_endpoint():
    """Overview: how many v1 controls have regulatory coverage?"""
    # Imported lazily, as in the original code.
    from compliance.services.v1_enrichment import get_v1_enrichment_stats

    return await get_v1_enrichment_stats()


@router.get("/controls/{control_id}")
async def get_control(control_id: str):
    """Get a single canonical control by its control_id (e.g. AUTH-001).

    The lookup upper-cases ``control_id`` before querying.

    Raises:
        HTTPException: 404 when no such control exists.
    """
    with SessionLocal() as db:
        row = db.execute(
            text(f"""
                SELECT {_CONTROL_COLS}
                FROM canonical_controls
                WHERE control_id = :cid
            """),
            {"cid": control_id.upper()},
        ).fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="Control not found")

    return _control_row(row)


def _fetch_control_head(db, control_id: str):
    """Load the identifying columns of a control or raise a 404.

    The lookup upper-cases ``control_id`` before querying.
    """
    ctrl = db.execute(
        text("""
            SELECT id, control_id, title, parent_control_uuid,
                   decomposition_method, source_citation
            FROM canonical_controls
            WHERE control_id = :cid
        """),
        {"cid": control_id.upper()},
    ).fetchone()
    if not ctrl:
        raise HTTPException(status_code=404, detail="Control not found")
    return ctrl


def _load_control_lineage(db, ctrl) -> dict[str, Any]:
    """Collect parent links, child controls and the distinct-regulation count.

    Returns a dict with the keys ``parent_links``, ``children`` and
    ``source_count``, in that order.
    """
    ctrl_uuid = str(ctrl.id)

    # Parent links (M:N) — for atomic controls
    link_rows = db.execute(
        text("""
            SELECT cpl.parent_control_uuid, cpl.link_type, cpl.confidence,
                   cpl.source_regulation, cpl.source_article,
                   cpl.obligation_candidate_id,
                   cc.control_id AS parent_control_id,
                   cc.title AS parent_title,
                   cc.source_citation AS parent_citation,
                   oc.obligation_text, oc.action, oc.object,
                   oc.normative_strength
            FROM control_parent_links cpl
            JOIN canonical_controls cc ON cc.id = cpl.parent_control_uuid
            LEFT JOIN obligation_candidates oc ON oc.id = cpl.obligation_candidate_id
            WHERE cpl.control_uuid = CAST(:uid AS uuid)
            ORDER BY cpl.source_regulation, cpl.source_article
        """),
        {"uid": ctrl_uuid},
    ).fetchall()

    parent_links: list[dict[str, Any]] = [
        {
            "parent_control_id": pl.parent_control_id,
            "parent_title": pl.parent_title,
            "link_type": pl.link_type,
            # Missing confidence defaults to 1.0; an explicit 0 is preserved.
            "confidence": _float_or_default(pl.confidence, 1.0),
            "source_regulation": pl.source_regulation,
            "source_article": pl.source_article,
            "parent_citation": pl.parent_citation,
            "obligation": {
                "text": pl.obligation_text,
                "action": pl.action,
                "object": pl.object,
                "normative_strength": pl.normative_strength,
            } if pl.obligation_text else None,
        }
        for pl in link_rows
    ]

    # Also include the 1:1 parent (backwards compat) if not already in links
    if ctrl.parent_control_uuid:
        legacy_uuid = str(ctrl.parent_control_uuid)
        linked_uuids = {str(pl.parent_control_uuid) for pl in link_rows}
        if legacy_uuid not in linked_uuids:
            legacy = db.execute(
                text("""
                    SELECT control_id, title, source_citation
                    FROM canonical_controls
                    WHERE id = CAST(:uid AS uuid)
                """),
                {"uid": legacy_uuid},
            ).fetchone()
            if legacy:
                parent_links.insert(0, {
                    "parent_control_id": legacy.control_id,
                    "parent_title": legacy.title,
                    "link_type": "decomposition",
                    "confidence": 1.0,
                    "source_regulation": None,
                    "source_article": None,
                    "parent_citation": legacy.source_citation,
                    "obligation": None,
                })

    # Child controls — for rich controls
    child_rows = db.execute(
        text("""
            SELECT control_id, title, category, severity, decomposition_method
            FROM canonical_controls
            WHERE parent_control_uuid = CAST(:uid AS uuid)
            ORDER BY control_id
        """),
        {"uid": ctrl_uuid},
    ).fetchall()
    children = [
        {
            "control_id": ch.control_id,
            "title": ch.title,
            "category": ch.category,
            "severity": ch.severity,
            "decomposition_method": ch.decomposition_method,
        }
        for ch in child_rows
    ]

    # Unique source regulations count
    regulations = {
        pl["source_regulation"] for pl in parent_links if pl["source_regulation"]
    }

    return {
        "parent_links": parent_links,
        "children": children,
        "source_count": len(regulations),
    }


@router.get("/controls/{control_id}/traceability")
async def get_control_traceability(control_id: str):
    """Get the full traceability chain for a control.

    For atomic controls: shows all parent links with source regulations,
    articles, and the obligation chain.
    For rich controls: shows child atomic controls derived from them.

    Raises:
        HTTPException: 404 when no such control exists.
    """
    with SessionLocal() as db:
        ctrl = _fetch_control_head(db, control_id)
        result: dict[str, Any] = {
            "control_id": ctrl.control_id,
            "title": ctrl.title,
            "is_atomic": ctrl.decomposition_method == "pass0b",
        }
        result.update(_load_control_lineage(db, ctrl))

    return result


@router.get("/controls/{control_id}/provenance")
async def get_control_provenance(control_id: str):
    """Get full provenance chain for a control — extends traceability with
    obligations, document references, merged duplicates, and regulations summary.

    Raises:
        HTTPException: 404 when no such control exists.
    """
    with SessionLocal() as db:
        ctrl = _fetch_control_head(db, control_id)
        ctrl_uuid = str(ctrl.id)

        result: dict[str, Any] = {
            "control_id": ctrl.control_id,
            "title": ctrl.title,
            "is_atomic": ctrl.decomposition_method == "pass0b",
        }

        # --- Parent links, children, source count (same as traceability) ---
        result.update(_load_control_lineage(db, ctrl))

        # --- Obligations (for Rich Controls) ---
        obligations = db.execute(
            text("""
                SELECT candidate_id, obligation_text, action, object,
                       normative_strength, release_state
                FROM obligation_candidates
                WHERE parent_control_uuid = CAST(:uid AS uuid)
                  AND release_state NOT IN ('rejected', 'merged')
                ORDER BY candidate_id
            """),
            {"uid": ctrl_uuid},
        ).fetchall()
        result["obligations"] = [
            {
                "candidate_id": ob.candidate_id,
                "obligation_text": ob.obligation_text,
                "action": ob.action,
                "object": ob.object,
                "normative_strength": ob.normative_strength,
                "release_state": ob.release_state,
            }
            for ob in obligations
        ]
        result["obligation_count"] = len(obligations)

        # --- Document References ---
        doc_refs = db.execute(
            text("""
                SELECT DISTINCT oe.regulation_code, oe.article, oe.paragraph,
                       oe.extraction_method, oe.confidence
                FROM obligation_extractions oe
                WHERE oe.control_uuid = CAST(:uid AS uuid)
                   OR oe.obligation_id IN (
                       SELECT oc.candidate_id
                       FROM obligation_candidates oc
                       JOIN control_parent_links cpl ON cpl.obligation_candidate_id = oc.id
                       WHERE cpl.control_uuid = CAST(:uid AS uuid)
                   )
                ORDER BY oe.regulation_code, oe.article
            """),
            {"uid": ctrl_uuid},
        ).fetchall()
        result["document_references"] = [
            {
                "regulation_code": dr.regulation_code,
                "article": dr.article,
                "paragraph": dr.paragraph,
                "extraction_method": dr.extraction_method,
                # None only when no confidence is stored; 0 stays 0.0.
                "confidence": _float_or_default(dr.confidence, None),
            }
            for dr in doc_refs
        ]

        # --- Merged Duplicates ---
        merged = db.execute(
            text("""
                SELECT cc.control_id, cc.title,
                       (SELECT cpl.source_regulation
                        FROM control_parent_links cpl
                        WHERE cpl.control_uuid = cc.id
                        LIMIT 1) AS source_regulation
                FROM canonical_controls cc
                WHERE cc.merged_into_uuid = CAST(:uid AS uuid)
                  AND cc.release_state = 'duplicate'
                ORDER BY cc.control_id
            """),
            {"uid": ctrl_uuid},
        ).fetchall()
        result["merged_duplicates"] = [
            {
                "control_id": m.control_id,
                "title": m.title,
                "source_regulation": m.source_regulation,
            }
            for m in merged
        ]
        result["merged_duplicates_count"] = len(merged)

    # --- Regulations Summary (aggregated from parent_links + doc_refs) ---
    reg_map: dict[str, dict[str, Any]] = {}
    for pl in result["parent_links"]:
        reg = pl.get("source_regulation")
        if not reg:
            continue
        entry = reg_map.setdefault(reg, {"articles": set(), "link_types": set()})
        if pl.get("source_article"):
            entry["articles"].add(pl["source_article"])
        # A NULL link_type would make sorted() below fail on None vs str.
        entry["link_types"].add(pl.get("link_type") or "decomposition")

    for dr in result["document_references"]:
        reg = dr.get("regulation_code")
        if not reg:
            continue
        entry = reg_map.setdefault(reg, {"articles": set(), "link_types": set()})
        if dr.get("article"):
            entry["articles"].add(dr["article"])

    result["regulations_summary"] = [
        {
            "regulation_code": reg,
            "articles": sorted(info["articles"]),
            "link_types": sorted(info["link_types"]),
        }
        for reg, info in sorted(reg_map.items())
    ]

    return result


# =============================================================================
# NORMATIVE STRENGTH BACKFILL
# =============================================================================


@router.post("/controls/backfill-normative-strength")
async def backfill_normative_strength(
    dry_run: bool = Query(True, description="Nur zaehlen, nicht aendern"),
):
    """
    Cap ``normative_strength`` of obligation candidates by regulation source type.

    Every obligation candidate that is neither rejected nor merged and has a
    non-null ``normative_strength`` is loaded together with the ``source``
    entry of its parent control's ``source_citation``. The source is classified
    with ``classify_source_regulation`` and the strength is capped with
    ``cap_normative_strength``.

    Three-tier model as described by the original authors:
    - law: normative_strength stays unchanged
    - guideline: at most 'should'
    - framework: weakest level
      NOTE(review): the original description said 'can' while the statistics
      below count caps to 'may'; ``cap_normative_strength`` is authoritative.

    Args:
        dry_run: When true (default) only count and report, do not write.

    Returns:
        Dict with ``dry_run``, ``stats``, ``total_changes`` and the first 20
        entries of the change list as ``sample_changes``.
    """
    from compliance.data.source_type_classification import (
        classify_source_regulation,
        cap_normative_strength,
    )

    with SessionLocal() as db:
        # 1. Load all obligations with the source of their parent control.
        obligations = db.execute(text("""
            SELECT oc.id, oc.candidate_id, oc.normative_strength,
                   cc.source_citation->>'source' AS parent_source
            FROM obligation_candidates oc
            JOIN canonical_controls cc ON cc.id = oc.parent_control_uuid
            WHERE oc.release_state NOT IN ('rejected', 'merged')
              AND oc.normative_strength IS NOT NULL
            ORDER BY oc.candidate_id
        """)).fetchall()

        # 2. Derive the corrected strength from the source type.
        changes = []
        stats = {
            "total": len(obligations),
            "unchanged": 0,
            "capped_to_should": 0,
            "capped_to_may": 0,
            "no_source": 0,
        }

        for obl in obligations:
            if not obl.parent_source:
                stats["no_source"] += 1
                continue

            source_type = classify_source_regulation(obl.parent_source)
            new_strength = cap_normative_strength(obl.normative_strength, source_type)

            if new_strength == obl.normative_strength:
                stats["unchanged"] += 1
                continue

            changes.append({
                "id": str(obl.id),
                "candidate_id": obl.candidate_id,
                "old_strength": obl.normative_strength,
                "new_strength": new_strength,
                "source_type": source_type,
                "source_regulation": obl.parent_source,
            })
            if new_strength == "should":
                stats["capped_to_should"] += 1
            elif new_strength == "may":
                stats["capped_to_may"] += 1

        # 3. Apply the changes (unless dry_run). The statement is built once
        #    and executed with a list of parameter sets (executemany).
        if not dry_run and changes:
            update_stmt = text("""
                UPDATE obligation_candidates
                SET normative_strength = :new_strength
                WHERE id = CAST(:oid AS uuid)
            """)
            db.execute(
                update_stmt,
                [{"new_strength": c["new_strength"], "oid": c["id"]} for c in changes],
            )
            db.commit()

    return {
        "dry_run": dry_run,
        "stats": stats,
        "total_changes": len(changes),
        "sample_changes": changes[:20],
    }


# =============================================================================
# EVIDENCE TYPE BACKFILL
# =============================================================================

# Domains that are primarily technical (code-verifiable)
_CODE_DOMAINS = frozenset({
    "SEC", "AUTH", "CRYPT", "CRYP", "CRY", "NET", "LOG", "ACC", "APP", "SYS",
    "CI", "CONT", "API", "CLOUD", "IAC", "SAST", "DAST", "DEP", "SBOM",
    "WEB", "DEV", "SDL", "PKI", "HSM", "TEE", "TPM", "CRX", "CRF",
    "FWU", "STO", "RUN", "VUL", "MAL", "PLT", "AUT",
})

# Domains that are primarily process-based (document-verifiable)
_PROCESS_DOMAINS = frozenset({
    "GOV", "ORG", "COMP", "LEGAL", "HR", "TRAIN", "AML", "FIN",
    "RISK", "AUDIT", "AUD", "PROC", "DOC", "PHYS", "PHY", "PRIV", "DPO",
    "BCDR", "BCP", "VENDOR", "SUPPLY", "SUP", "CERT", "POLICY",
    "ENV", "HLT", "TRD", "LAB", "PER", "REL", "ISM", "COM",
    "GAM", "RIS", "PCA", "GNT", "HCA", "RES", "ISS",
})

# Domains that are typically hybrid
_HYBRID_DOMAINS = frozenset({
    "DATA", "AI", "INC", "ID", "IAM", "IDF", "IDP", "IDA", "IDN",
    "OPS", "MNT", "INT", "BCK",
})


def _classify_evidence_type(control_id: str, category: str | None) -> str:
    """Heuristically classify a control as ``code``, ``process`` or ``hybrid``.

    The upper-cased text before the first ``-`` of ``control_id`` is looked up
    in the domain sets above. If it matches none of them, ``category`` decides;
    anything still unmatched is classified as ``process``.
    """
    domain = control_id.split("-")[0].upper() if control_id else ""

    if domain in _CODE_DOMAINS:
        return "code"
    if domain in _PROCESS_DOMAINS:
        return "process"
    if domain in _HYBRID_DOMAINS:
        return "hybrid"

    # Fallback: use category if available
    code_categories = {"encryption", "authentication", "network", "application", "system", "identity"}
    process_categories = {"compliance", "personnel", "physical", "governance", "risk"}

    if category in code_categories:
        return "code"
    if category in process_categories:
        return "process"

    return "process"  # Conservative default


@router.post("/controls/backfill-evidence-type")
async def backfill_evidence_type(
    dry_run: bool = Query(True, description="Nur zaehlen, nicht aendern"),
):
    """
    Classify controls as code/process/hybrid based on their domain prefix.

    Only controls that are neither rejected nor merged and whose
    ``evidence_type`` is still NULL are classified; rows that already carry a
    value are counted as ``already_set`` and left alone.

    Heuristic (see ``_classify_evidence_type``):
    - SEC, AUTH, CRYPT, NET, LOG, ... -> code
    - GOV, ORG, COMP, LEGAL, HR, ... -> process
    - DATA, AI, INC, ... -> hybrid

    Args:
        dry_run: When true (default) only count and report, do not write.

    Returns:
        Dict with ``dry_run``, ``stats``, ``total_changes`` and the first 20
        entries of the change list as ``sample_changes``.
    """
    with SessionLocal() as db:
        rows = db.execute(text("""
            SELECT id, control_id, category, evidence_type
            FROM canonical_controls
            WHERE release_state NOT IN ('rejected', 'merged')
            ORDER BY control_id
        """)).fetchall()

        changes = []
        stats = {"total": len(rows), "already_set": 0, "code": 0, "process": 0, "hybrid": 0}

        for row in rows:
            if row.evidence_type is not None:
                stats["already_set"] += 1
                continue

            new_type = _classify_evidence_type(row.control_id, row.category)
            stats[new_type] += 1
            changes.append({
                "id": str(row.id),
                "control_id": row.control_id,
                "evidence_type": new_type,
            })

        if not dry_run and changes:
            # Build the statement once and run it as executemany.
            update_stmt = text("""
                UPDATE canonical_controls
                SET evidence_type = :et
                WHERE id = CAST(:cid AS uuid)
            """)
            db.execute(
                update_stmt,
                [{"et": c["evidence_type"], "cid": c["id"]} for c in changes],
            )
            db.commit()

    return {
        "dry_run": dry_run,
        "stats": stats,
        "total_changes": len(changes),
        "sample_changes": changes[:20],
    }


# =============================================================================
# RATIONALE BACKFILL (LLM)
# =============================================================================

@router.post("/controls/backfill-rationale")
async def backfill_rationale(
    dry_run: bool = Query(True, description="Nur zaehlen, nicht aendern"),
    batch_size: int = Query(50, ge=1, description="Parent-Controls pro Durchlauf"),
    offset: int = Query(0, ge=0, description="Offset fuer Paginierung (Parent-Index)"),
):
    """
    Generate rationales for atomic controls with the LLM, one call per parent.

    Children are grouped by parent control: one LLM call per parent produces a
    shared rationale that is written to all of its children that still carry
    the placeholder rationale 'Aus Obligation abgeleitet.'.

    Workflow:
    1. dry_run=true -> show statistics
    2. dry_run=false&batch_size=50&offset=0 -> process the first 50 parents
    3. Repeat with the returned ``next_offset`` until it is null

    Parents whose children were updated no longer match the placeholder
    filter, so they disappear from the parent list on the next call. For that
    reason ``next_offset`` only advances past the parents of this batch that
    were NOT processed successfully; advancing by ``batch_size`` would skip
    unprocessed parents.

    Args:
        dry_run: When true (default) only return statistics.
        batch_size: Number of parents handled per call (>= 1).
        offset: Index into the ordered parent list to start from (>= 0).

    Returns:
        Statistics for a dry run, otherwise a summary of the processed batch
        including up to 10 errors and up to 5 sample rationales.
    """
    from compliance.services.llm_provider import get_llm_provider

    with SessionLocal() as db:
        # 1. Load parent controls with children (placeholder rationale only).
        parents = db.execute(text("""
            SELECT p.id AS parent_uuid, p.control_id, p.title, p.category,
                   p.source_citation->>'source' AS source_name,
                   COUNT(c.id) AS child_count
            FROM canonical_controls p
            JOIN canonical_controls c ON c.parent_control_uuid = p.id
            WHERE c.rationale = 'Aus Obligation abgeleitet.'
              AND c.release_state NOT IN ('rejected', 'merged')
            GROUP BY p.id, p.control_id, p.title, p.category,
                     p.source_citation->>'source'
            ORDER BY p.control_id
        """)).fetchall()

        total_parents = len(parents)
        total_children = sum(p.child_count for p in parents)

        if dry_run:
            return {
                "dry_run": True,
                "total_parents": total_parents,
                "total_children": total_children,
                "estimated_llm_calls": total_parents,
                "sample_parents": [
                    {
                        "control_id": p.control_id,
                        "title": p.title,
                        "source": p.source_name,
                        "child_count": p.child_count,
                    }
                    for p in parents[:10]
                ],
            }

        # 2. Select the batch.
        batch = parents[offset : offset + batch_size]
        if not batch:
            return {
                "dry_run": False,
                "message": "Kein weiterer Batch — alle Parents verarbeitet.",
                "total_parents": total_parents,
                "offset": offset,
                "processed": 0,
            }

        provider = get_llm_provider()
        processed = 0
        children_updated = 0
        errors = []
        sample_rationales = []

        update_stmt = text("""
            UPDATE canonical_controls
            SET rationale = :rationale
            WHERE parent_control_uuid = CAST(:pid AS uuid)
              AND rationale = 'Aus Obligation abgeleitet.'
              AND release_state NOT IN ('rejected', 'merged')
        """)

        for parent in batch:
            parent_uuid = str(parent.parent_uuid)
            source = parent.source_name or "Regulierung"

            # LLM prompt
            prompt = (
                f"Du bist Compliance-Experte. Erklaere in 1-2 Saetzen auf Deutsch, "
                f"WARUM aus dem uebergeordneten Control atomare Teilmassnahmen "
                f"abgeleitet wurden.\n\n"
                f"Uebergeordnetes Control: {parent.control_id} — {parent.title}\n"
                f"Regulierung: {source}\n"
                f"Kategorie: {parent.category or 'k.A.'}\n"
                f"Anzahl atomarer Controls: {parent.child_count}\n\n"
                f"Schreibe NUR die Begruendung (1-2 Saetze). Kein Markdown, "
                f"keine Aufzaehlung, kein Praefix. "
                f"Erklaere den regulatorischen Hintergrund und warum die "
                f"Zerlegung in atomare, testbare Massnahmen notwendig ist."
            )

            try:
                response = await provider.complete(
                    prompt=prompt,
                    max_tokens=256,
                    temperature=0.3,
                )
                rationale = response.content.strip()

                # Clean up: strip surrounding quotes and a leading label.
                rationale = rationale.strip('"').strip("'").strip()
                if rationale.startswith(("Begründung:", "Begruendung:")):
                    rationale = rationale.split(":", 1)[1].strip()

                # Limit the length (max 500 characters).
                if len(rationale) > 500:
                    rationale = rationale[:497] + "..."

                if not rationale or len(rationale) < 10:
                    errors.append({
                        "control_id": parent.control_id,
                        "error": "LLM-Antwort zu kurz oder leer",
                    })
                    continue

                # Update all children of this parent.
                result = db.execute(
                    update_stmt,
                    {"rationale": rationale, "pid": parent_uuid},
                )
                updated = result.rowcount
                # Commit per parent: the rollback in the error handler below
                # would otherwise discard the still-uncommitted updates of
                # earlier parents while the counters keep reporting them.
                db.commit()

                children_updated += updated
                processed += 1

                if len(sample_rationales) < 5:
                    sample_rationales.append({
                        "parent": parent.control_id,
                        "title": parent.title,
                        "rationale": rationale,
                        "children_updated": updated,
                    })

            except Exception as e:
                logger.error("LLM error for %s: %s", parent.control_id, e)
                errors.append({
                    "control_id": parent.control_id,
                    "error": str(e)[:200],
                })
                # Roll back so the session stays usable after the failure;
                # only this parent's uncommitted work is affected.
                try:
                    db.rollback()
                except Exception:
                    logger.warning(
                        "Rollback failed after error for %s",
                        parent.control_id,
                        exc_info=True,
                    )

        # Parents that were not processed stay in the filtered list and now
        # occupy the positions directly after ``offset``; skip past them only.
        unprocessed_in_batch = len(batch) - processed
        has_more = offset + batch_size < total_parents

        return {
            "dry_run": False,
            "offset": offset,
            "batch_size": batch_size,
            "next_offset": offset + unprocessed_in_batch if has_more else None,
            "processed_parents": processed,
            "children_updated": children_updated,
            "total_parents": total_parents,
            "total_children": total_children,
            "errors": errors[:10],
            "sample_rationales": sample_rationales,
        }


# =============================================================================
# CONTROL CRUD (CREATE / UPDATE / DELETE)
# =============================================================================

@router.post("/controls", status_code=201)
async def create_control(body: ControlCreateRequest):
    """Create a new canonical control.

    Validates ``control_id`` (DOMAIN-NNN), ``severity`` and ``risk_score``,
    resolves the framework by its ``framework_id`` and rejects a control that
    already exists in that framework.

    Raises:
        HTTPException: 400 on invalid input, 404 when the framework is
            unknown, 409 when the control already exists.
    """
    import json as _json
    import re

    def _jsonb_or_null(value):
        """Serialize a truthy value to JSON; falsy values are stored as NULL."""
        return _json.dumps(value) if value else None

    # Validate control_id format. fullmatch is used because "$" with
    # re.match would also accept an ID followed by a trailing newline.
    if not re.fullmatch(r"[A-Z]{2,6}-[0-9]{3}", body.control_id):
        raise HTTPException(status_code=400, detail="control_id must match DOMAIN-NNN (e.g. AUTH-001)")
    if body.severity not in ("low", "medium", "high", "critical"):
        raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical")
    if body.risk_score is not None and not (0 <= body.risk_score <= 10):
        raise HTTPException(status_code=400, detail="risk_score must be 0..10")

    with SessionLocal() as db:
        # Resolve framework
        fw = db.execute(
            text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"),
            {"fid": body.framework_id},
        ).fetchone()
        if not fw:
            raise HTTPException(status_code=404, detail=f"Framework '{body.framework_id}' not found")

        # Check duplicate
        existing = db.execute(
            text("SELECT id FROM canonical_controls WHERE framework_id = :fid AND control_id = :cid"),
            {"fid": str(fw.id), "cid": body.control_id},
        ).fetchone()
        if existing:
            raise HTTPException(status_code=409, detail=f"Control '{body.control_id}' already exists")

        row = db.execute(
            text(f"""
                INSERT INTO canonical_controls (
                    framework_id, control_id, title, objective, rationale,
                    scope, requirements, test_procedure, evidence,
                    severity, risk_score, implementation_effort, evidence_confidence,
                    open_anchors, release_state, tags,
                    license_rule, source_original_text, source_citation,
                    customer_visible, verification_method, category, evidence_type,
                    target_audience, generation_metadata,
                    applicable_industries, applicable_company_size, scope_conditions
                ) VALUES (
                    :fw_id, :cid, :title, :objective, :rationale,
                    CAST(:scope AS jsonb), CAST(:requirements AS jsonb),
                    CAST(:test_procedure AS jsonb), CAST(:evidence AS jsonb),
                    :severity, :risk_score, :effort, :confidence,
                    CAST(:anchors AS jsonb), :release_state, CAST(:tags AS jsonb),
                    :license_rule, :source_original_text,
                    CAST(:source_citation AS jsonb),
                    :customer_visible, :verification_method, :category, :evidence_type,
                    :target_audience, CAST(:generation_metadata AS jsonb),
                    CAST(:applicable_industries AS jsonb),
                    CAST(:applicable_company_size AS jsonb),
                    CAST(:scope_conditions AS jsonb)
                )
                RETURNING {_CONTROL_COLS}
            """),
            {
                "fw_id": str(fw.id),
                "cid": body.control_id,
                "title": body.title,
                "objective": body.objective,
                "rationale": body.rationale,
                "scope": _json.dumps(body.scope),
                "requirements": _json.dumps(body.requirements),
                "test_procedure": _json.dumps(body.test_procedure),
                "evidence": _json.dumps(body.evidence),
                "severity": body.severity,
                "risk_score": body.risk_score,
                "effort": body.implementation_effort,
                "confidence": body.evidence_confidence,
                "anchors": _json.dumps(body.open_anchors),
                "release_state": body.release_state,
                "tags": _json.dumps(body.tags),
                "license_rule": body.license_rule,
                "source_original_text": body.source_original_text,
                "source_citation": _jsonb_or_null(body.source_citation),
                "customer_visible": body.customer_visible,
                "verification_method": body.verification_method,
                "category": body.category,
                "evidence_type": body.evidence_type,
                "target_audience": body.target_audience,
                "generation_metadata": _jsonb_or_null(body.generation_metadata),
                "applicable_industries": _jsonb_or_null(body.applicable_industries),
                "applicable_company_size": _jsonb_or_null(body.applicable_company_size),
                "scope_conditions": _jsonb_or_null(body.scope_conditions),
            },
        ).fetchone()
        db.commit()

    return _control_row(row)


@router.put("/controls/{control_id}")
async def update_control(control_id: str, body: ControlUpdateRequest):
    """Update an existing canonical control (partial update).

    Only fields that are set (non-None) on ``body`` are written;
    ``updated_at`` is always refreshed. The control is matched by the
    upper-cased ``control_id``.

    Raises:
        HTTPException: 400 when no field is given or a value is invalid,
            404 when no control matches.
    """
    import json as _json

    updates = body.dict(exclude_none=True)
    if not updates:
        raise HTTPException(status_code=400, detail="No fields to update")

    if "severity" in updates and updates["severity"] not in ("low", "medium", "high", "critical"):
        raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical")
    risk_score = updates.get("risk_score")
    if risk_score is not None and not (0 <= risk_score <= 10):
        raise HTTPException(status_code=400, detail="risk_score must be 0..10")

    # Build dynamic SET clause. Column names come from the request model's
    # field names, values are always bound as parameters.
    set_parts = []
    params: dict[str, Any] = {"cid": control_id.upper()}
    # Columns that create_control casts to jsonb must be serialized here too.
    # The last three only matter if ControlUpdateRequest exposes them
    # (the model is not visible here); listing them is harmless otherwise.
    json_fields = {
        "scope", "requirements", "test_procedure", "evidence", "open_anchors",
        "tags", "source_citation", "generation_metadata",
        "applicable_industries", "applicable_company_size", "scope_conditions",
    }

    for key, val in updates.items():
        if key in json_fields:
            set_parts.append(f"{key} = CAST(:{key} AS jsonb)")
            params[key] = _json.dumps(val)
        else:
            set_parts.append(f"{key} = :{key}")
            params[key] = val

    set_parts.append("updated_at = NOW()")

    with SessionLocal() as db:
        row = db.execute(
            text(f"""
                UPDATE canonical_controls
                SET {', '.join(set_parts)}
                WHERE control_id = :cid
                RETURNING {_CONTROL_COLS}
            """),
            params,
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Control not found")
        db.commit()

    return _control_row(row)


@router.delete("/controls/{control_id}", status_code=204)
async def delete_control(control_id: str):
    """Delete a canonical control, matched by the upper-cased ``control_id``.

    Raises:
        HTTPException: 404 when no row was deleted.
    """
    with SessionLocal() as db:
        result = db.execute(
            text("DELETE FROM canonical_controls WHERE control_id = :cid"),
            {"cid": control_id.upper()},
        )
        if result.rowcount == 0:
            raise HTTPException(status_code=404, detail="Control not found")
        db.commit()

    return None


# =============================================================================
# SIMILARITY CHECK
# =============================================================================

@router.post("/controls/{control_id}/similarity-check")
async def similarity_check(control_id: str, body: SimilarityCheckRequest):
    """Run the too-close detector against a source/candidate text pair.

    ``control_id`` is only echoed back (upper-cased) in the response; the
    comparison itself uses the two texts from ``body``.
    """
    report = await check_similarity(body.source_text, body.candidate_text)
    return {
        "control_id": control_id.upper(),
        "max_exact_run": report.max_exact_run,
        "token_overlap": report.token_overlap,
        "ngram_jaccard": report.ngram_jaccard,
        "embedding_cosine": report.embedding_cosine,
        "lcs_ratio": report.lcs_ratio,
        "status": report.status,
        "details": report.details,
    }
# =============================================================================
# CATEGORIES
# =============================================================================

@router.get("/categories")
async def list_categories():
    """List all canonical control categories, ordered by ``sort_order``."""
    with SessionLocal() as db:
        rows = db.execute(
            text("SELECT category_id, label_de, label_en, sort_order FROM canonical_control_categories ORDER BY sort_order")
        ).fetchall()

    return [
        {
            "category_id": r.category_id,
            "label_de": r.label_de,
            "label_en": r.label_en,
            "sort_order": r.sort_order,
        }
        for r in rows
    ]


# =============================================================================
# SIMILAR CONTROLS (Embedding-based dedup)
# =============================================================================

@router.get("/controls/{control_id}/similar")
async def find_similar_controls(
    control_id: str,
    threshold: float = Query(0.85, ge=0.5, le=1.0),
    limit: int = Query(20, ge=1, le=100),
):
    """Find controls similar to the given one using embedding cosine similarity.

    Args:
        control_id: Control to compare against (matched upper-cased).
        threshold: Minimum similarity (1 - cosine distance) to include.
        limit: Maximum number of results.

    Returns:
        Matching controls ordered by descending similarity. An empty list is
        returned when the similarity query fails.

    Raises:
        HTTPException: 404 when the control does not exist.
    """
    with SessionLocal() as db:
        # Make sure the target control exists before running the comparison.
        target = db.execute(
            text("""
                SELECT id, control_id, title, objective
                FROM canonical_controls
                WHERE control_id = :cid
            """),
            {"cid": control_id.upper()},
        ).fetchone()
        if not target:
            raise HTTPException(status_code=404, detail="Control not found")

        # Similarity is computed in the database via the embedding column and
        # the `<=>` distance operator. There is no text-based fallback: if the
        # query fails (e.g. no embedding column), the failure is logged and an
        # empty result is returned.
        try:
            rows = db.execute(
                text("""
                    SELECT c.control_id, c.title, c.severity, c.release_state,
                           c.tags, c.license_rule, c.verification_method, c.category,
                           1 - (c.embedding <=> t.embedding) AS similarity
                    FROM canonical_controls c, canonical_controls t
                    WHERE t.control_id = :cid
                      AND c.control_id != :cid
                      AND c.release_state != 'deprecated'
                      AND c.embedding IS NOT NULL
                      AND t.embedding IS NOT NULL
                      AND 1 - (c.embedding <=> t.embedding) >= :threshold
                    ORDER BY similarity DESC
                    LIMIT :lim
                """),
                {"cid": control_id.upper(), "threshold": threshold, "lim": limit},
            ).fetchall()

            return [
                {
                    "control_id": r.control_id,
                    "title": r.title,
                    "severity": r.severity,
                    "release_state": r.release_state,
                    "tags": r.tags or [],
                    "license_rule": r.license_rule,
                    "verification_method": r.verification_method,
                    "category": r.category,
                    "similarity": round(float(r.similarity), 4),
                }
                for r in rows
            ]
        except Exception as e:
            logger.warning("Embedding similarity query failed (no embedding column?): %s", e)
            return []


# =============================================================================
# SOURCES & LICENSES
# =============================================================================

@router.get("/sources")
async def list_sources():
    """List all registered sources with permission flags."""
    with SessionLocal() as db:
        return get_source_permissions(db)


@router.get("/licenses")
async def list_licenses():
    """Return the license matrix."""
    with SessionLocal() as db:
        return get_license_matrix(db)


# =============================================================================
# V1 ENRICHMENT (in-house controls -> regulatory coverage)
# =============================================================================

@router.post("/controls/enrich-v1-matches")
async def enrich_v1_matches_endpoint(
    dry_run: bool = Query(True, description="Nur zaehlen, nicht schreiben"),
    batch_size: int = Query(100, description="Controls pro Durchlauf"),
    offset: int = Query(0, description="Offset fuer Paginierung"),
):
    """
    Find regulatory coverage for v1 in-house ("Eigenentwicklung") controls.

    As described by the original authors, in-house means
    generation_strategy='ungrouped', pipeline_version=1,
    source_citation IS NULL and parent_control_uuid IS NULL; the selection
    itself happens inside ``enrich_v1_matches``, to which all arguments are
    passed through unchanged.

    Workflow:
    1. dry_run=true -> show statistics
    2. dry_run=false&batch_size=100&offset=0 -> process the first 100
    3. Repeat with next_offset until done
    """
    from compliance.services.v1_enrichment import enrich_v1_matches
    return await enrich_v1_matches(
        dry_run=dry_run,
        batch_size=batch_size,
        offset=offset,
    )


@router.get("/controls/{control_id}/v1-matches")
async def get_v1_matches_endpoint(control_id: str):
    """
    Return the regulatory matches for a v1 control.

    The control is resolved to its UUID and handed to ``get_v1_matches``;
    per the original description the result is a list of matches with
    control details, source and score.

    Raises:
        HTTPException: 404 when the control does not exist.
    """
    from compliance.services.v1_enrichment import get_v1_matches

    # Resolve control_id to UUID. The ID is upper-cased like in the update,
    # delete, similar and similarity-check endpoints so that lookups behave
    # the same across the API.
    with SessionLocal() as db:
        row = db.execute(text("""
            SELECT id FROM canonical_controls WHERE control_id = :cid
        """), {"cid": control_id.upper()}).fetchone()

    if not row:
        raise HTTPException(status_code=404, detail=f"Control {control_id} not found")

    return await get_v1_matches(str(row.id))


# =============================================================================
# INTERNAL HELPERS
# =============================================================================

def _control_row(r) -> dict:
    """Convert a ``canonical_controls`` result row into a JSON-friendly dict.

    UUIDs are stringified, numeric scores become floats and timestamps are
    ISO-formatted. Columns read with ``getattr`` may be missing from the row
    and then fall back to the given default.
    """
    return {
        "id": str(r.id),
        "framework_id": str(r.framework_id),
        "control_id": r.control_id,
        "title": r.title,
        "objective": r.objective,
        "rationale": r.rationale,
        "scope": r.scope,
        "requirements": r.requirements,
        "test_procedure": r.test_procedure,
        "evidence": r.evidence,
        "severity": r.severity,
        "risk_score": float(r.risk_score) if r.risk_score is not None else None,
        "implementation_effort": r.implementation_effort,
        "evidence_confidence": float(r.evidence_confidence) if r.evidence_confidence is not None else None,
        "open_anchors": r.open_anchors,
        "release_state": r.release_state,
        "tags": r.tags or [],
        "license_rule": r.license_rule,
        "source_original_text": r.source_original_text,
        "source_citation": r.source_citation,
        "customer_visible": r.customer_visible,
        "verification_method": r.verification_method,
        "category": r.category,
        "evidence_type": getattr(r, "evidence_type", None),
        "target_audience": r.target_audience,
        "generation_metadata": r.generation_metadata,
        "generation_strategy": getattr(r, "generation_strategy", "ungrouped"),
        "applicable_industries": getattr(r, "applicable_industries", None),
        "applicable_company_size": getattr(r, "applicable_company_size", None),
        "scope_conditions": getattr(r, "scope_conditions", None),
        "parent_control_uuid": str(r.parent_control_uuid) if getattr(r, "parent_control_uuid", None) else None,
        "parent_control_id": getattr(r, "parent_control_id", None),
        "parent_control_title": getattr(r, "parent_control_title", None),
        "decomposition_method": getattr(r, "decomposition_method", None),
        "pipeline_version": getattr(r, "pipeline_version", None),
        "created_at": r.created_at.isoformat() if r.created_at else None,
        "updated_at": r.updated_at.isoformat() if r.updated_at else None,
    }