""" FastAPI routes for the Canonical Control Library. Independently authored security controls anchored in open-source frameworks (OWASP, NIST, ENISA). No proprietary nomenclature. Endpoints: GET /v1/canonical/frameworks — All frameworks GET /v1/canonical/frameworks/{framework_id} — Framework details GET /v1/canonical/frameworks/{framework_id}/controls — Controls of a framework GET /v1/canonical/controls — All controls (filterable) GET /v1/canonical/controls/{control_id} — Single control GET /v1/canonical/controls/{control_id}/traceability — Traceability chain GET /v1/canonical/controls/{control_id}/similar — Find similar controls POST /v1/canonical/controls — Create a control PUT /v1/canonical/controls/{control_id} — Update a control DELETE /v1/canonical/controls/{control_id} — Delete a control GET /v1/canonical/categories — Category list GET /v1/canonical/sources — Source registry GET /v1/canonical/licenses — License matrix POST /v1/canonical/controls/{control_id}/similarity-check — Too-close check """ from __future__ import annotations import json import logging from typing import Any, Optional from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session from classroom_engine.database import get_db from compliance.api._http_errors import translate_domain_errors from compliance.schemas.canonical_control import ( ControlCreateRequest, ControlResponse, ControlUpdateRequest, FrameworkResponse, SimilarityCheckRequest, SimilarityCheckResponse, ) from compliance.services.canonical_control_service import ( CanonicalControlService, _control_row, # re-exported for legacy test imports ) router = APIRouter(prefix="/v1/canonical", tags=["canonical-controls"]) # ============================================================================= # RESPONSE MODELS # ============================================================================= class FrameworkResponse(BaseModel): id: str framework_id: str name: str version: str description: Optional[str] = None 
owner: Optional[str] = None policy_version: Optional[str] = None release_state: str created_at: str updated_at: str class ControlResponse(BaseModel): id: str framework_id: str control_id: str title: str objective: str rationale: str scope: dict requirements: list test_procedure: list evidence: list severity: str risk_score: Optional[float] = None implementation_effort: Optional[str] = None evidence_confidence: Optional[float] = None open_anchors: list release_state: str tags: list license_rule: Optional[int] = None source_original_text: Optional[str] = None source_citation: Optional[dict] = None customer_visible: Optional[bool] = None verification_method: Optional[str] = None category: Optional[str] = None evidence_type: Optional[str] = None target_audience: Optional[str] = None generation_metadata: Optional[dict] = None generation_strategy: Optional[str] = "ungrouped" applicable_industries: Optional[list] = None applicable_company_size: Optional[list] = None scope_conditions: Optional[dict] = None created_at: str updated_at: str class ControlCreateRequest(BaseModel): framework_id: str # e.g. 'bp_security_v1' control_id: str # e.g. 
'AUTH-003' title: str objective: str rationale: str scope: dict = {} requirements: list = [] test_procedure: list = [] evidence: list = [] severity: str = "medium" risk_score: Optional[float] = None implementation_effort: Optional[str] = None evidence_confidence: Optional[float] = None open_anchors: list = [] release_state: str = "draft" tags: list = [] license_rule: Optional[int] = None source_original_text: Optional[str] = None source_citation: Optional[dict] = None customer_visible: Optional[bool] = True verification_method: Optional[str] = None category: Optional[str] = None evidence_type: Optional[str] = None target_audience: Optional[str] = None generation_metadata: Optional[dict] = None applicable_industries: Optional[list] = None applicable_company_size: Optional[list] = None scope_conditions: Optional[dict] = None class ControlUpdateRequest(BaseModel): title: Optional[str] = None objective: Optional[str] = None rationale: Optional[str] = None scope: Optional[dict] = None requirements: Optional[list] = None test_procedure: Optional[list] = None evidence: Optional[list] = None severity: Optional[str] = None risk_score: Optional[float] = None implementation_effort: Optional[str] = None evidence_confidence: Optional[float] = None open_anchors: Optional[list] = None release_state: Optional[str] = None tags: Optional[list] = None license_rule: Optional[int] = None source_original_text: Optional[str] = None source_citation: Optional[dict] = None customer_visible: Optional[bool] = None verification_method: Optional[str] = None category: Optional[str] = None evidence_type: Optional[str] = None target_audience: Optional[str] = None generation_metadata: Optional[dict] = None applicable_industries: Optional[list] = None applicable_company_size: Optional[list] = None scope_conditions: Optional[dict] = None class SimilarityCheckRequest(BaseModel): source_text: str candidate_text: str class SimilarityCheckResponse(BaseModel): max_exact_run: int token_overlap: float 
ngram_jaccard: float embedding_cosine: float lcs_ratio: float status: str details: dict # ============================================================================= # HELPERS # ============================================================================= _CONTROL_COLS = """id, framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, license_rule, source_original_text, source_citation, customer_visible, verification_method, category, evidence_type, target_audience, generation_metadata, generation_strategy, applicable_industries, applicable_company_size, scope_conditions, parent_control_uuid, decomposition_method, pipeline_version, (SELECT p.control_id FROM canonical_controls p WHERE p.id = canonical_controls.parent_control_uuid) AS parent_control_id, (SELECT p.title FROM canonical_controls p WHERE p.id = canonical_controls.parent_control_uuid) AS parent_control_title, created_at, updated_at""" def _row_to_dict(row, columns: list[str]) -> dict[str, Any]: """Generic row → dict converter.""" return {col: (getattr(row, col).isoformat() if hasattr(getattr(row, col, None), 'isoformat') else getattr(row, col)) for col in columns} # ============================================================================= # FRAMEWORKS # ============================================================================= @router.get("/frameworks") async def list_frameworks( service: CanonicalControlService = Depends(get_canonical_service), ) -> list[dict[str, Any]]: """List all registered control frameworks.""" with translate_domain_errors(): return service.list_frameworks() @router.get("/frameworks/{framework_id}") async def get_framework( framework_id: str, service: CanonicalControlService = Depends(get_canonical_service), ) -> dict[str, Any]: """Get a single framework by its framework_id.""" with translate_domain_errors(): return 
@router.get("/frameworks/{framework_id}/controls")
async def list_framework_controls(
    framework_id: str,
    severity: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
    verification_method: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    target_audience: Optional[str] = Query(None),
):
    """List controls belonging to a framework."""
    with SessionLocal() as db:
        # Resolve the framework's surrogate UUID from its public framework_id.
        fw = db.execute(
            text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"),
            {"fid": framework_id},
        ).fetchone()
        if not fw:
            raise HTTPException(status_code=404, detail="Framework not found")

        query = f"""
            SELECT {_CONTROL_COLS}
            FROM canonical_controls
            WHERE framework_id = :fw_id
        """
        params: dict[str, Any] = {"fw_id": str(fw.id)}
        if severity:
            query += " AND severity = :sev"
            params["sev"] = severity
        if release_state:
            query += " AND release_state = :rs"
            params["rs"] = release_state
        if verification_method:
            query += " AND verification_method = :vm"
            params["vm"] = verification_method
        if category:
            query += " AND category = :cat"
            params["cat"] = category
        if target_audience:
            # jsonb containment: the audience must appear in the JSON array.
            query += " AND target_audience::jsonb @> (:ta)::jsonb"
            params["ta"] = json.dumps([target_audience])
        query += " ORDER BY control_id"
        rows = db.execute(text(query), params).fetchall()
        return [_control_row(r) for r in rows]


# =============================================================================
# CONTROLS
# =============================================================================


def _control_filter_sql(
    params: dict[str, Any],
    *,
    severity: Optional[str] = None,
    domain: Optional[str] = None,
    release_state: Optional[str] = None,
    verification_method: Optional[str] = None,
    category: Optional[str] = None,
    evidence_type: Optional[str] = None,
    target_audience: Optional[str] = None,
    source: Optional[str] = None,
    search: Optional[str] = None,
    control_type: Optional[str] = None,
    exclude_duplicates: bool = False,
) -> str:
    """Build the shared WHERE-clause suffix for control list/count queries.

    Returns a string of " AND ..." fragments and adds the matching bind
    values to *params* (mutated in place).  The sentinel value "__none__"
    selects rows where the corresponding column is NULL.  Shared by
    list_controls and count_controls so the two endpoints cannot drift.
    """
    sql = ""
    if exclude_duplicates:
        sql += " AND release_state != 'duplicate'"
    if severity:
        sql += " AND severity = :sev"
        params["sev"] = severity
    if domain:
        # Domain is the control_id prefix, e.g. 'AUTH' in 'AUTH-001'.
        sql += " AND LEFT(control_id, LENGTH(:dom)) = :dom"
        params["dom"] = domain.upper()
    if release_state:
        sql += " AND release_state = :rs"
        params["rs"] = release_state
    if verification_method:
        if verification_method == "__none__":
            sql += " AND verification_method IS NULL"
        else:
            sql += " AND verification_method = :vm"
            params["vm"] = verification_method
    if category:
        if category == "__none__":
            sql += " AND category IS NULL"
        else:
            sql += " AND category = :cat"
            params["cat"] = category
    if evidence_type:
        if evidence_type == "__none__":
            sql += " AND evidence_type IS NULL"
        else:
            sql += " AND evidence_type = :et"
            params["et"] = evidence_type
    if target_audience:
        # JSON array stored as text; match the quoted element.
        sql += " AND target_audience LIKE :ta_pattern"
        params["ta_pattern"] = f'%"{target_audience}"%'
    if source:
        if source == "__none__":
            sql += (
                " AND (source_citation IS NULL"
                " OR source_citation->>'source' IS NULL"
                " OR source_citation->>'source' = '')"
            )
        else:
            sql += " AND source_citation->>'source' = :src"
            params["src"] = source
    if control_type == "atomic":
        sql += " AND decomposition_method = 'pass0b'"
    elif control_type == "rich":
        sql += " AND (decomposition_method IS NULL OR decomposition_method != 'pass0b')"
    elif control_type == "eigenentwicklung":
        sql += (
            " AND generation_strategy = 'ungrouped'"
            " AND (pipeline_version = '1' OR pipeline_version IS NULL)"
            " AND source_citation IS NULL AND parent_control_uuid IS NULL"
        )
    if search:
        sql += " AND (control_id ILIKE :q OR title ILIKE :q OR objective ILIKE :q)"
        params["q"] = f"%{search}%"
    return sql


@router.get("/controls")
async def list_controls(
    severity: Optional[str] = Query(None),
    domain: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
    verification_method: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    evidence_type: Optional[str] = Query(None, description="Filter: code, process, hybrid"),
    target_audience: Optional[str] = Query(None),
    source: Optional[str] = Query(None, description="Filter by source_citation->source"),
    search: Optional[str] = Query(None, description="Full-text search in control_id, title, objective"),
    control_type: Optional[str] = Query(None, description="Filter: atomic, rich, or all"),
    exclude_duplicates: bool = Query(False, description="Exclude controls with release_state='duplicate'"),
    sort: Optional[str] = Query("control_id", description="Sort field: control_id, created_at, severity"),
    order: Optional[str] = Query("asc", description="Sort order: asc or desc"),
    limit: Optional[int] = Query(None, ge=1, le=5000, description="Max results"),
    offset: Optional[int] = Query(None, ge=0, description="Offset for pagination"),
):
    """List canonical controls with filters, search, sorting and pagination."""
    query = f"SELECT {_CONTROL_COLS} FROM canonical_controls WHERE 1=1"
    params: dict[str, Any] = {}
    query += _control_filter_sql(
        params,
        severity=severity,
        domain=domain,
        release_state=release_state,
        verification_method=verification_method,
        category=category,
        evidence_type=evidence_type,
        target_audience=target_audience,
        source=source,
        search=search,
        control_type=control_type,
        exclude_duplicates=exclude_duplicates,
    )

    # Sorting: sort_col is chosen from a fixed whitelist, never from raw user
    # input, so the f-string interpolation below cannot inject SQL.
    sort_col = "control_id"
    if sort in ("created_at", "updated_at", "severity", "control_id"):
        sort_col = sort
    elif sort == "source":
        sort_col = "source_citation->>'source'"
    sort_dir = "DESC" if order and order.lower() == "desc" else "ASC"
    if sort == "source":
        # Group by source first, then by control_id within each source.
        query += f" ORDER BY {sort_col} {sort_dir} NULLS LAST, control_id ASC"
    else:
        query += f" ORDER BY {sort_col} {sort_dir}"

    if limit is not None:
        query += " LIMIT :lim"
        params["lim"] = limit
    if offset is not None:
        query += " OFFSET :off"
        params["off"] = offset

    with SessionLocal() as db:
        rows = db.execute(text(query), params).fetchall()
        return [_control_row(r) for r in rows]


@router.get("/controls-count")
async def count_controls(
    severity: Optional[str] = Query(None),
    domain: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
    verification_method: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    evidence_type: Optional[str] = Query(None),
    target_audience: Optional[str] = Query(None),
    source: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    control_type: Optional[str] = Query(None),
    exclude_duplicates: bool = Query(False, description="Exclude controls with release_state='duplicate'"),
):
    """Count controls matching filters (for pagination)."""
    query = "SELECT count(*) FROM canonical_controls WHERE 1=1"
    params: dict[str, Any] = {}
    # Same filter semantics as list_controls — built by the shared helper.
    query += _control_filter_sql(
        params,
        severity=severity,
        domain=domain,
        release_state=release_state,
        verification_method=verification_method,
        category=category,
        evidence_type=evidence_type,
        target_audience=target_audience,
        source=source,
        search=search,
        control_type=control_type,
        exclude_duplicates=exclude_duplicates,
    )
    with SessionLocal() as db:
        total = db.execute(text(query), params).scalar()
        return {"total": total}
@router.get("/controls-meta")
async def controls_meta(
    severity: Optional[str] = Query(None),
    domain: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
    verification_method: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    evidence_type: Optional[str] = Query(None),
    target_audience: Optional[str] = Query(None),
    source: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    control_type: Optional[str] = Query(None),
    exclude_duplicates: bool = Query(False),
):
    """Return faceted metadata for filter dropdowns.

    Each facet's counts respect ALL active filters EXCEPT the facet's own,
    so dropdowns always show how many items each option would yield.
    """

    def _build_where(skip: Optional[str] = None) -> tuple[str, dict[str, Any]]:
        # Assemble WHERE clauses from every active filter except `skip`.
        where_parts = ["1=1"]
        binds: dict[str, Any] = {}
        if exclude_duplicates:
            where_parts.append("release_state != 'duplicate'")
        if severity and skip != "severity":
            where_parts.append("severity = :sev")
            binds["sev"] = severity
        if domain and skip != "domain":
            where_parts.append("LEFT(control_id, LENGTH(:dom)) = :dom")
            binds["dom"] = domain.upper()
        if release_state and skip != "release_state":
            where_parts.append("release_state = :rs")
            binds["rs"] = release_state
        if verification_method and skip != "verification_method":
            if verification_method == "__none__":
                where_parts.append("verification_method IS NULL")
            else:
                where_parts.append("verification_method = :vm")
                binds["vm"] = verification_method
        if category and skip != "category":
            if category == "__none__":
                where_parts.append("category IS NULL")
            else:
                where_parts.append("category = :cat")
                binds["cat"] = category
        if evidence_type and skip != "evidence_type":
            if evidence_type == "__none__":
                where_parts.append("evidence_type IS NULL")
            else:
                where_parts.append("evidence_type = :et")
                binds["et"] = evidence_type
        if target_audience and skip != "target_audience":
            where_parts.append("target_audience LIKE :ta_pattern")
            binds["ta_pattern"] = f'%"{target_audience}"%'
        if source and skip != "source":
            if source == "__none__":
                where_parts.append(
                    "(source_citation IS NULL OR source_citation->>'source' IS NULL"
                    " OR source_citation->>'source' = '')"
                )
            else:
                where_parts.append("source_citation->>'source' = :src")
                binds["src"] = source
        if control_type and skip != "control_type":
            if control_type == "atomic":
                where_parts.append("decomposition_method = 'pass0b'")
            elif control_type == "rich":
                where_parts.append(
                    "(decomposition_method IS NULL OR decomposition_method != 'pass0b')"
                )
            elif control_type == "eigenentwicklung":
                where_parts.append(
                    "generation_strategy = 'ungrouped'"
                    " AND (pipeline_version = '1' OR pipeline_version IS NULL)"
                    " AND source_citation IS NULL AND parent_control_uuid IS NULL"
                )
        if search and skip != "search":
            where_parts.append(
                "(control_id ILIKE :q OR title ILIKE :q OR objective ILIKE :q)"
            )
            binds["q"] = f"%{search}%"
        return " AND ".join(where_parts), binds

    with SessionLocal() as db:
        # Grand total with ALL filters applied.
        w_all, p_all = _build_where()
        total = db.execute(
            text(f"SELECT count(*) FROM canonical_controls WHERE {w_all}"), p_all
        ).scalar()

        # Domain facet (skip the domain filter so the user sees all domains).
        w_dom, p_dom = _build_where(skip="domain")
        domains = db.execute(text(f"""
            SELECT UPPER(SPLIT_PART(control_id, '-', 1)) as domain, count(*) as cnt
            FROM canonical_controls WHERE {w_dom}
            GROUP BY domain ORDER BY domain
        """), p_dom).fetchall()

        # Source facet (skip the source filter); NULL/empty counted separately.
        w_src, p_src = _build_where(skip="source")
        sources = db.execute(text(f"""
            SELECT source_citation->>'source' as src, count(*) as cnt
            FROM canonical_controls WHERE {w_src}
              AND source_citation->>'source' IS NOT NULL
              AND source_citation->>'source' != ''
            GROUP BY src ORDER BY cnt DESC
        """), p_src).fetchall()
        no_source = db.execute(text(f"""
            SELECT count(*) FROM canonical_controls WHERE {w_src}
              AND (source_citation IS NULL OR source_citation->>'source' IS NULL
                   OR source_citation->>'source' = '')
        """), p_src).scalar()

        # Type facet (skip the control_type filter).
        w_typ, p_typ = _build_where(skip="control_type")
        atomic_count = db.execute(text(f"""
            SELECT count(*) FROM canonical_controls WHERE {w_typ}
              AND decomposition_method = 'pass0b'
        """), p_typ).scalar() or 0
        eigenentwicklung_count = db.execute(text(f"""
            SELECT count(*) FROM canonical_controls WHERE {w_typ}
              AND generation_strategy = 'ungrouped'
              AND (pipeline_version = '1' OR pipeline_version IS NULL)
              AND source_citation IS NULL AND parent_control_uuid IS NULL
        """), p_typ).scalar() or 0
        rich_count = db.execute(text(f"""
            SELECT count(*) FROM canonical_controls WHERE {w_typ}
              AND (decomposition_method IS NULL OR decomposition_method != 'pass0b')
        """), p_typ).scalar() or 0

        # Severity facet.
        w_sev, p_sev = _build_where(skip="severity")
        severity_counts = db.execute(text(f"""
            SELECT severity, count(*) as cnt
            FROM canonical_controls WHERE {w_sev}
            GROUP BY severity ORDER BY severity
        """), p_sev).fetchall()

        # Verification method facet (NULLs reported as '__none__').
        w_vm, p_vm = _build_where(skip="verification_method")
        vm_counts = db.execute(text(f"""
            SELECT COALESCE(verification_method, '__none__') as vm, count(*) as cnt
            FROM canonical_controls WHERE {w_vm}
            GROUP BY vm ORDER BY vm
        """), p_vm).fetchall()

        # Category facet (NULLs reported as '__none__').
        w_cat, p_cat = _build_where(skip="category")
        cat_counts = db.execute(text(f"""
            SELECT COALESCE(category, '__none__') as cat, count(*) as cnt
            FROM canonical_controls WHERE {w_cat}
            GROUP BY cat ORDER BY cnt DESC
        """), p_cat).fetchall()

        # Evidence type facet (NULLs reported as '__none__').
        w_et, p_et = _build_where(skip="evidence_type")
        et_counts = db.execute(text(f"""
            SELECT COALESCE(evidence_type, '__none__') as et, count(*) as cnt
            FROM canonical_controls WHERE {w_et}
            GROUP BY et ORDER BY et
        """), p_et).fetchall()

        # Release state facet.
        w_rs, p_rs = _build_where(skip="release_state")
        rs_counts = db.execute(text(f"""
            SELECT release_state, count(*) as cnt
            FROM canonical_controls WHERE {w_rs}
            GROUP BY release_state ORDER BY release_state
        """), p_rs).fetchall()

        return {
            "total": total,
            "domains": [{"domain": r[0], "count": r[1]} for r in domains],
            "sources": [{"source": r[0], "count": r[1]} for r in sources],
            "no_source_count": no_source,
            "type_counts": {
                "rich": rich_count,
                "atomic": atomic_count,
                "eigenentwicklung": eigenentwicklung_count,
            },
            "severity_counts": {r[0]: r[1] for r in severity_counts},
            "verification_method_counts": {r[0]: r[1] for r in vm_counts},
            "category_counts": {r[0]: r[1] for r in cat_counts},
            "evidence_type_counts": {r[0]: r[1] for r in et_counts},
            "release_state_counts": {r[0]: r[1] for r in rs_counts},
        }


@router.get("/controls/atomic-stats")
async def atomic_stats():
    """Return aggregated statistics for atomic controls (masters only)."""
    with SessionLocal() as db:
        # Active atomic controls (pass0b masters, excluding retired states).
        total_active = db.execute(text("""
            SELECT count(*) FROM canonical_controls
            WHERE decomposition_method = 'pass0b'
              AND release_state NOT IN ('duplicate', 'deprecated', 'rejected')
        """)).scalar() or 0
        total_duplicate = db.execute(text("""
            SELECT count(*) FROM canonical_controls
            WHERE decomposition_method = 'pass0b' AND release_state = 'duplicate'
        """)).scalar() or 0

        # Breakdown by control_id prefix (domain).
        by_domain = db.execute(text("""
            SELECT UPPER(SPLIT_PART(control_id, '-', 1)) AS domain, count(*) AS cnt
            FROM canonical_controls
            WHERE decomposition_method = 'pass0b'
              AND release_state NOT IN ('duplicate', 'deprecated', 'rejected')
            GROUP BY domain ORDER BY cnt DESC
        """)).fetchall()

        # Breakdown by source regulation via the parent-link table.
        by_regulation = db.execute(text("""
            SELECT cpl.source_regulation AS regulation, count(DISTINCT cc.id) AS cnt
            FROM canonical_controls cc
            JOIN control_parent_links cpl ON cpl.control_uuid = cc.id
            WHERE cc.decomposition_method = 'pass0b'
              AND cc.release_state NOT IN ('duplicate', 'deprecated', 'rejected')
              AND cpl.source_regulation IS NOT NULL
            GROUP BY cpl.source_regulation ORDER BY cnt DESC
        """)).fetchall()

        # Average number of distinct regulations covered per control.
        avg_coverage = db.execute(text("""
            SELECT COALESCE(AVG(reg_count), 0) FROM (
                SELECT cc.id, count(DISTINCT cpl.source_regulation) AS reg_count
                FROM canonical_controls cc
                LEFT JOIN control_parent_links cpl ON cpl.control_uuid = cc.id
                WHERE cc.decomposition_method = 'pass0b'
                  AND cc.release_state NOT IN ('duplicate', 'deprecated', 'rejected')
                GROUP BY cc.id
            ) sub
        """)).scalar() or 0

        return {
            "total_active": total_active,
            "total_duplicate": total_duplicate,
            "by_domain": [{"domain": r[0], "count": r[1]} for r in by_domain],
            "by_regulation": [{"regulation": r[0], "count": r[1]} for r in by_regulation],
            "avg_regulation_coverage": round(float(avg_coverage), 1),
        }
@router.get("/controls/v1-enrichment-stats")
async def v1_enrichment_stats_endpoint():
    """Overview: how many v1 controls have regulatory coverage?"""
    from compliance.services.v1_enrichment import get_v1_enrichment_stats
    return await get_v1_enrichment_stats()


@router.get("/controls/{control_id}")
async def get_control(
    control_id: str,
    # NOTE(review): `service` is declared but unused — the handler queries the
    # DB directly.  Kept for dependency-injection compatibility; confirm
    # whether get_canonical_service has side effects before removing.
    service: CanonicalControlService = Depends(get_canonical_service),
) -> dict[str, Any]:
    """Get a single canonical control by its control_id (e.g. AUTH-001)."""
    with SessionLocal() as db:
        row = db.execute(
            text(f"""
                SELECT {_CONTROL_COLS}
                FROM canonical_controls
                WHERE control_id = :cid
            """),
            {"cid": control_id.upper()},
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Control not found")
        return _control_row(row)


def _load_control_header(db, control_id: str):
    """Fetch the identity columns for a control by its (uppercased) control_id."""
    return db.execute(
        text("""
            SELECT id, control_id, title, parent_control_uuid,
                   decomposition_method, source_citation
            FROM canonical_controls
            WHERE control_id = :cid
        """),
        {"cid": control_id.upper()},
    ).fetchone()


def _parent_links_payload(db, ctrl) -> list[dict[str, Any]]:
    """Build the parent-links list for a control (shared by the traceability
    and provenance endpoints, which previously duplicated this code).

    Collects the M:N links from control_parent_links (with obligation info),
    then prepends the legacy 1:1 parent if it is not already among the links.
    """
    parent_links = db.execute(
        text("""
            SELECT cpl.parent_control_uuid, cpl.link_type, cpl.confidence,
                   cpl.source_regulation, cpl.source_article, cpl.obligation_candidate_id,
                   cc.control_id AS parent_control_id, cc.title AS parent_title,
                   cc.source_citation AS parent_citation,
                   oc.obligation_text, oc.action, oc.object, oc.normative_strength
            FROM control_parent_links cpl
            JOIN canonical_controls cc ON cc.id = cpl.parent_control_uuid
            LEFT JOIN obligation_candidates oc ON oc.id = cpl.obligation_candidate_id
            WHERE cpl.control_uuid = CAST(:uid AS uuid)
            ORDER BY cpl.source_regulation, cpl.source_article
        """),
        {"uid": str(ctrl.id)},
    ).fetchall()

    payload = [
        {
            "parent_control_id": pl.parent_control_id,
            "parent_title": pl.parent_title,
            "link_type": pl.link_type,
            "confidence": float(pl.confidence) if pl.confidence else 1.0,
            "source_regulation": pl.source_regulation,
            "source_article": pl.source_article,
            "parent_citation": pl.parent_citation,
            "obligation": {
                "text": pl.obligation_text,
                "action": pl.action,
                "object": pl.object,
                "normative_strength": pl.normative_strength,
            } if pl.obligation_text else None,
        }
        for pl in parent_links
    ]

    # Legacy 1:1 parent (backwards compat) — only if not already linked.
    if ctrl.parent_control_uuid:
        linked_uuids = {str(pl.parent_control_uuid) for pl in parent_links}
        parent_uuid_str = str(ctrl.parent_control_uuid)
        if parent_uuid_str not in linked_uuids:
            legacy = db.execute(
                text("""
                    SELECT control_id, title, source_citation
                    FROM canonical_controls
                    WHERE id = CAST(:uid AS uuid)
                """),
                {"uid": parent_uuid_str},
            ).fetchone()
            if legacy:
                payload.insert(0, {
                    "parent_control_id": legacy.control_id,
                    "parent_title": legacy.title,
                    "link_type": "decomposition",
                    "confidence": 1.0,
                    "source_regulation": None,
                    "source_article": None,
                    "parent_citation": legacy.source_citation,
                    "obligation": None,
                })
    return payload


def _children_payload(db, ctrl_uuid: str) -> list[dict[str, Any]]:
    """List the child controls derived from a (rich) control."""
    children = db.execute(
        text("""
            SELECT control_id, title, category, severity, decomposition_method
            FROM canonical_controls
            WHERE parent_control_uuid = CAST(:uid AS uuid)
            ORDER BY control_id
        """),
        {"uid": ctrl_uuid},
    ).fetchall()
    return [
        {
            "control_id": ch.control_id,
            "title": ch.title,
            "category": ch.category,
            "severity": ch.severity,
            "decomposition_method": ch.decomposition_method,
        }
        for ch in children
    ]


def _regulation_count(parent_links: list[dict[str, Any]]) -> int:
    """Count distinct source regulations across a parent-links payload."""
    return len({
        pl["source_regulation"] for pl in parent_links if pl.get("source_regulation")
    })


@router.get("/controls/{control_id}/traceability")
async def get_control_traceability(control_id: str):
    """Get the full traceability chain for a control.

    For atomic controls: shows all parent links with source regulations,
    articles, and the obligation chain.  For rich controls: shows child
    atomic controls derived from them.
    """
    with SessionLocal() as db:
        ctrl = _load_control_header(db, control_id)
        if not ctrl:
            raise HTTPException(status_code=404, detail="Control not found")

        result: dict[str, Any] = {
            "control_id": ctrl.control_id,
            "title": ctrl.title,
            "is_atomic": ctrl.decomposition_method == "pass0b",
        }
        result["parent_links"] = _parent_links_payload(db, ctrl)
        result["children"] = _children_payload(db, str(ctrl.id))
        result["source_count"] = _regulation_count(result["parent_links"])
        return result


@router.get("/controls/{control_id}/provenance")
async def get_control_provenance(control_id: str):
    """Get full provenance chain for a control — extends traceability with
    obligations, document references, merged duplicates, and regulations
    summary.
    """
    with SessionLocal() as db:
        ctrl = _load_control_header(db, control_id)
        if not ctrl:
            raise HTTPException(status_code=404, detail="Control not found")

        ctrl_uuid = str(ctrl.id)
        result: dict[str, Any] = {
            "control_id": ctrl.control_id,
            "title": ctrl.title,
            "is_atomic": ctrl.decomposition_method == "pass0b",
        }

        # Parent links / children / source count — same payload as traceability.
        result["parent_links"] = _parent_links_payload(db, ctrl)
        result["children"] = _children_payload(db, ctrl_uuid)
        result["source_count"] = _regulation_count(result["parent_links"])

        # --- Obligations (for rich controls) ---
        obligations = db.execute(
            text("""
                SELECT candidate_id, obligation_text, action, object,
                       normative_strength, release_state
                FROM obligation_candidates
                WHERE parent_control_uuid = CAST(:uid AS uuid)
                  AND release_state NOT IN ('rejected', 'merged', 'duplicate')
                ORDER BY candidate_id
            """),
            {"uid": ctrl_uuid},
        ).fetchall()
        result["obligations"] = [
            {
                "candidate_id": ob.candidate_id,
                "obligation_text": ob.obligation_text,
                "action": ob.action,
                "object": ob.object,
                "normative_strength": ob.normative_strength,
                "release_state": ob.release_state,
            }
            for ob in obligations
        ]
        result["obligation_count"] = len(obligations)

        # --- Document references (direct or via linked obligations) ---
        doc_refs = db.execute(
            text("""
                SELECT DISTINCT oe.regulation_code, oe.article, oe.paragraph,
                       oe.extraction_method, oe.confidence
                FROM obligation_extractions oe
                WHERE oe.control_uuid = CAST(:uid AS uuid)
                   OR oe.obligation_id IN (
                        SELECT oc.candidate_id
                        FROM obligation_candidates oc
                        JOIN control_parent_links cpl
                          ON cpl.obligation_candidate_id = oc.id
                        WHERE cpl.control_uuid = CAST(:uid AS uuid)
                   )
                ORDER BY oe.regulation_code, oe.article
            """),
            {"uid": ctrl_uuid},
        ).fetchall()
        result["document_references"] = [
            {
                "regulation_code": dr.regulation_code,
                "article": dr.article,
                "paragraph": dr.paragraph,
                "extraction_method": dr.extraction_method,
                "confidence": float(dr.confidence) if dr.confidence else None,
            }
            for dr in doc_refs
        ]

        # --- Merged duplicates pointing at this control ---
        merged = db.execute(
            text("""
                SELECT cc.control_id, cc.title,
                       (SELECT cpl.source_regulation FROM control_parent_links cpl
                        WHERE cpl.control_uuid = cc.id LIMIT 1) AS source_regulation
                FROM canonical_controls cc
                WHERE cc.merged_into_uuid = CAST(:uid AS uuid)
                  AND cc.release_state = 'duplicate'
                ORDER BY cc.control_id
            """),
            {"uid": ctrl_uuid},
        ).fetchall()
        result["merged_duplicates"] = [
            {
                "control_id": m.control_id,
                "title": m.title,
                "source_regulation": m.source_regulation,
            }
            for m in merged
        ]
        result["merged_duplicates_count"] = len(merged)

        # --- Regulations summary (aggregated from parent_links + doc_refs) ---
        reg_map: dict[str, dict[str, Any]] = {}
        for pl in result["parent_links"]:
            reg = pl.get("source_regulation")
            if not reg:
                continue
            if reg not in reg_map:
                reg_map[reg] = {"articles": set(), "link_types": set()}
            if pl.get("source_article"):
                reg_map[reg]["articles"].add(pl["source_article"])
            reg_map[reg]["link_types"].add(pl.get("link_type", "decomposition"))
        for dr in result["document_references"]:
            reg = dr.get("regulation_code")
            if not reg:
                continue
            if reg not in reg_map:
                reg_map[reg] = {"articles": set(), "link_types": set()}
            if dr.get("article"):
                reg_map[reg]["articles"].add(dr["article"])
        result["regulations_summary"] = [
            {
                "regulation_code": reg,
                "articles": sorted(info["articles"]),
                "link_types": sorted(info["link_types"]),
            }
            for reg, info in sorted(reg_map.items())
        ]
        return result
@router.post("/controls/backfill-normative-strength")
async def backfill_normative_strength(
    dry_run: bool = Query(True, description="Nur zaehlen, nicht aendern"),
):
    """Correct ``normative_strength`` on obligation_candidates based on the
    ``source_type`` of the originating regulation.

    Three-tier model:
      - law (Gesetz):        normative_strength stays unchanged
      - guideline (Leitlinie): capped at 'should'
      - framework:            capped at 'can'

    For controls with several parent links the highest source_type applies.
    Returns dry-run-aware statistics plus a sample of the changes.
    """
    # Imported lazily so the classification table is only loaded when this
    # maintenance endpoint is actually invoked.
    from compliance.data.source_type_classification import (
        classify_source_regulation,
        cap_normative_strength,
    )

    with SessionLocal() as db:
        # 1. Load all live obligations together with the parent control's
        #    source citation (the regulation name drives the classification).
        obligations = db.execute(text("""
            SELECT oc.id, oc.candidate_id, oc.normative_strength,
                   cc.source_citation->>'source' AS parent_source
            FROM obligation_candidates oc
            JOIN canonical_controls cc ON cc.id = oc.parent_control_uuid
            WHERE oc.release_state NOT IN ('rejected', 'merged', 'duplicate')
              AND oc.normative_strength IS NOT NULL
            ORDER BY oc.candidate_id
        """)).fetchall()

        # 2. Re-derive the (possibly capped) strength for every obligation.
        changes = []
        stats = {"total": len(obligations), "unchanged": 0,
                 "capped_to_should": 0, "capped_to_may": 0, "no_source": 0}

        for obl in obligations:
            if not obl.parent_source:
                # No source citation on the parent -> cannot classify.
                stats["no_source"] += 1
                continue

            source_type = classify_source_regulation(obl.parent_source)
            new_strength = cap_normative_strength(obl.normative_strength, source_type)

            if new_strength != obl.normative_strength:
                changes.append({
                    "id": str(obl.id),
                    "candidate_id": obl.candidate_id,
                    "old_strength": obl.normative_strength,
                    "new_strength": new_strength,
                    "source_type": source_type,
                    "source_regulation": obl.parent_source,
                })
                if new_strength == "should":
                    stats["capped_to_should"] += 1
                elif new_strength == "may":
                    stats["capped_to_may"] += 1
            else:
                stats["unchanged"] += 1

        # 3. Apply the changes (skipped on dry runs); single commit at the end.
        if not dry_run and changes:
            for change in changes:
                db.execute(text("""
                    UPDATE obligation_candidates
                    SET normative_strength = :new_strength
                    WHERE id = CAST(:oid AS uuid)
                """), {"new_strength": change["new_strength"], "oid": change["id"]})
            db.commit()

        return {
            "dry_run": dry_run,
            "stats": stats,
            "total_changes": len(changes),
            "sample_changes": changes[:20],
        }


# =============================================================================
# OBLIGATION DEDUPLICATION
# =============================================================================

@router.post("/obligations/dedup")
async def dedup_obligations(
    dry_run: bool = Query(True, description="Nur zaehlen, nicht aendern"),
    batch_size: int = Query(0, description="0 = alle auf einmal"),
    offset: int = Query(0, description="Offset fuer Batch-Verarbeitung"),
):
    """Mark duplicated obligation_candidates as 'duplicate'.

    Duplicates are multiple rows sharing the same candidate_id.  Per
    candidate_id the oldest row (MIN(created_at)) is kept; all other rows
    get release_state='duplicate' and merged_into_id pointing at the kept
    row.  Supports batched processing via batch_size/offset.
    """
    with SessionLocal() as db:
        # 1. Find all candidate_ids occurring more than once
        #    (only rows not already deduplicated are considered).
        dup_query = """
            SELECT candidate_id, count(*) as cnt
            FROM obligation_candidates
            WHERE release_state NOT IN ('rejected', 'merged', 'duplicate')
            GROUP BY candidate_id
            HAVING count(*) > 1
            ORDER BY candidate_id
        """
        if batch_size > 0:
            # batch_size/offset are FastAPI-validated ints, so this f-string
            # interpolation cannot inject arbitrary SQL.
            dup_query += f" LIMIT {batch_size} OFFSET {offset}"
        dup_groups = db.execute(text(dup_query)).fetchall()

        # Overall number of pending duplicate groups (independent of batching).
        total_groups = db.execute(text("""
            SELECT count(*) FROM (
                SELECT candidate_id
                FROM obligation_candidates
                WHERE release_state NOT IN ('rejected', 'merged', 'duplicate')
                GROUP BY candidate_id
                HAVING count(*) > 1
            ) sub
        """)).scalar()

        # 2. Per group: keep the oldest row, flag the rest as duplicates.
        kept_count = 0
        duplicate_count = 0
        sample_changes: list[dict[str, Any]] = []

        for grp in dup_groups:
            cid = grp.candidate_id
            # Fetch every live row for this candidate_id, oldest first
            # (created_at, then id, for a deterministic keeper).
            entries = db.execute(text("""
                SELECT id, candidate_id, obligation_text, release_state, created_at
                FROM obligation_candidates
                WHERE candidate_id = :cid
                  AND release_state NOT IN ('rejected', 'merged', 'duplicate')
                ORDER BY created_at ASC, id ASC
            """), {"cid": cid}).fetchall()

            if len(entries) < 2:
                # Group may have shrunk since the first query; nothing to do.
                continue

            keeper = entries[0]  # oldest entry wins
            duplicates = entries[1:]
            kept_count += 1
            duplicate_count += len(duplicates)

            if len(sample_changes) < 20:
                sample_changes.append({
                    "candidate_id": cid,
                    "kept_id": str(keeper.id),
                    "kept_text": keeper.obligation_text[:100],
                    "duplicate_count": len(duplicates),
                    "duplicate_ids": [str(d.id) for d in duplicates],
                })

            if not dry_run:
                for dup in duplicates:
                    # Record the dedup decision in quality_flags for audit.
                    db.execute(text("""
                        UPDATE obligation_candidates
                        SET release_state = 'duplicate',
                            merged_into_id = CAST(:keeper_id AS uuid),
                            quality_flags = COALESCE(quality_flags, '{}'::jsonb)
                                || jsonb_build_object(
                                    'dedup_reason', 'duplicate of ' || :keeper_cid,
                                    'dedup_kept_id', :keeper_id_str,
                                    'dedup_at', NOW()::text
                                )
                        WHERE id = CAST(:dup_id AS uuid)
                    """), {
                        "keeper_id": str(keeper.id),
                        "keeper_cid": cid,
                        "keeper_id_str": str(keeper.id),
                        "dup_id": str(dup.id),
                    })

        # Single commit covering the whole batch.
        if not dry_run and duplicate_count > 0:
            db.commit()

        return {
            "dry_run": dry_run,
            "stats": {
                "total_duplicate_groups": total_groups,
                "processed_groups": len(dup_groups),
                "kept": kept_count,
                "marked_duplicate": duplicate_count,
            },
            "sample_changes": sample_changes,
        }


@router.get("/obligations/dedup-stats")
async def dedup_obligations_stats():
    """Statistics about the current dedup state of the obligations."""
    with SessionLocal() as db:
        total = db.execute(text(
            "SELECT count(*) FROM obligation_candidates"
        )).scalar()
        by_state = db.execute(text("""
            SELECT release_state, count(*) as cnt
            FROM obligation_candidates
            GROUP BY release_state
            ORDER BY release_state
        """)).fetchall()
        # Groups that still contain undetected duplicates.
        dup_groups = db.execute(text("""
            SELECT count(*) FROM (
                SELECT candidate_id
                FROM obligation_candidates
                WHERE release_state NOT IN ('rejected', 'merged', 'duplicate')
                GROUP BY candidate_id
                HAVING count(*) > 1
            ) sub
        """)).scalar()
        # Rows that a dedup run would mark duplicate (group size minus keeper).
        removable = db.execute(text("""
            SELECT COALESCE(sum(cnt - 1), 0) FROM (
                SELECT candidate_id, count(*) as cnt
                FROM obligation_candidates
                WHERE release_state NOT IN ('rejected', 'merged', 'duplicate')
                GROUP BY candidate_id
                HAVING count(*) > 1
            ) sub
        """)).scalar()

        return {
            "total_obligations": total,
            "by_state": {r.release_state: r.cnt for r in by_state},
            "pending_duplicate_groups": dup_groups,
            "pending_removable_duplicates": removable,
        }


# =============================================================================
# EVIDENCE TYPE BACKFILL
# =============================================================================

# Domains that are primarily technical (code-verifiable)
_CODE_DOMAINS = frozenset({
    "SEC", "AUTH", "CRYPT", "CRYP", "CRY", "NET", "LOG", "ACC", "APP", "SYS",
    "CI", "CONT", "API", "CLOUD", "IAC", "SAST", "DAST", "DEP", "SBOM", "WEB",
    "DEV", "SDL", "PKI", "HSM", "TEE", "TPM", "CRX", "CRF", "FWU", "STO",
    "RUN", "VUL", "MAL", "PLT", "AUT",
})

# Domains that are primarily process-based (document-verifiable)
_PROCESS_DOMAINS = frozenset({
    "GOV", "ORG", "COMP", "LEGAL", "HR", "TRAIN", "AML", "FIN", "RISK",
    "AUDIT", "AUD", "PROC", "DOC", "PHYS", "PHY", "PRIV", "DPO", "BCDR",
    "BCP", "VENDOR", "SUPPLY", "SUP", "CERT", "POLICY", "ENV", "HLT", "TRD",
    "LAB", "PER", "REL", "ISM", "COM", "GAM", "RIS", "PCA", "GNT", "HCA",
    "RES", "ISS",
})

# Domains that are typically hybrid
_HYBRID_DOMAINS = frozenset({
    "DATA", "AI", "INC", "ID", "IAM", "IDF", "IDP", "IDA", "IDN", "OPS",
    "MNT", "INT", "BCK",
})


def _classify_evidence_type(control_id: str, category: str | None) -> str:
    """Heuristic: classify a control as code/process/hybrid based on domain prefix.

    The domain is the part of the control_id before the first '-'
    (e.g. 'AUTH' in 'AUTH-001').  Falls back to the category, then to
    'process' as the conservative default.
    """
    domain = control_id.split("-")[0].upper() if control_id else ""
    if domain in _CODE_DOMAINS:
        return "code"
    if domain in _PROCESS_DOMAINS:
        return "process"
    if domain in _HYBRID_DOMAINS:
        return "hybrid"
    # Fallback: use category if available
    code_categories = {"encryption", "authentication", "network",
                       "application", "system", "identity"}
    process_categories = {"compliance", "personnel", "physical",
                          "governance", "risk"}
    if category in code_categories:
        return "code"
    if category in process_categories:
        return "process"
    return "process"  # Conservative default


@router.post("/controls/backfill-evidence-type")
async def backfill_evidence_type(
    dry_run: bool = Query(True, description="Nur zaehlen, nicht aendern"),
):
    """Classify controls as code/process/hybrid based on the domain prefix.

    Heuristic:
      - SEC, AUTH, CRYPT, NET, LOG, ...  -> code
      - GOV, ORG, COMP, LEGAL, HR, ...   -> process
      - DATA, AI, INC                    -> hybrid

    Only controls whose evidence_type is still NULL are updated.
    """
    with SessionLocal() as db:
        rows = db.execute(text("""
            SELECT id, control_id, category, evidence_type
            FROM canonical_controls
            WHERE release_state NOT IN ('rejected', 'merged')
            ORDER BY control_id
        """)).fetchall()

        changes = []
        stats = {"total": len(rows), "already_set": 0,
                 "code": 0, "process": 0, "hybrid": 0}

        for row in rows:
            if row.evidence_type is not None:
                # Never overwrite an existing classification.
                stats["already_set"] += 1
                continue
            new_type = _classify_evidence_type(row.control_id, row.category)
            stats[new_type] += 1
            changes.append({
                "id": str(row.id),
                "control_id": row.control_id,
                "evidence_type": new_type,
            })

        if not dry_run and changes:
            for change in changes:
                db.execute(text("""
                    UPDATE canonical_controls
                    SET evidence_type = :et
                    WHERE id = CAST(:cid AS uuid)
                """), {"et": change["evidence_type"], "cid": change["id"]})
            db.commit()

        return {
            "dry_run": dry_run,
            "stats": stats,
            "total_changes": len(changes),
            "sample_changes": changes[:20],
        }


# =============================================================================
# RATIONALE BACKFILL (LLM)
# =============================================================================
@router.post("/controls/backfill-rationale")
async def backfill_rationale(
    dry_run: bool = Query(True, description="Nur zaehlen, nicht aendern"),
    batch_size: int = Query(50, description="Parent-Controls pro Durchlauf"),
    offset: int = Query(0, description="Offset fuer Paginierung (Parent-Index)"),
):
    """Generate meaningful rationales for atomic controls via LLM.

    Optimisation: grouped by parent control (~7k parents instead of ~86k
    individual calls).  One LLM call is made per parent group, producing a
    shared rationale for all of its children.

    Workflow:
      1. dry_run=true                          -> show statistics only
      2. dry_run=false&batch_size=50&offset=0  -> process the first 50 parents
      3. repeat with offset=50, 100, ...       until done
    """
    from compliance.services.llm_provider import get_llm_provider

    with SessionLocal() as db:
        # 1. Load parent controls with children whose rationale is still the
        #    placeholder text written by the decomposition pipeline.
        parents = db.execute(text("""
            SELECT p.id AS parent_uuid, p.control_id, p.title, p.category,
                   p.source_citation->>'source' AS source_name,
                   COUNT(c.id) AS child_count
            FROM canonical_controls p
            JOIN canonical_controls c ON c.parent_control_uuid = p.id
            WHERE c.rationale = 'Aus Obligation abgeleitet.'
              AND c.release_state NOT IN ('rejected', 'merged')
            GROUP BY p.id, p.control_id, p.title, p.category,
                     p.source_citation->>'source'
            ORDER BY p.control_id
        """)).fetchall()

        total_parents = len(parents)
        total_children = sum(p.child_count for p in parents)

        if dry_run:
            return {
                "dry_run": True,
                "total_parents": total_parents,
                "total_children": total_children,
                "estimated_llm_calls": total_parents,
                "sample_parents": [
                    {
                        "control_id": p.control_id,
                        "title": p.title,
                        "source": p.source_name,
                        "child_count": p.child_count,
                    }
                    for p in parents[:10]
                ],
            }

        # 2. Select the batch.
        batch = parents[offset : offset + batch_size]
        if not batch:
            return {
                "dry_run": False,
                "message": "Kein weiterer Batch — alle Parents verarbeitet.",
                "total_parents": total_parents,
                "offset": offset,
                "processed": 0,
            }

        provider = get_llm_provider()
        processed = 0
        children_updated = 0
        errors = []
        sample_rationales = []

        for parent in batch:
            parent_uuid = str(parent.parent_uuid)
            source = parent.source_name or "Regulierung"

            # Build the (German) LLM prompt; the wording is part of the
            # runtime contract with the model and must not be altered.
            prompt = (
                f"Du bist Compliance-Experte. Erklaere in 1-2 Saetzen auf Deutsch, "
                f"WARUM aus dem uebergeordneten Control atomare Teilmassnahmen "
                f"abgeleitet wurden.\n\n"
                f"Uebergeordnetes Control: {parent.control_id} — {parent.title}\n"
                f"Regulierung: {source}\n"
                f"Kategorie: {parent.category or 'k.A.'}\n"
                f"Anzahl atomarer Controls: {parent.child_count}\n\n"
                f"Schreibe NUR die Begruendung (1-2 Saetze). Kein Markdown, "
                f"keine Aufzaehlung, kein Praefix. "
                f"Erklaere den regulatorischen Hintergrund und warum die "
                f"Zerlegung in atomare, testbare Massnahmen notwendig ist."
            )

            try:
                response = await provider.complete(
                    prompt=prompt,
                    max_tokens=256,
                    temperature=0.3,
                )
                rationale = response.content.strip()

                # Clean up: strip surrounding quotes and a leading
                # "Begründung:"/"Begruendung:" prefix the model may add.
                rationale = rationale.strip('"').strip("'").strip()
                if rationale.startswith("Begründung:") or rationale.startswith("Begruendung:"):
                    rationale = rationale.split(":", 1)[1].strip()

                # Cap length (max 500 characters).
                if len(rationale) > 500:
                    rationale = rationale[:497] + "..."

                if not rationale or len(rationale) < 10:
                    errors.append({
                        "control_id": parent.control_id,
                        "error": "LLM-Antwort zu kurz oder leer",
                    })
                    continue

                # Update every child of this parent that still carries the
                # placeholder rationale.
                result = db.execute(
                    text("""
                        UPDATE canonical_controls
                        SET rationale = :rationale
                        WHERE parent_control_uuid = CAST(:pid AS uuid)
                          AND rationale = 'Aus Obligation abgeleitet.'
                          AND release_state NOT IN ('rejected', 'merged')
                    """),
                    {"rationale": rationale, "pid": parent_uuid},
                )
                children_updated += result.rowcount
                processed += 1

                if len(sample_rationales) < 5:
                    sample_rationales.append({
                        "parent": parent.control_id,
                        "title": parent.title,
                        "rationale": rationale,
                        "children_updated": result.rowcount,
                    })

            except Exception as e:
                logger.error(f"LLM error for {parent.control_id}: {e}")
                errors.append({
                    "control_id": parent.control_id,
                    "error": str(e)[:200],
                })
                # Roll back so the DB session stays usable after the error.
                try:
                    db.rollback()
                except Exception:
                    pass

        # Commit whatever succeeded in this batch.
        db.commit()

        return {
            "dry_run": False,
            "offset": offset,
            "batch_size": batch_size,
            "next_offset": offset + batch_size if offset + batch_size < total_parents else None,
            "processed_parents": processed,
            "children_updated": children_updated,
            "total_parents": total_parents,
            "total_children": total_children,
            "errors": errors[:10],
            "sample_rationales": sample_rationales,
        }


# =============================================================================
# CONTROL CRUD (CREATE / UPDATE / DELETE)
# =============================================================================

@router.post("/controls", status_code=201)
async def create_control(
    body: ControlCreateRequest,
    service: CanonicalControlService = Depends(get_canonical_service),
) -> dict[str, Any]:
    """Create a new canonical control.

    Raises 400 on invalid control_id/severity/risk_score, 404 on unknown
    framework, 409 on a duplicate control_id within the framework.
    """
    import json as _json
    import re

    # Validate control_id format
    if not re.match(r"^[A-Z]{2,6}-[0-9]{3}$", body.control_id):
        raise HTTPException(status_code=400, detail="control_id must match DOMAIN-NNN (e.g. AUTH-001)")
    if body.severity not in ("low", "medium", "high", "critical"):
        raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical")
    if body.risk_score is not None and not (0 <= body.risk_score <= 10):
        raise HTTPException(status_code=400, detail="risk_score must be 0..10")

    with SessionLocal() as db:
        # Resolve framework (external framework_id -> internal UUID)
        fw = db.execute(
            text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"),
            {"fid": body.framework_id},
        ).fetchone()
        if not fw:
            raise HTTPException(status_code=404, detail=f"Framework '{body.framework_id}' not found")

        # Check duplicate
        existing = db.execute(
            text("SELECT id FROM canonical_controls WHERE framework_id = :fid AND control_id = :cid"),
            {"fid": str(fw.id), "cid": body.control_id},
        ).fetchone()
        if existing:
            raise HTTPException(status_code=409, detail=f"Control '{body.control_id}' already exists")

        # _CONTROL_COLS is a module-level column list; only the RETURNING
        # clause is interpolated — all values are bound parameters.
        row = db.execute(
            text(f"""
                INSERT INTO canonical_controls (
                    framework_id, control_id, title, objective, rationale,
                    scope, requirements, test_procedure, evidence,
                    severity, risk_score, implementation_effort, evidence_confidence,
                    open_anchors, release_state, tags, license_rule,
                    source_original_text, source_citation, customer_visible,
                    verification_method, category, evidence_type, target_audience,
                    generation_metadata, applicable_industries,
                    applicable_company_size, scope_conditions
                ) VALUES (
                    :fw_id, :cid, :title, :objective, :rationale,
                    CAST(:scope AS jsonb), CAST(:requirements AS jsonb),
                    CAST(:test_procedure AS jsonb), CAST(:evidence AS jsonb),
                    :severity, :risk_score, :effort, :confidence,
                    CAST(:anchors AS jsonb), :release_state, CAST(:tags AS jsonb), :license_rule,
                    :source_original_text, CAST(:source_citation AS jsonb), :customer_visible,
                    :verification_method, :category, :evidence_type, :target_audience,
                    CAST(:generation_metadata AS jsonb), CAST(:applicable_industries AS jsonb),
                    CAST(:applicable_company_size AS jsonb), CAST(:scope_conditions AS jsonb)
                )
                RETURNING {_CONTROL_COLS}
            """),
            {
                "fw_id": str(fw.id),
                "cid": body.control_id,
                "title": body.title,
                "objective": body.objective,
                "rationale": body.rationale,
                "scope": _json.dumps(body.scope),
                "requirements": _json.dumps(body.requirements),
                "test_procedure": _json.dumps(body.test_procedure),
                "evidence": _json.dumps(body.evidence),
                "severity": body.severity,
                "risk_score": body.risk_score,
                "effort": body.implementation_effort,
                "confidence": body.evidence_confidence,
                "anchors": _json.dumps(body.open_anchors),
                "release_state": body.release_state,
                "tags": _json.dumps(body.tags),
                "license_rule": body.license_rule,
                "source_original_text": body.source_original_text,
                "source_citation": _json.dumps(body.source_citation) if body.source_citation else None,
                "customer_visible": body.customer_visible,
                "verification_method": body.verification_method,
                "category": body.category,
                "evidence_type": body.evidence_type,
                "target_audience": body.target_audience,
                "generation_metadata": _json.dumps(body.generation_metadata) if body.generation_metadata else None,
                "applicable_industries": _json.dumps(body.applicable_industries) if body.applicable_industries else None,
                "applicable_company_size": _json.dumps(body.applicable_company_size) if body.applicable_company_size else None,
                "scope_conditions": _json.dumps(body.scope_conditions) if body.scope_conditions else None,
            },
        ).fetchone()
        db.commit()
        return _control_row(row)


@router.put("/controls/{control_id}")
async def update_control(
    control_id: str,
    body: ControlUpdateRequest,
    service: CanonicalControlService = Depends(get_canonical_service),
) -> dict[str, Any]:
    """Update an existing canonical control (partial update).

    Only the fields present (non-None) in the request body are written.
    Raises 400 on empty/invalid updates, 404 if the control does not exist.
    """
    import json as _json

    updates = body.dict(exclude_none=True)
    if not updates:
        raise HTTPException(status_code=400, detail="No fields to update")
    if "severity" in updates and updates["severity"] not in ("low", "medium", "high", "critical"):
        raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical")
    if "risk_score" in updates and updates["risk_score"] is not None and not (0 <= updates["risk_score"] <= 10):
        raise HTTPException(status_code=400, detail="risk_score must be 0..10")

    # Build dynamic SET clause.  Column names come from the closed set of
    # ControlUpdateRequest model fields, so interpolating them is safe;
    # values are always bound parameters.
    set_parts = []
    params: dict[str, Any] = {"cid": control_id.upper()}
    json_fields = {"scope", "requirements", "test_procedure", "evidence",
                   "open_anchors", "tags", "source_citation", "generation_metadata"}
    for key, val in updates.items():
        col = key
        if key in json_fields:
            set_parts.append(f"{col} = CAST(:{key} AS jsonb)")
            params[key] = _json.dumps(val)
        else:
            set_parts.append(f"{col} = :{key}")
            params[key] = val
    set_parts.append("updated_at = NOW()")

    with SessionLocal() as db:
        row = db.execute(
            text(f"""
                UPDATE canonical_controls
                SET {', '.join(set_parts)}
                WHERE control_id = :cid
                RETURNING {_CONTROL_COLS}
            """),
            params,
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Control not found")
        db.commit()
        return _control_row(row)


@router.delete("/controls/{control_id}", status_code=204)
async def delete_control(
    control_id: str,
    service: CanonicalControlService = Depends(get_canonical_service),
) -> None:
    """Delete a canonical control.  Domain errors are mapped to HTTP errors."""
    with translate_domain_errors():
        service.delete_control(control_id)


# =============================================================================
# SIMILARITY CHECK
# =============================================================================

@router.post("/controls/{control_id}/similarity-check")
async def similarity_check(
    control_id: str,
    body: SimilarityCheckRequest,
    service: CanonicalControlService = Depends(get_canonical_service),
) -> dict[str, Any]:
    """Run the too-close detector against a source/candidate text pair."""
    with translate_domain_errors():
        return await service.similarity_check(control_id, body)


# =============================================================================
# CATEGORIES
# =============================================================================

@router.get("/categories")
async def list_categories():
    """List all canonical control categories, ordered by sort_order."""
    with SessionLocal() as db:
        rows = db.execute(
            text("SELECT category_id, label_de, label_en, sort_order FROM canonical_control_categories ORDER BY sort_order")
        ).fetchall()
        return [
            {
                "category_id": r.category_id,
                "label_de": r.label_de,
                "label_en": r.label_en,
                "sort_order": r.sort_order,
            }
            for r in rows
        ]


# =============================================================================
# SIMILAR CONTROLS (Embedding-based dedup)
# =============================================================================

@router.get("/controls/{control_id}/similar")
async def find_similar_controls(
    control_id: str,
    threshold: float = Query(0.85, ge=0.5, le=1.0),
    limit: int = Query(20, ge=1, le=100),
):
    """Find controls similar to the given one using embedding cosine similarity."""
    with SessionLocal() as db:
        # Verify the target control exists (404 otherwise).
        target = db.execute(
            text("""
                SELECT id, control_id, title, objective
                FROM canonical_controls
                WHERE control_id = :cid
            """),
            {"cid": control_id.upper()},
        ).fetchone()
        if not target:
            raise HTTPException(status_code=404, detail="Control not found")

        # pgvector cosine distance (`<=>`); if the embedding column is
        # missing, the query raises and we return an empty list below.
        try:
            rows = db.execute(
                text("""
                    SELECT c.control_id, c.title, c.severity, c.release_state,
                           c.tags, c.license_rule, c.verification_method, c.category,
                           1 - (c.embedding <=> t.embedding) AS similarity
                    FROM canonical_controls c, canonical_controls t
                    WHERE t.control_id = :cid
                      AND c.control_id != :cid
                      AND c.release_state != 'deprecated'
                      AND c.embedding IS NOT NULL
                      AND t.embedding IS NOT NULL
                      AND 1 - (c.embedding <=> t.embedding) >= :threshold
                    ORDER BY similarity DESC
                    LIMIT :lim
                """),
                {"cid": control_id.upper(), "threshold": threshold, "lim": limit},
            ).fetchall()

            return [
                {
                    "control_id": r.control_id,
                    "title": r.title,
                    "severity": r.severity,
                    "release_state": r.release_state,
                    "tags": r.tags or [],
                    "license_rule": r.license_rule,
                    "verification_method": r.verification_method,
                    "category": r.category,
                    "similarity": round(float(r.similarity), 4),
                }
                for r in rows
            ]
        except Exception as e:
            # Best-effort: no embeddings -> no similar controls, never a 500.
            logger.warning("Embedding similarity query failed (no embedding column?): %s", e)
            return []


# =============================================================================
# SOURCES & LICENSES
# =============================================================================

@router.get("/sources")
async def list_sources(
    service: CanonicalControlService = Depends(get_canonical_service),
) -> Any:
    """List all registered sources with permission flags."""
    with translate_domain_errors():
        return service.list_sources()


@router.get("/licenses")
async def list_licenses(
    service: CanonicalControlService = Depends(get_canonical_service),
) -> Any:
    """Return the license matrix."""
    with translate_domain_errors():
        return service.list_licenses()


# =============================================================================
# V1 ENRICHMENT (Eigenentwicklung → Regulatorische Abdeckung)
# =============================================================================

@router.post("/controls/enrich-v1-matches")
async def enrich_v1_matches_endpoint(
    dry_run: bool = Query(True, description="Nur zaehlen, nicht schreiben"),
    batch_size: int = Query(100, description="Controls pro Durchlauf"),
    offset: int = Query(0, description="Offset fuer Paginierung"),
):
    """Find regulatory coverage for v1 in-house ("Eigenentwicklung") controls.

    In-house here means generation_strategy='ungrouped', pipeline_version=1,
    source_citation IS NULL, parent_control_uuid IS NULL.

    Workflow:
      1. dry_run=true                           -> show statistics only
      2. dry_run=false&batch_size=100&offset=0  -> process the first 100
      3. repeat with next_offset until done
    """
    from compliance.services.v1_enrichment import enrich_v1_matches

    return await enrich_v1_matches(
        dry_run=dry_run,
        batch_size=batch_size,
        offset=offset,
    )


@router.get("/controls/{control_id}/v1-matches")
async def get_v1_matches_endpoint(control_id: str):
    """Return the regulatory matches for a v1 control.

    Returns:
        A list of matches with control details, source and score.
    """
    from compliance.services.v1_enrichment import get_v1_matches

    # Resolve control_id to UUID.
    # NOTE(review): unlike the other endpoints this lookup does not apply
    # .upper() to control_id — confirm whether that is intentional.
    with SessionLocal() as db:
        row = db.execute(text("""
            SELECT id FROM canonical_controls WHERE control_id = :cid
        """), {"cid": control_id}).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail=f"Control {control_id} not found")
        return await get_v1_matches(str(row.id))


# =============================================================================
# INTERNAL HELPERS
# =============================================================================

def _control_row(r) -> dict:
    """Serialize a canonical_controls DB row into the API response shape.

    Uses getattr() with defaults for columns that may be absent from older
    result sets (e.g. evidence_type, generation_strategy, parent_* fields).

    NOTE(review): this definition shadows the `_control_row` imported at the
    top of the module from canonical_control_service ("re-exported for legacy
    test imports") — confirm which implementation callers/tests expect.
    """
    return {
        "id": str(r.id),
        "framework_id": str(r.framework_id),
        "control_id": r.control_id,
        "title": r.title,
        "objective": r.objective,
        "rationale": r.rationale,
        "scope": r.scope,
        "requirements": r.requirements,
        "test_procedure": r.test_procedure,
        "evidence": r.evidence,
        "severity": r.severity,
        # Numeric columns arrive as Decimal from the DB driver; normalise to float.
        "risk_score": float(r.risk_score) if r.risk_score is not None else None,
        "implementation_effort": r.implementation_effort,
        "evidence_confidence": float(r.evidence_confidence) if r.evidence_confidence is not None else None,
        "open_anchors": r.open_anchors,
        "release_state": r.release_state,
        "tags": r.tags or [],
        "license_rule": r.license_rule,
        "source_original_text": r.source_original_text,
        "source_citation": r.source_citation,
        "customer_visible": r.customer_visible,
        "verification_method": r.verification_method,
        "category": r.category,
        "evidence_type": getattr(r, "evidence_type", None),
        "target_audience": r.target_audience,
        "generation_metadata": r.generation_metadata,
        "generation_strategy": getattr(r, "generation_strategy", "ungrouped"),
        "applicable_industries": getattr(r, "applicable_industries", None),
        "applicable_company_size": getattr(r, "applicable_company_size", None),
        "scope_conditions": getattr(r, "scope_conditions", None),
        "parent_control_uuid": str(r.parent_control_uuid) if getattr(r, "parent_control_uuid", None) else None,
        "parent_control_id": getattr(r, "parent_control_id", None),
        "parent_control_title": getattr(r, "parent_control_title", None),
        "decomposition_method": getattr(r, "decomposition_method", None),
        "pipeline_version": getattr(r, "pipeline_version", None),
        "created_at": r.created_at.isoformat() if r.created_at else None,
        "updated_at": r.updated_at.isoformat() if r.updated_at else None,
    }