15 Commits

Author SHA1 Message Date
Benjamin Admin e013702a02 Merge branch 'main' of ssh://gitea.meghsakha.com:22222/Benjamin_Boenisch/breakpilot-core
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 47s
CI / test-python-voice (push) Successful in 38s
CI / test-bqas (push) Successful in 37s
2026-05-06 21:06:19 +02:00
Benjamin Admin f022b489e2 docs: comprehensive session handover — Blocks F+G complete, next: MC quality refinement
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 21:06:01 +02:00
Benjamin Admin 0092c4fe47 feat(pipeline): G-pre1 refinement script for large object groups
Splits master controls >200 members by re-clustering their object groups
with k=4-20 per group. First round: 38 groups → 325 sub-groups → 253 new MCs.
25 generic MCs remain (monitoring, procedure, etc.) — need regulation-source split.

Session summary: Block F complete, Control Generation (1,599+), Pass 0a/0b,
Production Sync, G-pre1/2/3 Object Clustering + Master Controls + API,
G1-G4 Compliance Execution Layer (Decision Trace, Commit Ledger, Decision Memory,
Pre-Deployment Enforcement).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 20:41:49 +02:00
Benjamin Admin d5bcd0bd5b feat(pipeline): G4 Pre-Deployment Enforcement — CI/CD compliance gate
New table: deployment_checks (verdict, blocking/warning controls, risk score)
New API:
  POST /v1/deployment-checks (SDK asks: "can I deploy?")
  GET /v1/deployment-checks/{id} (check result)
  POST /v1/deployment-checks/{id}/override (manual override with justification)
  GET /v1/deployment-checks/stats (approval/block rate)

Check logic: queries G1 decision_traces + G3 open failures per affected control.
Verdict: approved (0 blocking) or blocked (with fix recommendations).
454 tests pass, 0 regressions.

Block G complete: G1-G4 all implemented.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 20:24:45 +02:00
Benjamin Admin c398e74d5e feat(pipeline): G3 Full Decision Memory — compliance lifecycle event stream
New table: decision_events (assessment→decision→fix→verification→failure cycle)
New API:
  POST /v1/decision-events (record lifecycle event)
  GET /v1/decision-events (list with filters)
  GET /v1/decision-events/timeline/{control_id} (full chronological timeline)
  GET /v1/decision-events/stats (failure rate, cycle times)

Each event captures input_state, output_state, actor, evidence.
454 tests pass, 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 20:16:25 +02:00
Benjamin Admin e82f99b8cb feat(pipeline): G2 Compliance Commit Ledger — code↔control audit trail
New table: compliance_commits (commit hash, affected controls, risk level)
New API:
  POST /v1/compliance-commits (SDK registers commit + impact)
  GET /v1/compliance-commits (list with filters)
  GET /v1/compliance-commits/by-control/{id} (all commits for a control)
  GET /v1/compliance-commits/stats (dashboard)
  GET /v1/compliance-commits/{id} (detail)

GIN index on affected_control_ids for fast @> containment queries.
454 tests pass, 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 19:17:45 +02:00
Benjamin Admin 66a70ab31c feat(pipeline): G1 Decision Trace — compliance decision tracking
New table: decision_traces (status, reason, evidence, fix plan per control)
New API:
  POST/GET/PUT /v1/decision-traces (CRUD for decisions)
  GET /v1/decision-traces/stats (compliance dashboard)
  GET /v1/controls/{id}/full-trace (Regulation→Obligation→Control→Decision→Evidence)

454 tests pass, 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 18:26:21 +02:00
Benjamin Admin ad24835940 feat(pipeline): G-pre1/2/3 — Object Clustering + Master Controls + API
G-pre1: 144k objects clustered into 7,466 groups via Mini-Batch K-Means
  on bge-m3 embeddings. Two-stage: k=5000 base + sub-cluster groups >50.
G-pre2: 5,114 Master Controls from lifecycle phase chains
  (define→implement→test→monitor), linking 172,504 atomic controls.
G-pre3: REST API for Master Controls
  GET /v1/master-controls (list, search, filter)
  GET /v1/master-controls/stats
  GET /v1/master-controls/{mc_id} (detail with phase-controls)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 15:11:38 +02:00
Benjamin Admin e683701a44 fix(gitea): remove /etc/timezone mount (macOS incompatible), use TZ env var
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 19:37:43 +02:00
Benjamin Admin 0bad74a3bd docs: session handover — Block F complete, pipeline done, G-pre1 analysis
Session 03-05.05.2026:
- Block F1-F5 complete (DB migration of hardcoded dicts)
- Control Generation: 1,599 controls + 11,522 obligations + 1,147 atomics
- Production sync: 2,625 controls + 11,522 obligations synced
- G-pre1 analysis: 183k objects → 144k after normalize (needs hierarchical clustering)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 18:02:10 +02:00
Benjamin Admin 22257a7ed8 feat(pipeline): F5 validation tests — verify DB matches hardcoded dicts
8 tests confirm all REGULATION_LICENSE_MAP, ACTION_TYPES, _NEGATIVE_PATTERNS,
_ACTION_SYNONYMS, and _OBJECT_SYNONYMS entries are correctly migrated to DB.
Dicts kept as fallback for DB-unavailability resilience.

Block F complete: F1-F5 all done.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 16:06:59 +02:00
Benjamin Admin a20de0b52b feat(pipeline): F4 LLM synonym enrichment script
Uses Ollama (qwen3.5:35b-a3b, think:false) to generate additional
German synonyms for action types and object tokens. Results stored
with source='llm' in action_synonyms/object_synonyms tables.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 15:45:43 +02:00
Benjamin Admin 775d8b52f3 fix(vault): prevent CPU-burning init loop with marker file + idempotent checks
Root cause: init scripts ran repeatedly (on container restart) and tried
vault secrets enable / vault auth enable for already-existing paths.
Vault logged ERRORs and burned 40-84% CPU in the loop.

Fix:
- Marker file /vault/data/.init-complete skips re-initialization
- vault secrets list / vault auth list checks before enable calls
- No more "path already in use" errors on subsequent runs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 11:46:16 +02:00
Benjamin Admin 64f45be63a feat(pipeline): add Pass 0a endpoint to core control-pipeline
Registers /generate/run-pass0a and /generate/pass0a-status/{job_id}
on the core control-pipeline (port 8098). Previously Pass 0a was only
available on the compliance backend which connects to Production DB,
causing a split-brain when controls are generated locally.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 07:21:58 +02:00
Benjamin Admin e869cabc81 docs: session handover — F1-F3 done, control generation running
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-04 07:21:24 +02:00
25 changed files with 3077 additions and 85 deletions
+157 -78
@@ -1,115 +1,194 @@
# Session Instructions: Block F - Hardcoded Knowledge Migration
# Session Instructions: Master Control Quality + Regulation-Source Split
**Date:** 2026-05-03
**Date:** 2026-05-06
**For:** Next Claude session
**Repo:** breakpilot-core (~/Projekte/breakpilot-core)
---
## NEXT STEP: Block F1 - Regulation Registry
## NEXT STEP: Split the 25 large Master Controls
### What to do
### Problem
1. **DB table**: create `compliance.regulation_registry` (migration script)
2. **Migrate data** from `control_generator.py` (135 entries) + `source_type_classification.py` (58)
3. **Auto-create** in the RAG service on document upload (status='needs_review')
4. **Backend API** in the breakpilot-compliance backend (GET/POST/PUT /v1/regulations)
5. **Frontend** in breakpilot-compliance Admin under `/sdk/regulation-registry` (between roadmap and isms)
6. **Sync check** script (weekly: Qdrant regulation_ids vs. DB)
7. **Switch code** in control_generator.py (dict → DB query with cache)
25 Master Controls are too generic (>200 atomic controls per MC). They are built on generic security-domain keywords such as "monitoring", "encryption", "personal_data". Embedding clustering alone is not enough: the controls all deal with "monitoring", but for different regulations (DSGVO, NIS2, NIST, BSI, etc.).
### Frontend requirements (breakpilot-compliance Admin, port 3007)
### The 25 affected MCs
- Nav position: between `/sdk/roadmap` and `/sdk/isms`
- Table of all regulations (sortable, filterable)
- Status badge: "Needs Review" (yellow), "Active" (green), "Deprecated" (gray)
- Counter in the nav for unreviewed entries
- Inline edit: license_rule, jurisdiction, source_type, names
- "Approve" button → status='active'
- Discrepancy display: regulation_ids in Qdrant that are not in the DB
| MC-ID | Name | Controls | Problem |
|-------|------|----------|---------|
| MC-8292 | monitoring | 6,157 | Everything from video to vulnerability |
| MC-2260 | procedure | 4,176 | Generic |
| MC-8302 | alerting | 3,126 | Reporting obligations of all laws mixed |
| MC-8306 | personal_data | 3,057 | DSGVO + NIS2 + AT/CH mixed |
| MC-8312 | training | 2,572 | |
| MC-7932 | certificate_management | 2,350 | |
| MC-8317 | incident | 2,288 | |
| MC-8329 | encryption | 1,790 | |
| MC-8333 | audit_logging | 1,645 | |
| MC-8321 | policy | 1,463 | |
| MC-8325 | patch_management | 1,155 | |
| MC-8338 | network_security | 1,071 | |
| ... | (13 more) | 200-960 | |
### Critical files
### Solution: Regulation-Source Split
| Repo | File | Action |
|------|-------|--------|
| core | `control-pipeline/services/control_generator.py` lines 75-236 | EDIT: dict → DB |
| core | `control-pipeline/data/source_type_classification.py` | DELETE (after migration) |
| core | `rag-service/api/documents.py` | EDIT: auto-create on upload |
| compliance | `backend-compliance/compliance/api/regulations.py` | NEW: API endpoints |
| compliance | `admin-compliance/app/sdk/regulation-registry/` | NEW: frontend page |
Instead of clustering by embedding similarity alone, split by **regulation source**:
### DB schema
```
MC "encryption" (1,790 controls)
→ encryption_dsgvo (DSGVO Art. 32, ~200)
→ encryption_nis2 (NIS2 Art. 21, ~150)
→ encryption_nist (NIST SC-13, ~300)
→ encryption_bsi (BSI, ~200)
→ encryption_owasp (OWASP, ~100)
→ encryption_other (~840)
```
```sql
CREATE TABLE compliance.regulation_registry (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
regulation_id VARCHAR(100) UNIQUE NOT NULL,
regulation_name_de TEXT,
regulation_name_en TEXT,
regulation_short VARCHAR(50),
license_rule INTEGER NOT NULL DEFAULT 1 CHECK (license_rule IN (1, 2, 3)),
license_type VARCHAR(50),
source_type VARCHAR(20) NOT NULL DEFAULT 'law',
jurisdiction VARCHAR(10),
category VARCHAR(50),
celex VARCHAR(20),
url TEXT,
status VARCHAR(20) NOT NULL DEFAULT 'needs_review',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
### Script approach
CREATE INDEX idx_reg_registry_status ON compliance.regulation_registry(status);
CREATE INDEX idx_reg_registry_jurisdiction ON compliance.regulation_registry(jurisdiction);
```python
# For each of the 25 large MCs:
# 1. Fetch all member controls with source_citation->>'source'
# 2. Group by source (regulation)
# 3. Create sub-MCs per regulation
# 4. Controls without a source → "general" sub-MC
```
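A minimal sketch of the grouping step, assuming the member controls have already been fetched as `(control_id, source)` pairs. The names `split_by_regulation` and `_slug` are hypothetical, and the `<mc_name>_<source>` naming only mirrors the "encryption" example; the real script may name sub-MCs differently.

```python
import re
from collections import defaultdict


def _slug(source):
    """Lowercase a regulation name and collapse non-alphanumerics to '_'."""
    return re.sub(r"[^a-z0-9]+", "_", source.lower()).strip("_")


def split_by_regulation(mc_name, members):
    """Group the member controls of one master control by source regulation.

    `members` is an iterable of (control_id, source) pairs, where `source`
    stands in for source_citation->>'source' and may be None or empty.
    Controls without a usable source land in the "general" sub-MC.
    """
    groups = defaultdict(list)
    for control_id, source in members:
        key = (_slug(source) if source else "") or "general"
        groups[f"{mc_name}_{key}"].append(control_id)
    return dict(groups)


# Hypothetical sample data, not taken from the database
sample = [
    ("C-1", "DSGVO"),
    ("C-2", "NIS2"),
    ("C-3", None),
    ("C-4", "DSGVO"),
    ("C-5", "NIST SP 800-53"),
]
sub_mcs = split_by_regulation("encryption", sample)
```

Keeping the split as a pure function makes it easy to check the resulting sub-MC sizes before anything is written back to `master_controls`.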
### Quality requirement (IMPORTANT!)
**Only "very good" is acceptable.** Medium-sized MCs (30-100 controls) are already excellent:
- MC-1082 (data_retention_policies, 52) → perfectly coherent
- MC-5477 (austausch_von_cybersicherheitsinformationen, 5) → perfect
Goal: ALL MCs should reach this quality. No MC >100 controls.
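The size target can be checked mechanically. The sketch below assumes a plain mapping of MC-ID to member count (for example from a `GROUP BY` over `master_control_members`); the helper name `oversized` is hypothetical.

```python
MAX_MEMBERS = 100  # target from this note: no MC above 100 controls


def oversized(mc_sizes, max_members=MAX_MEMBERS):
    """Return (mc_id, size) pairs above the limit, largest first."""
    return sorted(
        ((mc_id, n) for mc_id, n in mc_sizes.items() if n > max_members),
        key=lambda item: item[1],
        reverse=True,
    )


# Member counts copied from the tables in this note
sizes = {"MC-8292": 6157, "MC-8329": 1790, "MC-1082": 52, "MC-5477": 5}
violations = oversized(sizes)
```

An empty result means every MC meets the size target; it says nothing about coherence, which still needs a manual review.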
---
## SESSION 03-06 May 2026: FULLY COMPLETED
### Block F (Hardcoded Knowledge → DB)
- F1: regulation_registry (223 entries) ✅
- F2: action_types (34) + action_synonyms (368) ✅
- F3: object_synonyms (320) ✅
- F4: LLM enrichment (+468 synonyms via Ollama) ✅
- F5: Validation (8 tests, dicts as fallback) ✅
### Control Generation Pipeline
- 1,599 rich controls from E-block chunks (~$17 Anthropic)
- 11,522 obligations (Pass 0a, ~$4)
- 1,147 atomic controls (Pass 0b, ~$4.60)
- **Total cost: ~$25.60**
### Production Sync
- 2,625 controls + 11,522 obligations synced to production
- Production: 294,027 controls total
- Backups: local + production on MacBook
### Block G-pre (Master Controls)
- G-pre1: 144k objects → 7,753 groups (K-Means k=5000 + sub-clustering + refinement)
- G-pre2: 5,329 Master Controls, 172,504+ members
- G-pre3: Master Control API (list, stats, detail)
- **Quality:** small/medium MCs excellent, 25 large MCs need the regulation-source split
### Block G (Compliance Execution Layer)
- G1: Decision Trace (decision_traces table + 6 API endpoints) ✅
- G2: Compliance Commit Ledger (compliance_commits + 5 endpoints) ✅
- G3: Full Decision Memory (decision_events + timeline + 4 endpoints) ✅
- G4: Pre-Deployment Enforcement (deployment_checks + override + 4 endpoints) ✅
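The G3 timeline endpoint reports an average time-to-fix by pairing each `assessment` event with the next `fix_completed` event. The standalone sketch below reproduces that pairing on in-memory tuples; the function name `avg_fix_hours` and the sample timestamps are illustrative only.

```python
from datetime import datetime, timedelta


def avg_fix_hours(events):
    """Average hours from an assessment to the next fix_completed event.

    `events` is a chronologically ordered list of (event_type, created_at)
    pairs. A later assessment replaces an earlier unpaired one.
    """
    fix_times = []
    assessment_at = None
    for event_type, created_at in events:
        if event_type == "assessment":
            assessment_at = created_at
        elif event_type == "fix_completed" and assessment_at is not None:
            hours = (created_at - assessment_at).total_seconds() / 3600
            fix_times.append(hours)
            assessment_at = None  # start a new cycle
    return round(sum(fix_times) / len(fix_times), 1) if fix_times else None


t0 = datetime(2026, 5, 6, 8, 0)
timeline = [
    ("assessment", t0),
    ("decision", t0 + timedelta(hours=1)),
    ("fix_completed", t0 + timedelta(hours=6)),
    ("assessment", t0 + timedelta(hours=24)),
    ("fix_completed", t0 + timedelta(hours=26)),
]
average = avg_fix_hours(timeline)  # cycles of 6 h and 2 h
```

Events of other types (`decision`, `verification`, `failure`) are ignored by the calculation, so an open cycle without a `fix_completed` event does not affect the average.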
### Infrastructure
- Vault CPU fix committed (marker file + idempotent checks)
- Pass 0a endpoint registered in the core control-pipeline
- Gitea timezone fix (docker-compose.yml)
- 61 new regulation_ids in regulation_registry
- Container cleanup (fewo-finance-agent, mediaanalysisd)
---
## DB tables (all blocks)
| Table | Rows | Migration |
|---------|------|-----------|
| compliance.regulation_registry | 223 | 002 |
| compliance.action_types | 34 | 003 |
| compliance.action_synonyms | 368 | 003 |
| compliance.object_synonyms | 320 | 003 |
| compliance.object_groups | 7,753 | 004 |
| compliance.master_controls | 5,329 | 005 |
| compliance.master_control_members | ~170k | 005 |
| compliance.decision_traces | 0 (schema ready) | 006 |
| compliance.compliance_commits | 0 (schema ready) | 007 |
| compliance.decision_events | 0 (schema ready) | 008 |
| compliance.deployment_checks | 0 (schema ready) | 009 |
---
## API Endpoints (Core Control-Pipeline, port 8098)
### Existing
- `/v1/canonical/generate/*` - Control Generation Pipeline
- `/v1/canonical/generate/run-pass0a` - Pass 0a (NEW in this session)
- `/v1/canonical/generate/submit-pass0b` - Pass 0b Batch API
### New (this session)
- `/v1/master-controls` - G-pre3: list, stats, detail
- `/v1/decision-traces` - G1: CRUD + stats
- `/v1/controls/{id}/full-trace` - G1: full chain
- `/v1/compliance-commits` - G2: Commit Ledger
- `/v1/decision-events` - G3: lifecycle events + timeline
- `/v1/deployment-checks` - G4: pre-deploy gate + override
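As an illustration of how an SDK might register a commit with the G2 ledger, the sketch below builds a `POST /v1/compliance-commits` request with the standard library only. The field names follow `CreateCommitRequest` from this change set; `BASE_URL`, the helper name, and the sample tenant and control IDs are assumptions, and the request is constructed but not sent.

```python
import json
import urllib.request

# Assumption: the pipeline is reachable at this address (e.g. from inside
# the container); adjust for the actual deployment.
BASE_URL = "http://127.0.0.1:8098"


def build_commit_request(tenant_id, commit_hash, control_ids, risk_level="low"):
    """Build (but do not send) a request that registers a compliance commit."""
    payload = {
        "tenant_id": tenant_id,
        "commit_hash": commit_hash,
        "affected_control_ids": list(control_ids),
        "risk_level": risk_level,
    }
    return urllib.request.Request(
        f"{BASE_URL}/v1/compliance-commits",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical tenant UUID and control ID, for illustration only
req = build_commit_request(
    "00000000-0000-0000-0000-000000000001",
    "e82f99b8cb",
    ["AUTH-001"],
    risk_level="medium",
)
```

Passing `req` to `urllib.request.urlopen` would send it. Optional fields such as `branch` or `affected_files` can be added to the payload in the same way.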
### API access (IMPORTANT)
```bash
# Only via docker exec (port 8098 is blocked by document-crawler)
ssh macmini "/usr/local/bin/docker exec bp-core-control-pipeline curl -sf http://127.0.0.1:8098/..."
```
---
## OVERALL PLAN Block F (4 days)
## BACKUPS (on MacBook)
| Phase | What | Effort | Status |
|-------|-----|---------|--------|
| F1 | Regulation Registry (DB + API + Frontend + Auto-Create) | 1 day | 🔥 NEXT |
| F2 | Action types + synonyms → DB | 1 day | Pending |
| F3 | Object synonyms → DB | 0.5 day | Pending |
| F4 | LLM synonym enrichment | 1 day | Pending |
| F5 | Validation + cleanup | 0.5 day | Pending |
| File | Content | Size |
|-------|--------|---------|
| controls_backup_20260505.csv | 1,599 new controls | 7.2 MB |
| obligations_backup_20260505.csv | 11,522 obligations | 6.2 MB |
| production_backup_20260505.dump | Production, compressed | 30 MB |
| production_backup_20260505_plain.sql | Production, plain | 1.3 GB |
| local_backup_20260506.dump | Local DB, compressed | ~30 MB |
| production_backup_20260506.dump | Production, compressed | ~30 MB |
---
## SESSION 02-03 May 2026: COMPLETED
## STOPPED CONTAINERS
- Block D5+: NIST/ENISA PDF quality (0%→45%)
- Block D6: Citation backfill (3,651 controls)
- Block E2: 8 German laws (1,629 chunks)
- Block E3: 5 EU regulations (1,057 chunks)
- Block E4: GoBD, BAIT, VAIT (144 chunks)
- Block E6: 3 CH + 4 AT laws (3,881 chunks)
- Block E7: 9 court rulings as full text (709 chunks total)
- Schrems II: 154, BVerfG Datenanalyse: 161, DSK OH Telemedien: 119
- Meta: 101, BAG Zeiterfassung: 48, Planet49: 42, SCHUFA: 41
- Schadenersatz: 29, Google Fonts: 14
- Infra: Qdrant snapshot, upload-before-delete, 99 tests
```bash
# Vault: start only after the fix is deployed (marker file required)
ssh macmini "/usr/local/bin/docker start bp-core-vault"
**Total new chunks this session: ~25,000+**
# OpenSearch: as needed
ssh macmini "/usr/local/bin/docker start bp-lehrer-opensearch"
# fewo-finance-agent: unrelated container, do not start
```
---
## TESTS
```bash
# Embedding-Service (99 Tests)
cd embedding-service && python3 -m pytest test_chunking.py test_d4_bgb.py test_nist_normalization.py -v
# Control-Pipeline (387 Tests)
# Pipeline (454 Tests)
PYTHONPATH=control-pipeline python3 -m pytest control-pipeline/tests/ -v
# Qdrant-Snapshot
ssh macmini "cd ~/Projekte/breakpilot-core && bash scripts/qdrant-snapshot.sh"
```
---
## PLAN FILE
## OPEN ITEMS FOR OTHER SESSIONS
Block F detailed plan: `/Users/benjaminadmin/.claude/plans/humming-nibbling-sonnet.md`
1. **Qdrant API key** for production (qdrant-dev.breakpilot.ai) is invalid (401). It must be renewed in Coolify.
2. **DSI check false positives**: controls mix internal governance with external DSI requirements. Fix: use only controls with an Art. 13/14 reference for DSI checks.
3. **Spotlight + mediaanalysisd**: disable on the Mac Mini (requires sudo):
```bash
sudo mdutil -a -i off
sudo launchctl disable system/com.apple.mediaanalysisd
```
4. **Production DB sync** for the new G-block tables (decision_traces, compliance_commits, decision_events, deployment_checks) is still pending: the tables are empty, and the schema must be deployed to production.
+12
@@ -4,9 +4,21 @@ from api.control_generator_routes import router as generator_router
from api.canonical_control_routes import router as canonical_router
from api.document_compliance_routes import router as document_router
from api.dependency_routes import router as dependency_router
from api.master_control_routes import router as master_control_router
from api.decision_trace_routes import router as decision_trace_router
from api.decision_trace_routes import full_trace_router
from api.compliance_commit_routes import router as compliance_commit_router
from api.decision_event_routes import router as decision_event_router
from api.deployment_check_routes import router as deployment_check_router
router = APIRouter()
router.include_router(generator_router)
router.include_router(canonical_router)
router.include_router(document_router)
router.include_router(dependency_router)
router.include_router(master_control_router)
router.include_router(decision_trace_router)
router.include_router(full_trace_router)
router.include_router(compliance_commit_router)
router.include_router(decision_event_router)
router.include_router(deployment_check_router)
@@ -0,0 +1,255 @@
"""Compliance Commit Ledger API — G2.
Tracks code commits and their compliance impact. SDK reports each commit
with affected controls, building an audit trail for code↔compliance mapping.
"""
import json
import logging
import uuid
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from db.session import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/compliance-commits", tags=["compliance-commits"])
class CreateCommitRequest(BaseModel):
    tenant_id: str
    project_id: Optional[str] = None
    commit_hash: str
    commit_message: Optional[str] = None
    commit_author: Optional[str] = None
    commit_date: Optional[str] = None
    branch: Optional[str] = None
    repo_url: Optional[str] = None
    affected_control_ids: list[str] = []
    affected_files: list[str] = []
    risk_level: str = "low"
    analysis_summary: Optional[str] = None
    analysis_metadata: dict = {}
@router.post("")
async def register_commit(req: CreateCommitRequest):
    """Register a code commit with its compliance impact."""
    db = SessionLocal()
    try:
        cid = str(uuid.uuid4())
        db.execute(text("""
            INSERT INTO compliance_commits
                (id, tenant_id, project_id, commit_hash, commit_message,
                 commit_author, commit_date, branch, repo_url,
                 affected_control_ids, affected_files,
                 risk_level, analysis_summary, analysis_metadata)
            VALUES
                (CAST(:id AS uuid), CAST(:tenant_id AS uuid), :project_id,
                 :commit_hash, :commit_message, :commit_author,
                 :commit_date, :branch, :repo_url,
                 CAST(:control_ids AS jsonb), CAST(:files AS jsonb),
                 :risk_level, :analysis_summary, CAST(:metadata AS jsonb))
        """), {
            "id": cid,
            "tenant_id": req.tenant_id,
            "project_id": req.project_id,
            "commit_hash": req.commit_hash,
            "commit_message": req.commit_message,
            "commit_author": req.commit_author,
            "commit_date": req.commit_date,
            "branch": req.branch,
            "repo_url": req.repo_url,
            "control_ids": json.dumps(req.affected_control_ids),
            "files": json.dumps(req.affected_files),
            "risk_level": req.risk_level,
            "analysis_summary": req.analysis_summary,
            "metadata": json.dumps(req.analysis_metadata),
        })
        db.commit()
        return {
            "id": cid,
            "status": "registered",
            "affected_controls": len(req.affected_control_ids),
            "risk_level": req.risk_level,
        }
    finally:
        db.close()
@router.get("")
async def list_commits(
    tenant_id: Optional[str] = None,
    control_id: Optional[str] = None,
    risk_level: Optional[str] = None,
    branch: Optional[str] = None,
    since: Optional[str] = None,
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """List compliance commits with filters."""
    db = SessionLocal()
    try:
        clauses = []
        params: dict = {"limit": limit, "offset": offset}
        if tenant_id:
            clauses.append("tenant_id = CAST(:tenant_id AS uuid)")
            params["tenant_id"] = tenant_id
        if control_id:
            clauses.append("affected_control_ids @> CAST(:cid_json AS jsonb)")
            params["cid_json"] = json.dumps([control_id])
        if risk_level:
            clauses.append("risk_level = :risk")
            params["risk"] = risk_level
        if branch:
            clauses.append("branch = :branch")
            params["branch"] = branch
        if since:
            clauses.append("commit_date >= CAST(:since AS timestamptz)")
            params["since"] = since
        where = "WHERE " + " AND ".join(clauses) if clauses else ""
        rows = db.execute(text(f"""
            SELECT id, commit_hash, commit_message, commit_author, commit_date,
                   branch, affected_control_ids, affected_files, risk_level
            FROM compliance_commits
            {where}
            ORDER BY commit_date DESC NULLS LAST
            LIMIT :limit OFFSET :offset
        """), params).fetchall()
        total = db.execute(text(f"""
            SELECT count(*) FROM compliance_commits {where}
        """), params).scalar()
        return {
            "total": total,
            "commits": [
                {
                    "id": str(r[0]),
                    "commit_hash": r[1],
                    "message": r[2],
                    "author": r[3],
                    "date": str(r[4]) if r[4] else None,
                    "branch": r[5],
                    "affected_control_ids": r[6],
                    "affected_files": r[7],
                    "risk_level": r[8],
                }
                for r in rows
            ],
        }
    finally:
        db.close()
@router.get("/stats")
async def commit_stats(tenant_id: Optional[str] = None):
    """Dashboard stats for compliance commits."""
    db = SessionLocal()
    try:
        tf = ""
        params: dict = {}
        if tenant_id:
            tf = "WHERE tenant_id = CAST(:tid AS uuid)"
            params["tid"] = tenant_id
        risk = db.execute(text(f"""
            SELECT risk_level, count(*) FROM compliance_commits {tf}
            GROUP BY risk_level
        """), params).fetchall()
        recent = db.execute(text(f"""
            SELECT count(*) FROM compliance_commits
            {tf + ' AND' if tf else 'WHERE'} commit_date > NOW() - interval '7 days'
        """), params).scalar()
        total = sum(r[1] for r in risk)
        return {
            "total_commits": total,
            "last_7_days": recent,
            "by_risk_level": {r[0]: r[1] for r in risk},
        }
    finally:
        db.close()
@router.get("/by-control/{control_id}")
async def commits_by_control(
    control_id: str,
    limit: int = Query(50, ge=1, le=200),
):
    """Get all commits that affect a specific control."""
    db = SessionLocal()
    try:
        rows = db.execute(text("""
            SELECT id, commit_hash, commit_message, commit_author, commit_date,
                   branch, repo_url, affected_files, risk_level
            FROM compliance_commits
            WHERE affected_control_ids @> CAST(:cid_json AS jsonb)
            ORDER BY commit_date DESC NULLS LAST
            LIMIT :limit
        """), {
            "cid_json": json.dumps([control_id]),
            "limit": limit,
        }).fetchall()
        return {
            "control_id": control_id,
            "total_commits": len(rows),
            "commits": [
                {
                    "id": str(r[0]),
                    "commit_hash": r[1],
                    "message": r[2],
                    "author": r[3],
                    "date": str(r[4]) if r[4] else None,
                    "branch": r[5],
                    "repo_url": r[6],
                    "affected_files": r[7],
                    "risk_level": r[8],
                }
                for r in rows
            ],
        }
    finally:
        db.close()
@router.get("/{commit_id}")
async def get_commit(commit_id: str):
    """Get details of a single compliance commit."""
    db = SessionLocal()
    try:
        row = db.execute(text("""
            SELECT * FROM compliance_commits WHERE id = CAST(:id AS uuid)
        """), {"id": commit_id}).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Commit not found")
        return {
            "id": str(row.id),
            "tenant_id": str(row.tenant_id),
            "project_id": str(row.project_id) if row.project_id else None,
            "commit_hash": row.commit_hash,
            "commit_message": row.commit_message,
            "commit_author": row.commit_author,
            "commit_date": str(row.commit_date) if row.commit_date else None,
            "branch": row.branch,
            "repo_url": row.repo_url,
            "affected_control_ids": row.affected_control_ids,
            "affected_files": row.affected_files,
            "risk_level": row.risk_level,
            "analysis_summary": row.analysis_summary,
            "analysis_metadata": row.analysis_metadata,
        }
    finally:
        db.close()
@@ -2293,6 +2293,67 @@ async def get_batch_process_status(job_id: str):
    return status


class RunPass0aRequest(BaseModel):
    limit: int = 0  # 0 = no limit
    batch_size: int = 5
    use_anthropic: bool = True
    category_filter: Optional[str] = None
    source_filter: Optional[str] = None


# In-memory job registry: lost on restart and not shared between workers.
_pass0a_status: dict = {}


async def _run_pass0a_background(req: RunPass0aRequest, job_id: str):
    """Run Pass 0a in background with own DB session."""
    from services.decomposition_pass import DecompositionPass
    db = SessionLocal()
    try:
        _pass0a_status[job_id] = {"status": "running"}
        dp = DecompositionPass(db)
        result = await dp.run_pass0a(
            limit=req.limit,
            batch_size=req.batch_size,
            use_anthropic=req.use_anthropic,
            category_filter=req.category_filter,
            source_filter=req.source_filter,
        )
        _pass0a_status[job_id] = {"status": "completed", **result}
        logger.info("Pass 0a job %s completed: %s", job_id, result)
    except Exception as e:
        logger.error("Pass 0a job %s failed: %s", job_id, e)
        _pass0a_status[job_id] = {"status": "failed", "error": str(e)}
    finally:
        db.close()


@router.post("/generate/run-pass0a")
async def run_pass0a(req: RunPass0aRequest):
    """Run Pass 0a (Obligation Extraction) on undecomposed controls.

    Extracts individual normative obligations from rich controls using LLM.
    Runs in background; poll status via GET /generate/pass0a-status/{job_id}.
    """
    import uuid
    job_id = str(uuid.uuid4())[:8]
    _pass0a_status[job_id] = {"status": "starting"}
    # NOTE: the event loop keeps only a weak reference to tasks; hold a
    # reference if the job must not be garbage-collected mid-run.
    asyncio.create_task(_run_pass0a_background(req, job_id))
    return {
        "status": "running",
        "job_id": job_id,
        "message": f"Pass 0a started. Poll /generate/pass0a-status/{job_id}",
    }


@router.get("/generate/pass0a-status/{job_id}")
async def get_pass0a_status(job_id: str):
    """Get status of a Pass 0a job."""
    status = _pass0a_status.get(job_id)
    if not status:
        raise HTTPException(status_code=404, detail="Pass 0a job not found")
    return status


class SubmitPass0bRequest(BaseModel):
    limit: int = 10
    batch_size: int = 5
@@ -0,0 +1,224 @@
"""Decision Events API — G3 Full Decision Memory.
Event-stream for each control's compliance lifecycle:
    assessment → decision → fix → verification → (failure → new cycle)
"""
import json
import logging
import uuid
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from db.session import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/decision-events", tags=["decision-events"])
class CreateEventRequest(BaseModel):
    control_uuid: str
    decision_trace_id: Optional[str] = None
    tenant_id: Optional[str] = None
    event_type: str
    input_state: dict = {}
    output_state: dict = {}
    summary: Optional[str] = None
    actor: Optional[str] = None
    evidence_ids: list[str] = []
    metadata: dict = {}
@router.post("")
async def create_event(req: CreateEventRequest):
    """Record a decision event in the compliance lifecycle."""
    db = SessionLocal()
    try:
        eid = str(uuid.uuid4())
        db.execute(text("""
            INSERT INTO decision_events
                (id, decision_trace_id, control_uuid, tenant_id,
                 event_type, input_state, output_state,
                 summary, actor, evidence_ids, metadata)
            VALUES
                (CAST(:id AS uuid),
                 CASE WHEN :trace_id IS NOT NULL THEN CAST(:trace_id AS uuid) ELSE NULL END,
                 CAST(:control_uuid AS uuid),
                 CASE WHEN :tenant_id IS NOT NULL THEN CAST(:tenant_id AS uuid) ELSE NULL END,
                 :event_type, CAST(:input AS jsonb), CAST(:output AS jsonb),
                 :summary, :actor, CAST(:evidence AS jsonb), CAST(:meta AS jsonb))
        """), {
            "id": eid,
            "trace_id": req.decision_trace_id,
            "control_uuid": req.control_uuid,
            "tenant_id": req.tenant_id,
            "event_type": req.event_type,
            "input": json.dumps(req.input_state),
            "output": json.dumps(req.output_state),
            "summary": req.summary,
            "actor": req.actor,
            "evidence": json.dumps(req.evidence_ids),
            "meta": json.dumps(req.metadata),
        })
        db.commit()
        return {"id": eid, "event_type": req.event_type, "status": "recorded"}
    finally:
        db.close()
@router.get("")
async def list_events(
    control_uuid: Optional[str] = None,
    tenant_id: Optional[str] = None,
    event_type: Optional[str] = None,
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """List decision events with filters."""
    db = SessionLocal()
    try:
        clauses = []
        params: dict = {"limit": limit, "offset": offset}
        if control_uuid:
            clauses.append("de.control_uuid = CAST(:cuuid AS uuid)")
            params["cuuid"] = control_uuid
        if tenant_id:
            clauses.append("de.tenant_id = CAST(:tid AS uuid)")
            params["tid"] = tenant_id
        if event_type:
            clauses.append("de.event_type = :etype")
            params["etype"] = event_type
        where = "WHERE " + " AND ".join(clauses) if clauses else ""
        rows = db.execute(text(f"""
            SELECT de.id, de.control_uuid, cc.control_id,
                   de.event_type, de.summary, de.actor,
                   de.input_state, de.output_state,
                   de.evidence_ids, de.created_at
            FROM decision_events de
            LEFT JOIN canonical_controls cc ON cc.id = de.control_uuid
            {where}
            ORDER BY de.created_at DESC
            LIMIT :limit OFFSET :offset
        """), params).fetchall()
        return {
            # Size of the returned page, not the overall number of matches.
            "total": len(rows),
            "events": [
                {
                    "id": str(r[0]),
                    "control_uuid": str(r[1]),
                    "control_id": r[2],
                    "event_type": r[3],
                    "summary": r[4],
                    "actor": r[5],
                    "input_state": r[6],
                    "output_state": r[7],
                    "evidence_ids": r[8],
                    "created_at": str(r[9]),
                }
                for r in rows
            ],
        }
    finally:
        db.close()
@router.get("/stats")
async def event_stats(tenant_id: Optional[str] = None):
    """Lifecycle statistics: cycle times, failure rates."""
    db = SessionLocal()
    try:
        tf = ""
        params: dict = {}
        if tenant_id:
            tf = "WHERE tenant_id = CAST(:tid AS uuid)"
            params["tid"] = tenant_id
        by_type = db.execute(text(f"""
            SELECT event_type, count(*) FROM decision_events {tf}
            GROUP BY event_type ORDER BY count(*) DESC
        """), params).fetchall()
        total = sum(r[1] for r in by_type)
        failures = next((r[1] for r in by_type if r[0] == "failure"), 0)
        verifications = next((r[1] for r in by_type if r[0] == "verification"), 0)
        return {
            "total_events": total,
            "by_event_type": {r[0]: r[1] for r in by_type},
            "failure_rate": round(failures / total * 100, 1) if total > 0 else 0,
            "verification_rate": round(verifications / total * 100, 1) if total > 0 else 0,
        }
    finally:
        db.close()
@router.get("/timeline/{control_id}")
async def get_timeline(control_id: str):
"""Full chronological timeline for a control's compliance lifecycle."""
db = SessionLocal()
try:
# Resolve control_id to UUID
ctrl = db.execute(text("""
SELECT id, control_id, title FROM canonical_controls
WHERE control_id = :cid
"""), {"cid": control_id}).fetchone()
if not ctrl:
raise HTTPException(status_code=404, detail="Control not found")
events = db.execute(text("""
SELECT id, event_type, summary, actor,
input_state, output_state, evidence_ids, created_at
FROM decision_events
WHERE control_uuid = CAST(:uuid AS uuid)
ORDER BY created_at ASC
"""), {"uuid": str(ctrl[0])}).fetchall()
# Determine current state from latest event
current_state = "unknown"
if events:
last = events[-1]
output = last[5] or {}
current_state = output.get("status", last[1])
# Calculate avg fix time (assessment → fix_completed)
fix_times = []
assessment_at = None
for e in events:
if e[1] == "assessment":
assessment_at = e[7]
elif e[1] == "fix_completed" and assessment_at:
delta = (e[7] - assessment_at).total_seconds() / 3600
fix_times.append(delta)
assessment_at = None
return {
"control_id": ctrl[1],
"control_title": ctrl[2],
"current_state": current_state,
"total_events": len(events),
"time_to_fix_avg_hours": round(sum(fix_times) / len(fix_times), 1) if fix_times else None,
"events": [
{
"id": str(e[0]),
"type": e[1],
"summary": e[2],
"actor": e[3],
"input_state": e[4],
"output_state": e[5],
"evidence_count": len(e[6]) if e[6] else 0,
"at": str(e[7]),
}
for e in events
],
}
finally:
db.close()
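Review note: the fix-time calculation in `get_timeline` pairs each `assessment` with the next `fix_completed` event. The pairing can be sketched as a standalone function, assuming events arrive as `(event_type, created_at)` tuples sorted ascending. The helper name `avg_fix_hours` and the sample timestamps are hypothetical and not part of this commit.

```python
from datetime import datetime, timedelta
from typing import Optional


def avg_fix_hours(events: list[tuple[str, datetime]]) -> Optional[float]:
    """Average hours from an assessment to the next fix_completed event.

    `events` must be sorted by timestamp ascending. If several assessments
    occur before a fix, the most recent one is used, as in the endpoint.
    """
    fix_times = []
    assessment_at = None
    for event_type, at in events:
        if event_type == "assessment":
            assessment_at = at
        elif event_type == "fix_completed" and assessment_at is not None:
            fix_times.append((at - assessment_at).total_seconds() / 3600)
            assessment_at = None  # each assessment is paired at most once
    return round(sum(fix_times) / len(fix_times), 1) if fix_times else None


# Hypothetical sample: one fix after 6 hours, another after 3 hours
t0 = datetime(2026, 5, 1, 8, 0)
sample = [
    ("assessment", t0),
    ("fix_completed", t0 + timedelta(hours=6)),
    ("assessment", t0 + timedelta(hours=24)),
    ("fix_completed", t0 + timedelta(hours=27)),
]
result = avg_fix_hours(sample)
```

Extracting the loop this way would make it testable without a database session.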
@@ -0,0 +1,404 @@
"""Decision Trace API — G1 Compliance Execution Layer.
Tracks compliance decisions per control: who decided, when, why,
what evidence supports it, and what's the remediation plan.
"""
import json
import logging
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from db.session import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/decision-traces", tags=["decision-traces"])
# ── Request/Response Models ──────────────────────────────────────────
class CreateDecisionRequest(BaseModel):
control_uuid: str
regulation_id: Optional[str] = None
obligation_id: Optional[str] = None
status: str = "not_assessed"
decision_reason: Optional[str] = None
decided_by: Optional[str] = None
fix_strategy: Optional[str] = None
fix_owner: Optional[str] = None
fix_target_date: Optional[str] = None
evidence_ids: list[str] = []
confidence: float = 0.0
tenant_id: Optional[str] = None
project_id: Optional[str] = None
metadata: dict = {}
class UpdateDecisionRequest(BaseModel):
status: Optional[str] = None
decision_reason: Optional[str] = None
decided_by: Optional[str] = None
fix_strategy: Optional[str] = None
fix_owner: Optional[str] = None
fix_target_date: Optional[str] = None
fix_completed_date: Optional[str] = None
evidence_ids: Optional[list[str]] = None
confidence: Optional[float] = None
metadata: Optional[dict] = None
# ── Endpoints ────────────────────────────────────────────────────────
@router.post("")
async def create_decision(req: CreateDecisionRequest):
"""Record a new compliance decision for a control."""
db = SessionLocal()
try:
trace_id = str(uuid.uuid4())
db.execute(text("""
INSERT INTO decision_traces
(id, control_uuid, regulation_id, obligation_id,
status, decision_reason, decided_by, decided_at,
fix_strategy, fix_owner, fix_target_date,
evidence_ids, confidence, tenant_id, project_id, metadata)
VALUES
(CAST(:id AS uuid), CAST(:control_uuid AS uuid), :regulation_id, :obligation_id,
:status, :decision_reason, :decided_by, NOW(),
:fix_strategy, :fix_owner, :fix_target_date,
CAST(:evidence_ids AS jsonb), :confidence,
:tenant_id, :project_id, CAST(:metadata AS jsonb))
"""), {
"id": trace_id,
"control_uuid": req.control_uuid,
"regulation_id": req.regulation_id,
"obligation_id": req.obligation_id,
"status": req.status,
"decision_reason": req.decision_reason,
"decided_by": req.decided_by,
"fix_strategy": req.fix_strategy,
"fix_owner": req.fix_owner,
"fix_target_date": req.fix_target_date,
"evidence_ids": json.dumps(req.evidence_ids),
"confidence": req.confidence,
"tenant_id": req.tenant_id,
"project_id": req.project_id,
"metadata": json.dumps(req.metadata),
})
db.commit()
return {"id": trace_id, "status": "created"}
finally:
db.close()
@router.get("")
async def list_decisions(
control_uuid: Optional[str] = None,
status: Optional[str] = None,
tenant_id: Optional[str] = None,
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
):
"""List decision traces with optional filters."""
db = SessionLocal()
try:
clauses = []
params: dict = {"limit": limit, "offset": offset}
if control_uuid:
clauses.append("dt.control_uuid = CAST(:control_uuid AS uuid)")
params["control_uuid"] = control_uuid
if status:
clauses.append("dt.status = :status")
params["status"] = status
if tenant_id:
clauses.append("dt.tenant_id = CAST(:tenant_id AS uuid)")
params["tenant_id"] = tenant_id
where = "WHERE " + " AND ".join(clauses) if clauses else ""
rows = db.execute(text(f"""
SELECT dt.id, dt.control_uuid, cc.control_id, cc.title,
dt.status, dt.decision_reason, dt.decided_by, dt.decided_at,
dt.fix_strategy, dt.fix_owner, dt.fix_target_date, dt.fix_completed_date,
dt.evidence_ids, dt.confidence, dt.regulation_id
FROM decision_traces dt
LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid
{where}
ORDER BY dt.decided_at DESC NULLS LAST
LIMIT :limit OFFSET :offset
"""), params).fetchall()
total = db.execute(text(f"""
SELECT count(*) FROM decision_traces dt {where}
"""), params).scalar()
return {
"total": total,
"decisions": [
{
"id": str(r[0]),
"control_uuid": str(r[1]),
"control_id": r[2],
"control_title": r[3],
"status": r[4],
"decision_reason": r[5],
"decided_by": r[6],
"decided_at": str(r[7]) if r[7] else None,
"fix_strategy": r[8],
"fix_owner": r[9],
"fix_target_date": str(r[10]) if r[10] else None,
"fix_completed_date": str(r[11]) if r[11] else None,
"evidence_ids": r[12],
"confidence": float(r[13]) if r[13] else 0,
"regulation_id": r[14],
}
for r in rows
],
}
finally:
db.close()
@router.get("/stats")
async def decision_stats(tenant_id: Optional[str] = None):
"""Dashboard statistics for compliance decisions."""
db = SessionLocal()
try:
tenant_filter = ""
params: dict = {}
if tenant_id:
tenant_filter = "WHERE tenant_id = CAST(:tenant_id AS uuid)"
params["tenant_id"] = tenant_id
stats = db.execute(text(f"""
SELECT status, count(*) FROM decision_traces
{tenant_filter}
GROUP BY status
"""), params).fetchall()
total = sum(r[1] for r in stats)
by_status = {r[0]: r[1] for r in stats}
return {
"total_decisions": total,
"by_status": by_status,
"compliance_rate": round(
by_status.get("compliant", 0) / total * 100, 1
) if total > 0 else 0,
"pending_remediation": by_status.get("under_remediation", 0),
"not_assessed": by_status.get("not_assessed", 0),
}
finally:
db.close()
@router.get("/{trace_id}")
async def get_decision(trace_id: str):
"""Get a single decision trace."""
db = SessionLocal()
try:
row = db.execute(text("""
SELECT dt.*, cc.control_id, cc.title, cc.source_citation
FROM decision_traces dt
LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid
WHERE dt.id = CAST(:id AS uuid)
"""), {"id": trace_id}).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Decision trace not found")
return {
"id": str(row.id),
"control_uuid": str(row.control_uuid),
"control_id": row.control_id,
"control_title": row.title,
"regulation_id": row.regulation_id,
"obligation_id": row.obligation_id,
"status": row.status,
"decision_reason": row.decision_reason,
"decided_by": row.decided_by,
"decided_at": str(row.decided_at) if row.decided_at else None,
"fix_strategy": row.fix_strategy,
"fix_owner": row.fix_owner,
"fix_target_date": str(row.fix_target_date) if row.fix_target_date else None,
"fix_completed_date": str(row.fix_completed_date) if row.fix_completed_date else None,
"evidence_ids": row.evidence_ids,
"confidence": float(row.confidence) if row.confidence else 0,
"source_citation": row.source_citation,
"metadata": row.metadata,
}
finally:
db.close()
@router.put("/{trace_id}")
async def update_decision(trace_id: str, req: UpdateDecisionRequest):
"""Update a decision trace (status, fix progress, evidence)."""
db = SessionLocal()
try:
updates = []
params: dict = {"id": trace_id}
if req.status is not None:
updates.append("status = :status")
params["status"] = req.status
if req.decision_reason is not None:
updates.append("decision_reason = :reason")
params["reason"] = req.decision_reason
if req.decided_by is not None:
updates.append("decided_by = :decided_by")
params["decided_by"] = req.decided_by
if req.fix_strategy is not None:
updates.append("fix_strategy = :fix_strategy")
params["fix_strategy"] = req.fix_strategy
if req.fix_owner is not None:
updates.append("fix_owner = :fix_owner")
params["fix_owner"] = req.fix_owner
if req.fix_target_date is not None:
updates.append("fix_target_date = :fix_target")
params["fix_target"] = req.fix_target_date
if req.fix_completed_date is not None:
updates.append("fix_completed_date = :fix_completed")
params["fix_completed"] = req.fix_completed_date
if req.evidence_ids is not None:
updates.append("evidence_ids = CAST(:evidence AS jsonb)")
params["evidence"] = json.dumps(req.evidence_ids)
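if req.confidence is not None:
updates.append("confidence = :confidence")
params["confidence"] = req.confidence
if req.metadata is not None:
# UpdateDecisionRequest accepts metadata, so persist it as well
updates.append("metadata = CAST(:metadata AS jsonb)")
params["metadata"] = json.dumps(req.metadata)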
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
result = db.execute(text(f"""
UPDATE decision_traces SET {', '.join(updates)}
WHERE id = CAST(:id AS uuid)
"""), params)
db.commit()
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="Decision trace not found")
return {"status": "updated", "id": trace_id}
finally:
db.close()
# ── Full Trace Endpoint ──────────────────────────────────────────────
full_trace_router = APIRouter(prefix="/v1/controls", tags=["decision-traces"])
@full_trace_router.get("/{control_id}/full-trace")
async def get_full_trace(control_id: str):
"""Get the complete Decision Trace chain for a control.
Returns: Regulation → Obligation → Control → Master Control → Decision → Evidence
"""
db = SessionLocal()
try:
# 1. Control
ctrl = db.execute(text("""
SELECT id, control_id, title, objective, severity,
source_citation, source_original_text,
verification_method, category,
generation_metadata->>'merge_group_hint' AS merge_hint
FROM canonical_controls
WHERE control_id = :cid
"""), {"cid": control_id}).fetchone()
if not ctrl:
raise HTTPException(status_code=404, detail="Control not found")
# 2. Regulation (from source_citation)
citation = ctrl.source_citation or {}
regulation = {
"source": citation.get("source"),
"article": citation.get("article"),
"paragraph": citation.get("paragraph"),
"source_type": citation.get("source_type"),
"license": citation.get("license"),
}
# 3. Obligation (from parent links)
obligations = db.execute(text("""
SELECT oc.candidate_id, oc.obligation_text, oc.action,
oc.object, oc.normative_strength
FROM obligation_candidates oc
WHERE oc.parent_control_uuid = CAST(:uuid AS uuid)
ORDER BY oc.candidate_id
LIMIT 10
"""), {"uuid": str(ctrl.id)}).fetchall()
# 4. Master Control (if member)
master = db.execute(text("""
SELECT mc.master_control_id, mc.canonical_name, mc.phases_covered
FROM master_control_members mcm
JOIN master_controls mc ON mc.id = mcm.master_control_uuid
WHERE mcm.control_uuid = CAST(:uuid AS uuid)
LIMIT 1
"""), {"uuid": str(ctrl.id)}).fetchone()
# 5. Decision Traces
decisions = db.execute(text("""
SELECT id, status, decision_reason, decided_by, decided_at,
fix_strategy, fix_owner, evidence_ids, confidence
FROM decision_traces
WHERE control_uuid = CAST(:uuid AS uuid)
ORDER BY decided_at DESC NULLS LAST
"""), {"uuid": str(ctrl.id)}).fetchall()
return {
"control": {
"id": ctrl.control_id,
"uuid": str(ctrl.id),
"title": ctrl.title,
"objective": ctrl.objective,
"severity": ctrl.severity,
"category": ctrl.category,
"verification_method": ctrl.verification_method,
},
"regulation": regulation,
"original_text": ctrl.source_original_text[:500] if ctrl.source_original_text else None,
"obligations": [
{
"id": o.candidate_id,
"text": o.obligation_text,
"action": o.action,
"object": o.object,
"strength": o.normative_strength,
}
for o in obligations
],
"master_control": {
"id": master.master_control_id,
"name": master.canonical_name,
"phases": master.phases_covered,
} if master else None,
"decisions": [
{
"id": str(d.id),
"status": d.status,
"reason": d.decision_reason,
"decided_by": d.decided_by,
"decided_at": str(d.decided_at) if d.decided_at else None,
"fix_strategy": d.fix_strategy,
"fix_owner": d.fix_owner,
"evidence_count": len(d.evidence_ids) if d.evidence_ids else 0,
"confidence": float(d.confidence) if d.confidence else 0,
}
for d in decisions
],
"latest_status": decisions[0].status if decisions else "not_assessed",
}
finally:
db.close()
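Review note: `update_decision` builds its `SET` clause from a chain of `if` blocks. The same pattern can be sketched with a whitelist that maps request fields to fixed SQL fragments, so only known column names ever reach the statement. This is an illustrative sketch. The names `UPDATABLE` and `build_update` are hypothetical, and the whitelist covers only a subset of the real columns.

```python
from typing import Any

# Hypothetical whitelist: request field -> fixed SQL assignment fragment.
UPDATABLE = {
    "status": "status = :status",
    "decision_reason": "decision_reason = :decision_reason",
    "fix_owner": "fix_owner = :fix_owner",
    "confidence": "confidence = :confidence",
}


def build_update(trace_id: str, fields: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Return (sql, params) that update only whitelisted, non-None fields."""
    assignments = []
    params: dict[str, Any] = {"id": trace_id}
    for name, fragment in UPDATABLE.items():
        value = fields.get(name)
        if value is not None:
            assignments.append(fragment)
            params[name] = value
    if not assignments:
        raise ValueError("No fields to update")
    sql = (
        f"UPDATE decision_traces SET {', '.join(assignments)} "
        "WHERE id = CAST(:id AS uuid)"
    )
    return sql, params


# Unknown keys ("bogus") and None values are ignored
sql, params = build_update(
    "00000000-0000-0000-0000-000000000001",
    {"status": "compliant", "fix_owner": None, "bogus": 1},
)
```

Values still travel as bind parameters. Only the fixed fragments from the whitelist are interpolated into the SQL string.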
@@ -0,0 +1,258 @@
"""Pre-Deployment Enforcement API — G4.
CI/CD gate: checks if a deployment is safe by evaluating the compliance
status of all affected controls. Blocks deploys with non-compliant controls.
"""
import json
import logging
import uuid
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from db.session import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/deployment-checks", tags=["deployment-checks"])
SEVERITY_WEIGHT = {
"critical": 4.0,
"high": 3.0,
"medium": 2.0,
"low": 1.0,
}
class DeployCheckRequest(BaseModel):
tenant_id: str
commit_hash: str
branch: Optional[str] = None
environment: str = "production"
affected_control_ids: list[str] = []
metadata: dict = {}
class OverrideRequest(BaseModel):
override_by: str
override_reason: str
@router.post("")
async def check_deployment(req: DeployCheckRequest):
"""Check if a deployment is safe. Returns verdict: approved/blocked."""
db = SessionLocal()
try:
check_id = str(uuid.uuid4())
blocking = []
warnings = []
risk_score = 0.0
if req.affected_control_ids:
# Look up latest decision status for each affected control
for ctrl_id in req.affected_control_ids:
row = db.execute(text("""
SELECT dt.status, dt.decision_reason, dt.fix_strategy,
cc.control_id, cc.title, cc.severity
FROM decision_traces dt
JOIN canonical_controls cc ON cc.id = dt.control_uuid
WHERE cc.control_id = :cid
ORDER BY dt.decided_at DESC NULLS LAST
LIMIT 1
"""), {"cid": ctrl_id}).fetchone()
if not row:
# No decision → treat as not_assessed (warning)
warnings.append({
"control_id": ctrl_id,
"status": "not_assessed",
"reason": "No compliance decision recorded",
})
continue
status = row[0]
severity = row[5] or "medium"
weight = SEVERITY_WEIGHT.get(severity, 2.0)
if status in ("not_compliant", "under_remediation"):
blocking.append({
"control_id": row[3],
"title": row[4],
"status": status,
"reason": row[1],
"fix_strategy": row[2],
"severity": severity,
})
risk_score += weight
elif status == "partially_compliant":
warnings.append({
"control_id": row[3],
"title": row[4],
"status": status,
"reason": row[1],
"severity": severity,
})
risk_score += weight * 0.5
# Also check for open failure events (G3)
if req.affected_control_ids:
# Bind the IDs as a single array parameter instead of interpolating them
# into the SQL string; the previous string formatting allowed SQL injection
# through caller-supplied control IDs. Assumes a PostgreSQL driver that
# adapts Python lists to arrays (e.g. psycopg2).
open_failures = db.execute(text("""
SELECT cc.control_id, de.summary
FROM decision_events de
JOIN canonical_controls cc ON cc.id = de.control_uuid
WHERE cc.control_id = ANY(CAST(:cids AS text[]))
AND de.event_type = 'failure'
AND de.created_at > NOW() - interval '30 days'
AND NOT EXISTS (
SELECT 1 FROM decision_events de2
WHERE de2.control_uuid = de.control_uuid
AND de2.event_type = 'verification'
AND de2.created_at > de.created_at
)
"""), {"cids": list(req.affected_control_ids)}).fetchall()
for f in open_failures:
if not any(b["control_id"] == f[0] for b in blocking):
blocking.append({
"control_id": f[0],
"status": "open_failure",
"reason": f[1] or "Unresolved failure event",
"severity": "high",
})
risk_score += 3.0
verdict = "approved" if not blocking else "blocked"
summary = (
f"{len(blocking)} blocking, {len(warnings)} warnings. "
+ ("Deploy approved." if verdict == "approved"
else f"Fix {', '.join(b['control_id'] for b in blocking)} before deploying.")
)
# Store check result
db.execute(text("""
INSERT INTO deployment_checks
(id, tenant_id, commit_hash, branch, environment,
verdict, affected_control_ids, blocking_controls,
warning_controls, risk_score, summary, metadata)
VALUES
(CAST(:id AS uuid), CAST(:tid AS uuid), :hash, :branch, :env,
:verdict, CAST(:affected AS jsonb), CAST(:blocking AS jsonb),
CAST(:warnings AS jsonb), :risk, :summary, CAST(:meta AS jsonb))
"""), {
"id": check_id,
"tid": req.tenant_id,
"hash": req.commit_hash,
"branch": req.branch,
"env": req.environment,
"verdict": verdict,
"affected": json.dumps(req.affected_control_ids),
"blocking": json.dumps(blocking),
"warnings": json.dumps(warnings),
"risk": risk_score,
"summary": summary,
"meta": json.dumps(req.metadata),
})
db.commit()
return {
"id": check_id,
"verdict": verdict,
"risk_score": risk_score,
"blocking_controls": blocking,
"warning_controls": warnings,
"summary": summary,
}
finally:
db.close()
@router.get("/stats")
async def check_stats(tenant_id: Optional[str] = None):
"""Deployment check statistics."""
db = SessionLocal()
try:
tf = ""
params: dict = {}
if tenant_id:
tf = "WHERE tenant_id = CAST(:tid AS uuid)"
params["tid"] = tenant_id
by_verdict = db.execute(text(f"""
SELECT verdict, count(*) FROM deployment_checks {tf}
GROUP BY verdict
"""), params).fetchall()
total = sum(r[1] for r in by_verdict)
verdicts = {r[0]: r[1] for r in by_verdict}
return {
"total_checks": total,
"by_verdict": verdicts,
"approval_rate": round(
verdicts.get("approved", 0) / total * 100, 1
) if total > 0 else 0,
"override_count": verdicts.get("override", 0),
}
finally:
db.close()
@router.post("/{check_id}/override")
async def override_check(check_id: str, req: OverrideRequest):
"""Override a blocked deployment (with justification)."""
db = SessionLocal()
try:
result = db.execute(text("""
UPDATE deployment_checks
SET verdict = 'override', override_by = :by, override_reason = :reason
WHERE id = CAST(:id AS uuid) AND verdict = 'blocked'
"""), {
"id": check_id,
"by": req.override_by,
"reason": req.override_reason,
})
db.commit()
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="Check not found or not blocked")
return {"id": check_id, "verdict": "override", "override_by": req.override_by}
finally:
db.close()
@router.get("/{check_id}")
async def get_check(check_id: str):
"""Get details of a deployment check."""
db = SessionLocal()
try:
row = db.execute(text("""
SELECT * FROM deployment_checks WHERE id = CAST(:id AS uuid)
"""), {"id": check_id}).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Check not found")
return {
"id": str(row.id),
"tenant_id": str(row.tenant_id),
"commit_hash": row.commit_hash,
"branch": row.branch,
"environment": row.environment,
"verdict": row.verdict,
"affected_control_ids": row.affected_control_ids,
"blocking_controls": row.blocking_controls,
"warning_controls": row.warning_controls,
"risk_score": float(row.risk_score),
"override_by": row.override_by,
"override_reason": row.override_reason,
"summary": row.summary,
"created_at": str(row.created_at),
}
finally:
db.close()
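Review note: the verdict and risk-score rules in `check_deployment` can be sketched as a pure function, which may be easier to unit-test than the endpoint. This is a simplified sketch that omits the G3 open-failure check. The function name `evaluate`, the `BLOCKING_STATUSES` constant and the sample control IDs are hypothetical, and a `status` of `None` stands for "no decision recorded".

```python
SEVERITY_WEIGHT = {"critical": 4.0, "high": 3.0, "medium": 2.0, "low": 1.0}
BLOCKING_STATUSES = {"not_compliant", "under_remediation"}


def evaluate(controls: list[dict]) -> dict:
    """Each control: {"control_id": str, "status": str | None, "severity": str | None}."""
    blocking, warnings, risk = [], [], 0.0
    for c in controls:
        # Missing or unknown severity falls back to the "medium" weight
        weight = SEVERITY_WEIGHT.get(c.get("severity") or "medium", 2.0)
        status = c["status"]
        if status is None:
            warnings.append(c["control_id"])  # no decision: warn, add no risk
        elif status in BLOCKING_STATUSES:
            blocking.append(c["control_id"])
            risk += weight
        elif status == "partially_compliant":
            warnings.append(c["control_id"])
            risk += weight * 0.5
    return {
        "verdict": "blocked" if blocking else "approved",
        "blocking": blocking,
        "warnings": warnings,
        "risk_score": risk,
    }


# Hypothetical controls: one blocker, two warnings, one compliant
sample = [
    {"control_id": "AUTH-001", "status": "not_compliant", "severity": "critical"},
    {"control_id": "LOG-002", "status": "partially_compliant", "severity": "low"},
    {"control_id": "ENC-003", "status": None, "severity": None},
    {"control_id": "NET-004", "status": "compliant", "severity": "high"},
]
result = evaluate(sample)
```

A single blocking control is enough to block the deploy. The risk score is informational and does not affect the verdict.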
@@ -0,0 +1,178 @@
"""Master Control API — G-pre3.
Provides read access to Master Controls (lifecycle-grouped atomic controls).
"""
import json
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from sqlalchemy import text
from db.session import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/master-controls", tags=["master-controls"])
@router.get("")
async def list_master_controls(
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
search: Optional[str] = None,
min_phases: Optional[int] = None,
min_controls: Optional[int] = None,
sort: str = Query("total_controls", regex="^(total_controls|phases|name|created_at)$"),
):
"""List Master Controls with optional filtering."""
db = SessionLocal()
try:
where_clauses = []
params: dict = {"limit": limit, "offset": offset}
if search:
where_clauses.append("mc.canonical_name ILIKE :search")
params["search"] = f"%{search}%"
if min_phases:
where_clauses.append("jsonb_array_length(mc.phases_covered) >= :min_phases")
params["min_phases"] = min_phases
if min_controls:
where_clauses.append("mc.total_controls >= :min_controls")
params["min_controls"] = min_controls
where = "WHERE " + " AND ".join(where_clauses) if where_clauses else ""
sort_map = {
"total_controls": "mc.total_controls DESC",
"phases": "jsonb_array_length(mc.phases_covered) DESC",
"name": "mc.canonical_name ASC",
"created_at": "mc.created_at DESC",
}
order = sort_map.get(sort, "mc.total_controls DESC")
rows = db.execute(text(f"""
SELECT mc.id, mc.master_control_id, mc.object_group_id,
mc.canonical_name, mc.phases_covered,
mc.phase_control_count, mc.total_controls,
mc.created_at
FROM master_controls mc
{where}
ORDER BY {order}
LIMIT :limit OFFSET :offset
"""), params).fetchall()
total = db.execute(text(f"""
SELECT count(*) FROM master_controls mc {where}
"""), params).scalar()
return {
"total": total,
"limit": limit,
"offset": offset,
"master_controls": [
{
"id": str(r[0]),
"master_control_id": r[1],
"object_group_id": r[2],
"canonical_name": r[3],
"phases_covered": r[4],
"phase_control_count": r[5],
"total_controls": r[6],
"created_at": str(r[7]),
}
for r in rows
],
}
finally:
db.close()
@router.get("/stats")
async def master_control_stats():
"""Aggregate statistics about Master Controls."""
db = SessionLocal()
try:
stats = db.execute(text("""
SELECT
count(*) AS total_master_controls,
sum(total_controls) AS total_member_controls,
avg(total_controls)::int AS avg_controls_per_mc,
max(total_controls) AS max_controls,
avg(jsonb_array_length(phases_covered))::numeric(3,1) AS avg_phases,
max(jsonb_array_length(phases_covered)) AS max_phases
FROM master_controls
""")).fetchone()
phase_dist = db.execute(text("""
SELECT phase, count(*) AS control_count
FROM master_control_members
GROUP BY phase
ORDER BY control_count DESC
""")).fetchall()
return {
"total_master_controls": stats[0],
"total_member_controls": stats[1],
"avg_controls_per_mc": stats[2],
"max_controls": stats[3],
"avg_phases": float(stats[4]) if stats[4] else 0,
"max_phases": stats[5],
"phase_distribution": {r[0]: r[1] for r in phase_dist},
}
finally:
db.close()
@router.get("/{mc_id}")
async def get_master_control(mc_id: str):
"""Get a single Master Control with all phase-controls."""
db = SessionLocal()
try:
mc = db.execute(text("""
SELECT mc.id, mc.master_control_id, mc.object_group_id,
mc.canonical_name, mc.phases_covered,
mc.phase_control_count, mc.total_controls
FROM master_controls mc
WHERE mc.master_control_id = :mc_id
"""), {"mc_id": mc_id}).fetchone()
if not mc:
raise HTTPException(status_code=404, detail="Master Control not found")
members = db.execute(text("""
SELECT mcm.phase, mcm.action,
cc.control_id, cc.title, cc.severity,
cc.source_citation->>'source' AS source
FROM master_control_members mcm
JOIN canonical_controls cc ON cc.id = mcm.control_uuid
WHERE mcm.master_control_uuid = CAST(:mc_uuid AS uuid)
ORDER BY mcm.phase, cc.control_id
"""), {"mc_uuid": str(mc[0])}).fetchall()
# Group by phase
phases = {}
for phase, action, ctrl_id, title, severity, source in members:
if phase not in phases:
phases[phase] = []
phases[phase].append({
"control_id": ctrl_id,
"title": title,
"action": action,
"severity": severity,
"source": source,
})
return {
"id": str(mc[0]),
"master_control_id": mc[1],
"object_group_id": mc[2],
"canonical_name": mc[3],
"phases_covered": mc[4],
"phase_control_count": mc[5],
"total_controls": mc[6],
"phases": phases,
}
finally:
db.close()
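Review note: `list_master_controls` interpolates the `ORDER BY` fragment into the SQL string, which is safe only because the fragment comes from a fixed map rather than from user input. A minimal sketch of that whitelist lookup, with the hypothetical helper name `order_clause`:

```python
SORT_MAP = {
    "total_controls": "mc.total_controls DESC",
    "phases": "jsonb_array_length(mc.phases_covered) DESC",
    "name": "mc.canonical_name ASC",
    "created_at": "mc.created_at DESC",
}


def order_clause(sort: str) -> str:
    """Map a user-supplied sort key to a fixed ORDER BY fragment.

    Unknown keys fall back to the default, so raw input never reaches SQL.
    """
    return SORT_MAP.get(sort, SORT_MAP["total_controls"])


by_name = order_clause("name")
fallback = order_clause("name; DROP TABLE master_controls")
```

The fallback keeps the query safe even if the `Query(...)` validation on the `sort` parameter is ever relaxed.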
@@ -0,0 +1,18 @@
-- Migration 004: Object Groups (G-pre1)
-- Schema: compliance
-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/004_object_groups.sql
SET search_path TO compliance, public;
CREATE TABLE IF NOT EXISTS object_groups (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
group_id INTEGER NOT NULL,
canonical_name VARCHAR(200) NOT NULL,
member_count INTEGER DEFAULT 0,
members JSONB DEFAULT '[]',
top_controls_count INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_object_groups_group_id ON object_groups(group_id);
CREATE INDEX IF NOT EXISTS idx_object_groups_canonical ON object_groups(canonical_name);
@@ -0,0 +1,30 @@
-- Migration 005: Master Controls (G-pre2)
-- Schema: compliance
-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/005_master_controls.sql
SET search_path TO compliance, public;
CREATE TABLE IF NOT EXISTS master_controls (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
master_control_id VARCHAR(50) UNIQUE NOT NULL,
object_group_id INTEGER NOT NULL,
canonical_name VARCHAR(200) NOT NULL,
phases_covered JSONB NOT NULL DEFAULT '[]',
phase_control_count JSONB NOT NULL DEFAULT '{}',
total_controls INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_master_controls_group ON master_controls(object_group_id);
CREATE TABLE IF NOT EXISTS master_control_members (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
master_control_uuid UUID NOT NULL REFERENCES master_controls(id) ON DELETE CASCADE,
control_uuid UUID NOT NULL,
phase VARCHAR(50) NOT NULL,
action VARCHAR(50) NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_mc_members_master ON master_control_members(master_control_uuid);
CREATE INDEX IF NOT EXISTS idx_mc_members_control ON master_control_members(control_uuid);
@@ -0,0 +1,58 @@
-- Migration 006: Decision Traces (G1)
-- Schema: compliance
-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/006_decision_traces.sql
SET search_path TO compliance, public;
CREATE TABLE IF NOT EXISTS decision_traces (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
control_uuid UUID NOT NULL,
regulation_id VARCHAR(100),
obligation_id VARCHAR(100),
-- Decision
status VARCHAR(30) NOT NULL DEFAULT 'not_assessed'
CHECK (status IN ('not_assessed', 'compliant', 'partially_compliant',
'not_compliant', 'not_applicable', 'under_remediation')),
decision_reason TEXT,
decided_by VARCHAR(100),
decided_at TIMESTAMPTZ,
-- Fix/Remediation
fix_strategy TEXT,
fix_owner VARCHAR(100),
fix_target_date DATE,
fix_completed_date DATE,
-- Evidence
evidence_ids JSONB DEFAULT '[]',
confidence NUMERIC(3,2) DEFAULT 0.0,
-- Multi-tenant
tenant_id UUID,
project_id UUID,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dt_control ON decision_traces(control_uuid);
CREATE INDEX IF NOT EXISTS idx_dt_status ON decision_traces(status);
CREATE INDEX IF NOT EXISTS idx_dt_tenant ON decision_traces(tenant_id);
CREATE INDEX IF NOT EXISTS idx_dt_decided_at ON decision_traces(decided_at);
-- Updated-at trigger
CREATE OR REPLACE FUNCTION update_decision_traces_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS trg_decision_traces_updated_at ON decision_traces;
CREATE TRIGGER trg_decision_traces_updated_at
BEFORE UPDATE ON decision_traces
FOR EACH ROW
EXECUTE FUNCTION update_decision_traces_updated_at();
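Review note: the `CHECK` constraint on `status` means an unknown value sent to `POST /v1/decision-traces` fails only at insert time. One option is to mirror the allowed values on the Python side and reject bad input earlier. The following is a sketch under that assumption. `DecisionStatus` and `parse_status` are hypothetical names that do not exist in this commit.

```python
from enum import Enum


class DecisionStatus(str, Enum):
    """Mirrors the CHECK constraint on decision_traces.status."""
    NOT_ASSESSED = "not_assessed"
    COMPLIANT = "compliant"
    PARTIALLY_COMPLIANT = "partially_compliant"
    NOT_COMPLIANT = "not_compliant"
    NOT_APPLICABLE = "not_applicable"
    UNDER_REMEDIATION = "under_remediation"


def parse_status(value: str) -> DecisionStatus:
    """Return the matching enum member or raise ValueError listing valid values."""
    try:
        return DecisionStatus(value)
    except ValueError:
        allowed = ", ".join(s.value for s in DecisionStatus)
        raise ValueError(
            f"Invalid status {value!r}; expected one of: {allowed}"
        ) from None


status = parse_status("under_remediation")
```

The enum and the constraint would need to be kept in sync by hand whenever a status is added.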
@@ -0,0 +1,38 @@
-- Migration 007: Compliance Commit Ledger (G2)
-- Schema: compliance
-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/007_compliance_commits.sql
SET search_path TO compliance, public;
CREATE TABLE IF NOT EXISTS compliance_commits (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
project_id UUID,
-- Git Info
commit_hash VARCHAR(64) NOT NULL,
commit_message TEXT,
commit_author VARCHAR(200),
commit_date TIMESTAMPTZ,
branch VARCHAR(200),
repo_url TEXT,
-- Affected Controls
affected_control_ids JSONB NOT NULL DEFAULT '[]',
affected_files JSONB DEFAULT '[]',
-- Analysis
risk_level VARCHAR(20) DEFAULT 'low'
CHECK (risk_level IN ('low', 'medium', 'high', 'critical')),
analysis_summary TEXT,
analysis_metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_cc_tenant ON compliance_commits(tenant_id);
CREATE INDEX IF NOT EXISTS idx_cc_hash ON compliance_commits(commit_hash);
CREATE INDEX IF NOT EXISTS idx_cc_date ON compliance_commits(commit_date);
CREATE INDEX IF NOT EXISTS idx_cc_risk ON compliance_commits(risk_level);
-- GIN index for JSONB array containment queries (@>)
CREATE INDEX IF NOT EXISTS idx_cc_control_ids ON compliance_commits USING GIN (affected_control_ids);
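Review note: the GIN index above serves JSONB containment (`@>`) lookups such as "which commits touched this control?". A sketch of how such a query might be built, assuming `affected_control_ids` holds a JSON array of control ID strings. The function name, the selected columns and the sample ID are hypothetical.

```python
import json


def commits_for_control_query(control_id: str, limit: int = 20) -> tuple[str, dict]:
    """Build SQL and bind parameters for a JSONB containment lookup."""
    sql = (
        "SELECT commit_hash, risk_level, created_at "
        "FROM compliance_commits "
        "WHERE affected_control_ids @> CAST(:ids AS jsonb) "
        "ORDER BY created_at DESC LIMIT :limit"
    )
    # The right-hand operand of @> must itself be a JSON array
    return sql, {"ids": json.dumps([control_id]), "limit": limit}


sql, params = commits_for_control_query("AUTH-001")
```

The ID is serialized into a one-element JSON array and bound as a parameter, so it is never interpolated into the SQL text.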
@@ -0,0 +1,37 @@
-- Migration 008: Decision Events / Full Decision Memory (G3)
-- Schema: compliance
-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/008_decision_events.sql
SET search_path TO compliance, public;
CREATE TABLE IF NOT EXISTS decision_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
decision_trace_id UUID REFERENCES decision_traces(id) ON DELETE SET NULL,
control_uuid UUID NOT NULL,
tenant_id UUID,
-- Event type
event_type VARCHAR(30) NOT NULL
CHECK (event_type IN (
'assessment', 'decision', 'fix_planned', 'fix_started',
'fix_completed', 'verification', 'failure', 'exception', 'escalation'
)),
-- State before/after
input_state JSONB DEFAULT '{}',
output_state JSONB DEFAULT '{}',
-- Details
summary TEXT,
actor VARCHAR(200),
evidence_ids JSONB DEFAULT '[]',
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_de_control ON decision_events(control_uuid);
CREATE INDEX IF NOT EXISTS idx_de_trace ON decision_events(decision_trace_id);
CREATE INDEX IF NOT EXISTS idx_de_tenant ON decision_events(tenant_id);
CREATE INDEX IF NOT EXISTS idx_de_type ON decision_events(event_type);
CREATE INDEX IF NOT EXISTS idx_de_created ON decision_events(created_at);
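Review note: G4 treats a `failure` event as open when it is newer than 30 days and no later `verification` exists for the same control. The rule can be restated in plain Python, for example for use in tests. The function name `open_failures`, the tuple layout and the sample data are assumptions of this sketch.

```python
from datetime import datetime, timedelta


def open_failures(events, now, window_days=30):
    """events: iterable of (control_uuid, event_type, created_at) tuples.

    Returns (control_uuid, created_at) for each failure inside the window
    that has no strictly later verification for the same control.
    """
    events = list(events)
    cutoff = now - timedelta(days=window_days)

    # Latest verification timestamp per control
    last_verified = {}
    for control, etype, at in events:
        if etype == "verification" and at > last_verified.get(control, datetime.min):
            last_verified[control] = at

    result = []
    for control, etype, at in events:
        if etype != "failure" or at <= cutoff:
            continue
        verified = last_verified.get(control)
        if verified is None or verified <= at:
            result.append((control, at))
    return result


now = datetime(2026, 5, 6, 12, 0)
sample = [
    ("c1", "failure", now - timedelta(days=5)),
    ("c1", "verification", now - timedelta(days=4)),   # closes c1
    ("c2", "failure", now - timedelta(days=2)),        # open
    ("c3", "failure", now - timedelta(days=40)),       # outside the window
    ("c4", "verification", now - timedelta(days=10)),
    ("c4", "failure", now - timedelta(days=1)),        # failed after verification
]
still_open = [control for control, _ in open_failures(sample, now)]
```

As in the SQL, a verification only closes failures that happened before it. A failure recorded after the last verification stays open.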
@@ -0,0 +1,38 @@
-- Migration 009: Deployment Checks / Pre-Deployment Enforcement (G4)
-- Schema: compliance
-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/009_deployment_checks.sql
SET search_path TO compliance, public;
CREATE TABLE IF NOT EXISTS deployment_checks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
-- Deploy Info
commit_hash VARCHAR(64) NOT NULL,
branch VARCHAR(200),
environment VARCHAR(50) DEFAULT 'production',
-- Result
verdict VARCHAR(20) NOT NULL DEFAULT 'pending'
CHECK (verdict IN ('pending', 'approved', 'blocked', 'override')),
-- Impact
affected_control_ids JSONB DEFAULT '[]',
blocking_controls JSONB DEFAULT '[]',
warning_controls JSONB DEFAULT '[]',
risk_score NUMERIC(5,2) DEFAULT 0.0,
-- Override
override_by VARCHAR(200),
override_reason TEXT,
summary TEXT,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dc_tenant ON deployment_checks(tenant_id);
CREATE INDEX IF NOT EXISTS idx_dc_hash ON deployment_checks(commit_hash);
CREATE INDEX IF NOT EXISTS idx_dc_verdict ON deployment_checks(verdict);
CREATE INDEX IF NOT EXISTS idx_dc_created ON deployment_checks(created_at);
@@ -0,0 +1,267 @@
#!/usr/bin/env python3
"""
F4: LLM-based Synonym Enrichment for Action Types and Object Tokens.
Uses Ollama (qwen3.5:35b-a3b) to generate additional German synonyms
for each canonical action type and object token. Results are stored
with source='llm' in the DB.
Usage:
# Dry run (print, no DB write):
python3 scripts/f4_llm_enrich_synonyms.py --dry-run
# Against Mac Mini:
python3 scripts/f4_llm_enrich_synonyms.py --db-host macmini --ollama-host macmini
# Only actions or only objects:
python3 scripts/f4_llm_enrich_synonyms.py --actions-only
python3 scripts/f4_llm_enrich_synonyms.py --objects-only
"""
import argparse
import json
import logging
import sys
import time
from pathlib import Path
import httpx
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("f4-enrich")
OLLAMA_MODEL = "qwen3.5:35b-a3b"
def call_ollama(prompt: str, ollama_url: str) -> str:
    """Call Ollama with think:false for direct answers."""
    resp = httpx.post(
        f"{ollama_url}/api/chat",
        json={
            "model": OLLAMA_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False,
            "options": {"temperature": 0.3},
            "think": False,
        },
        timeout=60.0,
    )
    resp.raise_for_status()
    return resp.json().get("message", {}).get("content", "")
def enrich_action_types(db_url: str, ollama_url: str, dry_run: bool) -> dict:
"""Generate synonyms for each action type."""
engine = create_engine(db_url, connect_args={"options": "-c search_path=compliance,public"})
with engine.connect() as conn:
# Get existing action types and their current synonyms
types = conn.execute(text("SELECT canonical_name, phase FROM action_types")).fetchall()
existing = {}
for row in conn.execute(text("SELECT canonical_action, synonym FROM action_synonyms")).fetchall():
existing.setdefault(row[0], set()).add(row[1])
stats = {"types_processed": 0, "new_synonyms": 0, "skipped": 0}
all_new: list[dict] = []
for canonical, phase in types:
current_synonyms = existing.get(canonical, set())
prompt = f"""Du bist ein Compliance-Experte. Gib mir 5-8 deutsche Synonyme oder Umschreibungen fuer die Handlung "{canonical}" (Phase: {phase}) im Kontext von IT-Compliance und Datenschutz.
Bestehende Synonyme (NICHT wiederholen): {', '.join(sorted(current_synonyms)[:10])}
Antworte NUR mit einer JSON-Liste von Strings, z.B.: ["synonym1", "synonym2", ...]
Keine Erklaerungen, nur die JSON-Liste."""
try:
response = call_ollama(prompt, ollama_url)
# Parse JSON from response
synonyms = _parse_json_list(response)
new_count = 0
for syn in synonyms:
syn_lower = syn.lower().strip()
if not syn_lower or len(syn_lower) < 3:
continue
if syn_lower in current_synonyms:
stats["skipped"] += 1
continue
all_new.append({
"canonical_action": canonical,
"synonym": syn_lower,
"language": "de",
"source": "llm",
"pattern_type": "alias",
})
current_synonyms.add(syn_lower)
new_count += 1
stats["types_processed"] += 1
stats["new_synonyms"] += new_count
logger.info("%s: +%d new synonyms", canonical, new_count)
except Exception as e:
logger.warning("Error for %s: %s", canonical, e)
time.sleep(1) # Rate limit
# Write to DB
if not dry_run and all_new:
with engine.begin() as conn:
for row in all_new:
conn.execute(
text("""
INSERT INTO action_synonyms (canonical_action, synonym, language, source, pattern_type)
VALUES (:canonical_action, :synonym, :language, :source, :pattern_type)
ON CONFLICT (synonym, language, pattern_type) DO NOTHING
"""),
row,
)
logger.info("Wrote %d new action synonyms to DB", len(all_new))
elif dry_run:
print("\n--- DRY RUN: Action Synonyms ---")
for row in all_new[:20]:
print(" %s → %s" % (row["canonical_action"], row["synonym"]))
if len(all_new) > 20:
print(" ... and %d more" % (len(all_new) - 20))
return stats
def enrich_object_tokens(db_url: str, ollama_url: str, dry_run: bool) -> dict:
"""Generate synonyms for each object canonical token."""
engine = create_engine(db_url, connect_args={"options": "-c search_path=compliance,public"})
with engine.connect() as conn:
# Get unique canonical tokens
tokens = conn.execute(text(
"SELECT DISTINCT canonical_token FROM object_synonyms ORDER BY canonical_token"
)).fetchall()
existing = {}
for row in conn.execute(text("SELECT canonical_token, synonym FROM object_synonyms")).fetchall():
existing.setdefault(row[0], set()).add(row[1])
stats = {"tokens_processed": 0, "new_synonyms": 0, "skipped": 0}
all_new: list[dict] = []
for (token,) in tokens:
current_synonyms = existing.get(token, set())
prompt = f"""Du bist ein IT-Security-Experte. Gib mir 5-8 deutsche und englische Begriffe/Synonyme fuer das Konzept "{token}" im Kontext von IT-Sicherheit und Compliance.
Bestehende Synonyme (NICHT wiederholen): {', '.join(sorted(current_synonyms)[:8])}
Antworte NUR mit einer JSON-Liste von Strings, z.B.: ["synonym1", "synonym2", ...]
Keine Erklaerungen, nur die JSON-Liste."""
try:
response = call_ollama(prompt, ollama_url)
synonyms = _parse_json_list(response)
new_count = 0
for syn in synonyms:
syn_lower = syn.lower().strip()
if not syn_lower or len(syn_lower) < 2:
continue
if syn_lower in current_synonyms:
stats["skipped"] += 1
continue
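# Crude language heuristic: pure-ASCII tokens are tagged "en", everything else "de".
# German terms without umlauts or ß (e.g. "richtlinie") are therefore also tagged "en".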
lang = "de"
if all(c in "abcdefghijklmnopqrstuvwxyz0123456789 -_" for c in syn_lower):
lang = "en"
all_new.append({
"canonical_token": token,
"synonym": syn_lower,
"language": lang,
"source": "llm",
})
current_synonyms.add(syn_lower)
new_count += 1
stats["tokens_processed"] += 1
stats["new_synonyms"] += new_count
logger.info("%s: +%d new synonyms", token, new_count)
except Exception as e:
logger.warning("Error for %s: %s", token, e)
time.sleep(1)
# Write to DB
if not dry_run and all_new:
with engine.begin() as conn:
for row in all_new:
conn.execute(
text("""
INSERT INTO object_synonyms (canonical_token, synonym, language, source)
VALUES (:canonical_token, :synonym, :language, :source)
ON CONFLICT (synonym, language) DO NOTHING
"""),
row,
)
logger.info("Wrote %d new object synonyms to DB", len(all_new))
elif dry_run:
print("\n--- DRY RUN: Object Synonyms ---")
for row in all_new[:20]:
print(" %s → %s (%s)" % (row["canonical_token"], row["synonym"], row["language"]))
if len(all_new) > 20:
print(" ... and %d more" % (len(all_new) - 20))
return stats
def _parse_json_list(raw: str) -> list[str]:
    """Extract a JSON list of strings from an LLM response; return [] if none is found."""
    # The parameter is named `raw` so it does not shadow sqlalchemy's `text`
    raw = raw.strip()
    # Remove markdown code fences: keep the content between the first pair
    if raw.count("```") >= 2:
        raw = raw.split("```")[1].strip()
        if raw.startswith("json"):
            raw = raw[4:].strip()
    # Parse the span from the first [ to the last ]
    start = raw.find("[")
    end = raw.rfind("]")
    if start >= 0 and end > start:
        try:
            parsed = json.loads(raw[start:end + 1])
        except json.JSONDecodeError:
            return []
        # Drop non-string items so callers can safely call .lower() on each entry
        return [item for item in parsed if isinstance(item, str)]
    return []
def main():
parser = argparse.ArgumentParser(description="LLM Synonym Enrichment")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--db-host", default="localhost")
parser.add_argument("--ollama-host", default="localhost")
parser.add_argument("--actions-only", action="store_true")
parser.add_argument("--objects-only", action="store_true")
args = parser.parse_args()
db_url = f"postgresql://breakpilot:breakpilot123@{args.db_host}:5432/breakpilot_db"
ollama_url = f"http://{args.ollama_host}:11434"
if args.dry_run:
print("=== DRY RUN MODE ===\n")
if not args.objects_only:
print("=== Enriching Action Types ===")
action_stats = enrich_action_types(db_url, ollama_url, args.dry_run)
print("Actions: %d processed, %d new synonyms\n" % (
action_stats["types_processed"], action_stats["new_synonyms"]))
if not args.actions_only:
print("=== Enriching Object Tokens ===")
object_stats = enrich_object_tokens(db_url, ollama_url, args.dry_run)
print("Objects: %d processed, %d new synonyms\n" % (
object_stats["tokens_processed"], object_stats["new_synonyms"]))
if __name__ == "__main__":
main()
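The enrichment script relies on pulling a JSON array out of a free-form LLM reply that may be wrapped in a markdown code fence. The standalone sketch below illustrates that kind of parsing on three sample replies. The name `extract_json_list` is hypothetical, and dropping non-string items is an illustrative choice rather than something the commit guarantees.

```python
import json

FENCE = "`" * 3  # built at runtime so this example contains no literal fence


def extract_json_list(raw: str) -> list[str]:
    """Return the JSON array of strings found in raw, or [] if there is none."""
    raw = raw.strip()
    # If the reply is fenced, keep only the content between the first pair of fences
    if raw.count(FENCE) >= 2:
        raw = raw.split(FENCE)[1].strip()
        if raw.startswith("json"):
            raw = raw[4:].strip()
    # Parse the span from the first "[" to the last "]"
    start, end = raw.find("["), raw.rfind("]")
    if start < 0 or end <= start:
        return []
    try:
        parsed = json.loads(raw[start:end + 1])
    except json.JSONDecodeError:
        return []
    # Keep strings only, so later .lower() calls cannot fail
    return [item for item in parsed if isinstance(item, str)]


fenced_reply = FENCE + 'json\n["pruefen", "kontrollieren"]\n' + FENCE
print(extract_json_list(fenced_reply))
print(extract_json_list('Hier ist die Liste: ["a", 1, "b"]'))
print(extract_json_list("keine Liste"))
```

Returning an empty list on any parse failure keeps the caller's loop simple: a bad reply just contributes zero new synonyms.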
@@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""G-pre1: Analyze unique objects and test normalization reduction."""
from collections import Counter
from sqlalchemy import create_engine, text
engine = create_engine(
"postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
connect_args={"options": "-c search_path=compliance,public"},
)
with engine.connect() as c:
rows = c.execute(text("""
SELECT DISTINCT
split_part(generation_metadata->>'merge_group_hint', ':', 2) AS obj
FROM canonical_controls
WHERE generation_metadata->>'merge_group_hint' IS NOT NULL
AND generation_metadata->>'merge_group_hint' != ''
""")).fetchall()
objects = [r[0] for r in rows if r[0] and r[0].strip()]
print("Unique raw objects: %d" % len(objects))
from services.control_dedup import normalize_object
norm_counts: Counter = Counter()
for obj in objects:
norm_counts[normalize_object(obj)] += 1
print("After normalize_object(): %d unique" % len(norm_counts))
print("Reduction: %.1f%%" % ((1 - len(norm_counts) / len(objects)) * 100))
print()
print("Top 20 normalized objects:")
for token, count in norm_counts.most_common(20):
print(" %5d %s" % (count, token))
print()
print("Singletons (only 1 raw object): %d" % sum(1 for c in norm_counts.values() if c == 1))
print("Groups with 2+ members: %d" % sum(1 for c in norm_counts.values() if c >= 2))
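The analysis script reports how much `normalize_object()` shrinks the set of unique objects. As a worked example of that reduction formula, the sketch below uses a stand-in normalizer (`toy_normalize`, an assumption; the real `normalize_object` lives in `services.control_dedup` and is not shown in this commit) on a handful of made-up objects.

```python
from collections import Counter


def toy_normalize(obj: str) -> str:
    """Stand-in normalizer (assumption): lowercase and collapse whitespace."""
    return " ".join(obj.lower().split())


# Made-up raw objects for illustration only
raw_objects = ["Firewall", "firewall ", "Access  Log", "access log", "Backup"]

norm_counts = Counter(toy_normalize(o) for o in raw_objects)
reduction = (1 - len(norm_counts) / len(raw_objects)) * 100
singletons = sum(1 for n in norm_counts.values() if n == 1)

print("Unique raw objects: %d" % len(raw_objects))
print("After normalization: %d unique" % len(norm_counts))
print("Reduction: %.1f%%" % reduction)
print("Singletons: %d" % singletons)
```

Five raw strings collapse to three normalized tokens here, so the reduction is 40%.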
@@ -0,0 +1,219 @@
#!/usr/bin/env python3
"""
G-pre1: Object Clustering via Mini-Batch K-Means on Embeddings.
Clusters ~144k unique normalized objects into ~15-25k semantic groups
using bge-m3 embeddings and Mini-Batch K-Means.
Usage (inside control-pipeline container):
python3 /app/scripts/gpre1_object_clustering.py --k 20000
python3 /app/scripts/gpre1_object_clustering.py --k 20000 --dry-run
"""
import argparse
import json
import logging
import sys
import time
from collections import Counter
import httpx
import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("gpre1")
import os
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db")
EMBEDDING_URL = "http://embedding-service:8087"
BATCH_SIZE = 64 # Embeddings per API call
def extract_objects(engine) -> tuple[list[str], dict[str, int]]:
"""Extract unique normalized objects and their frequencies."""
from services.control_dedup import normalize_object
logger.info("Extracting objects from canonical_controls...")
with engine.connect() as c:
rows = c.execute(text("""
SELECT split_part(generation_metadata->>'merge_group_hint', ':', 2) AS obj,
count(*) AS freq
FROM canonical_controls
WHERE generation_metadata->>'merge_group_hint' IS NOT NULL
AND generation_metadata->>'merge_group_hint' != ''
GROUP BY 1
""")).fetchall()
# Normalize and aggregate
norm_freq: Counter = Counter()
norm_to_raw: dict[str, list[str]] = {}
for raw_obj, freq in rows:
if not raw_obj or not raw_obj.strip():
continue
normed = normalize_object(raw_obj)
norm_freq[normed] += freq
norm_to_raw.setdefault(normed, []).append(raw_obj)
objects = list(norm_freq.keys())
freqs = {obj: norm_freq[obj] for obj in objects}
logger.info("Extracted %d unique normalized objects (from %d raw)", len(objects), len(rows))
return objects, freqs
def generate_embeddings(objects: list[str]) -> np.ndarray:
    """Generate embeddings via embedding-service in batches.

    Uses a pre-allocated numpy array to avoid Python list memory overhead
    (a CPython float object is 24 bytes plus an 8-byte list pointer,
    vs. 4 bytes per numpy float32).
    """
    total = len(objects)
    # Pre-allocate: 144k × 1024 × 4 bytes = ~590 MB (vs. several GB with Python lists)
    result = np.zeros((total, 1024), dtype=np.float32)
    logger.info("Generating embeddings for %d objects (pre-allocated %.0f MB)...",
                total, result.nbytes / 1024 / 1024)
    failed_batches = []
    for batch_no, i in enumerate(range(0, total, BATCH_SIZE)):
        batch = objects[i:i + BATCH_SIZE]
        for attempt in range(3):  # Max 3 attempts per batch
            try:
                with httpx.Client(timeout=httpx.Timeout(60.0, connect=10.0)) as client:
                    resp = client.post(
                        f"{EMBEDDING_URL}/embed",
                        json={"texts": batch},
                    )
                    resp.raise_for_status()
                    embeddings = resp.json().get("embeddings", [])
                end = min(i + len(embeddings), total)
                result[i:end] = np.array(embeddings, dtype=np.float32)
                break
            except Exception as e:
                if attempt < 2:
                    logger.warning("Batch %d attempt %d failed: %s, retrying", i, attempt + 1, e)
                    time.sleep(2)  # `time` is already imported at module level
                else:
                    # Rows of a failed batch stay all-zero in the result matrix
                    logger.error("Batch %d failed after 3 attempts: %s", i, e)
                    failed_batches.append(i)
        done = min(i + BATCH_SIZE, total)
        # Log roughly every 5,000 objects (78 batches of 64). The previous check,
        # (i + BATCH_SIZE) % 5000 == 0, only fired every 40,000 objects.
        if (batch_no + 1) % 78 == 0 or done >= total:
            logger.info(" Embedded %d/%d (%.1f%%) [%d failed]",
                        done, total, done / total * 100, len(failed_batches))
    return result
def cluster_objects(embeddings: np.ndarray, k: int) -> np.ndarray:
"""Run Mini-Batch K-Means clustering."""
logger.info("Clustering %d objects into %d groups (Mini-Batch K-Means)...", len(embeddings), k)
# Normalize embeddings for cosine-like clustering
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
norms[norms == 0] = 1
normalized = embeddings / norms
kmeans = MiniBatchKMeans(
n_clusters=k,
batch_size=1000,
max_iter=100,
random_state=42,
verbose=0,
)
labels = kmeans.fit_predict(normalized)
logger.info("Clustering done. Inertia: %.2f", kmeans.inertia_)
return labels
def store_results(engine, objects: list[str], freqs: dict[str, int],
labels: np.ndarray, dry_run: bool):
"""Store clustering results in object_groups table."""
# Build groups
groups: dict[int, list[tuple[str, int]]] = {}
for i, obj in enumerate(objects):
gid = int(labels[i])
groups.setdefault(gid, []).append((obj, freqs.get(obj, 0)))
# Pick canonical name (highest frequency in group)
results = []
for gid, members in groups.items():
members_sorted = sorted(members, key=lambda x: -x[1])
canonical = members_sorted[0][0]
results.append({
"group_id": gid,
"canonical_name": canonical,
"member_count": len(members),
"members": json.dumps([m[0] for m in members_sorted]),
"top_controls_count": members_sorted[0][1],
})
# Stats
sizes = [r["member_count"] for r in results]
logger.info("Groups: %d total", len(results))
logger.info(" Singletons: %d", sum(1 for s in sizes if s == 1))
logger.info(" Groups 2-5: %d", sum(1 for s in sizes if 2 <= s <= 5))
logger.info(" Groups 6-20: %d", sum(1 for s in sizes if 6 <= s <= 20))
logger.info(" Groups 21-100: %d", sum(1 for s in sizes if 21 <= s <= 100))
logger.info(" Groups >100: %d", sum(1 for s in sizes if s > 100))
logger.info(" Max group size: %d", max(sizes))
logger.info(" Avg group size: %.1f", sum(sizes) / len(sizes))
# Top 10 largest groups
top10 = sorted(results, key=lambda x: -x["member_count"])[:10]
logger.info("\nTop 10 largest groups:")
for g in top10:
members_list = json.loads(g["members"])
logger.info(" [%d] %s (%d members): %s",
g["group_id"], g["canonical_name"], g["member_count"],
", ".join(members_list[:5]))
if dry_run:
logger.info("DRY RUN — not writing to DB")
return
# Write to DB
with engine.begin() as conn:
conn.execute(text("SET search_path TO compliance, public"))
conn.execute(text("DELETE FROM object_groups")) # Clear old results
for r in results:
conn.execute(text("""
INSERT INTO object_groups (group_id, canonical_name, member_count, members, top_controls_count)
VALUES (:group_id, :canonical_name, :member_count, CAST(:members AS jsonb), :top_controls_count)
"""), r)
logger.info("Wrote %d groups to object_groups table", len(results))
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--k", type=int, default=20000, help="Number of clusters")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})
# Step 1: Extract
objects, freqs = extract_objects(engine)
# Step 2: Embed
embeddings = generate_embeddings(objects)
logger.info("Embedding matrix: %s (%.1f MB)", embeddings.shape,
embeddings.nbytes / 1024 / 1024)
# Adjust k if we have fewer objects
k = min(args.k, len(objects) // 2)
logger.info("Using k=%d (requested %d, objects=%d)", k, args.k, len(objects))
# Step 3: Cluster
labels = cluster_objects(embeddings, k)
# Step 4: Store
store_results(engine, objects, freqs, labels, args.dry_run)
if __name__ == "__main__":
main()
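`cluster_objects()` L2-normalizes the embeddings before running K-Means "for cosine-like clustering". The reason this works is that for unit vectors the squared Euclidean distance equals `2 - 2 * cos(a, b)`, so minimizing one is equivalent to maximizing the other. The sketch below checks that identity on two small made-up vectors using only the standard library; the helper names are illustrative and not part of the script.

```python
import math


def l2_normalize(v: list[float]) -> list[float]:
    """Scale v to unit length; zero vectors are left unchanged (norm treated as 1)."""
    norm = math.sqrt(sum(x * x for x in v)) or 1.0
    return [x / norm for x in v]


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def squared_distance(a: list[float], b: list[float]) -> float:
    """Squared Euclidean distance."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


a = [3.0, 4.0, 0.0]
b = [1.0, 2.0, 2.0]

lhs = squared_distance(l2_normalize(a), l2_normalize(b))
rhs = 2 - 2 * cosine(a, b)
print(round(lhs, 6), round(rhs, 6))
```

Treating a zero norm as 1 mirrors the `norms[norms == 0] = 1` guard in the script, which keeps all-zero rows (for example from failed embedding batches) from causing a division by zero.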
@@ -0,0 +1,254 @@
#!/usr/bin/env python3
"""
G-pre1 Refinement: Re-cluster large object groups (master_controls with total_controls > 200)
into 4-20 sub-clusters per group (k scales with group size) for finer granularity.
Replaces the large master controls with smaller, more specific ones.
"""
import json
import logging
import os
import httpx
import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("gpre1-refine")
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db")
EMBEDDING_URL = "http://embedding-service:8087"
def main():
engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})
# Step 1: Find large master controls and their object_group_ids
with engine.connect() as c:
large_mcs = c.execute(text("""
SELECT mc.master_control_id, mc.object_group_id, mc.canonical_name, mc.total_controls,
og.members, og.member_count
FROM master_controls mc
JOIN object_groups og ON og.group_id = mc.object_group_id
WHERE mc.total_controls > 200
ORDER BY mc.total_controls DESC
""")).fetchall()
logger.info("Found %d large master controls to refine", len(large_mcs))
# Step 2: For each large group, re-cluster the object members
with engine.connect() as c:
max_gid = c.execute(text("SELECT COALESCE(MAX(group_id), 0) FROM object_groups")).scalar()
next_gid = max_gid + 1
groups_to_delete = []
new_groups = []
total_sub = 0
for mc_id, og_id, canonical, total, members_json, member_count in large_mcs:
members = json.loads(members_json) if isinstance(members_json, str) else members_json
if len(members) < 20:
logger.info(" Skip %s (%d members) — too few to split", canonical, len(members))
continue
# Determine k based on group size
k = max(4, min(len(members) // 15, 20)) # 4-20 sub-clusters
# Embed members
embeddings = _embed_texts(members)
if embeddings is None:
logger.error(" Failed to embed %s", canonical)
continue
# Normalize + cluster
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
norms[norms == 0] = 1
normalized = embeddings / norms
kmeans = MiniBatchKMeans(n_clusters=k, batch_size=min(100, len(members)),
max_iter=50, random_state=42)
labels = kmeans.fit_predict(normalized)
# Build sub-groups
subs: dict[int, list[str]] = {}
for i, member in enumerate(members):
subs.setdefault(int(labels[i]), []).append(member)
for sub_members in subs.values():
new_groups.append({
"group_id": next_gid,
"canonical_name": sub_members[0],
"member_count": len(sub_members),
"members": json.dumps(sub_members),
"top_controls_count": 0,
})
next_gid += 1
total_sub += 1
groups_to_delete.append(og_id)
logger.info(" %s (%s, %d members) → %d sub-groups (k=%d)",
mc_id, canonical, len(members), len(subs), k)
logger.info("Refinement: %d groups → %d sub-groups", len(groups_to_delete), total_sub)
# Step 3: Update DB — replace old object_groups, delete old master_controls
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
# Delete old master controls and their members for affected groups
for og_id in groups_to_delete:
c.execute(text("""
DELETE FROM master_control_members
WHERE master_control_uuid IN (
SELECT id FROM master_controls WHERE object_group_id = :gid
)
"""), {"gid": og_id})
c.execute(text("DELETE FROM master_controls WHERE object_group_id = :gid"), {"gid": og_id})
c.execute(text("DELETE FROM object_groups WHERE group_id = :gid"), {"gid": og_id})
# Insert new sub-groups
for g in new_groups:
c.execute(text("""
INSERT INTO object_groups (group_id, canonical_name, member_count, members, top_controls_count)
VALUES (:group_id, :canonical_name, :member_count, CAST(:members AS jsonb), :top_controls_count)
"""), g)
logger.info("DB updated: %d old groups deleted, %d new groups inserted", len(groups_to_delete), len(new_groups))
# Step 4: Re-run master control generation for affected groups
logger.info("Re-generating master controls for new sub-groups...")
_regenerate_master_controls(engine, [g["group_id"] for g in new_groups])
# Final stats
with engine.connect() as c:
mc_count = c.execute(text("SELECT count(*) FROM master_controls")).scalar()
og_count = c.execute(text("SELECT count(*) FROM object_groups")).scalar()
large = c.execute(text("SELECT count(*) FROM master_controls WHERE total_controls > 200")).scalar()
logger.info("Final: %d master controls, %d object groups, %d still >200", mc_count, og_count, large)
def _regenerate_master_controls(engine, group_ids: list[int]):
"""Re-create master controls for specific object_group_ids."""
from collections import defaultdict
from services.control_dedup import normalize_object
# Build reverse index for new groups only
object_to_group = {}
with engine.connect() as c:
for gid in group_ids:
row = c.execute(text(
"SELECT group_id, canonical_name, members FROM object_groups WHERE group_id = :gid"
), {"gid": gid}).fetchone()
if row:
members = json.loads(row[2]) if isinstance(row[2], str) else row[2]
for m in members:
object_to_group[m] = (row[0], row[1])
# Load controls for these objects
with engine.connect() as c:
rows = c.execute(text("""
SELECT id, control_id, generation_metadata->>'merge_group_hint' AS hint
FROM canonical_controls
WHERE generation_metadata->>'merge_group_hint' IS NOT NULL
AND release_state NOT IN ('deprecated', 'rejected')
""")).fetchall()
group_phases: dict[int, dict[str, list]] = defaultdict(lambda: defaultdict(list))
group_names: dict[int, str] = {}
for uuid, control_id, hint in rows:
parts = hint.split(":", 2)
if len(parts) < 2:
continue
action, obj = parts[0], parts[1]
phase = parts[2] if len(parts) > 2 else "implementation"
normed = normalize_object(obj)
if normed in object_to_group:
gid, canonical = object_to_group[normed]
elif obj in object_to_group:
gid, canonical = object_to_group[obj]
else:
continue
group_phases[gid][phase].append((str(uuid), control_id, action))
group_names[gid] = canonical
# Create master controls
mc_count = 0
mem_count = 0
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
for gid, phases in group_phases.items():
if len(phases) < 2:
continue
mc_id = "MC-%d" % gid
canonical = group_names.get(gid, "unknown")
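# Note: alphabetical order here; gpre2_master_controls.py sorts phases_covered by PHASE_ORDER instead
sorted_phases = sorted(phases.keys())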
phase_counts = {p: len(ctrls) for p, ctrls in phases.items()}
total = sum(phase_counts.values())
c.execute(text("""
INSERT INTO master_controls
(master_control_id, object_group_id, canonical_name,
phases_covered, phase_control_count, total_controls)
VALUES (:mcid, :gid, :name,
CAST(:phases AS jsonb), CAST(:pcounts AS jsonb), :total)
"""), {
"mcid": mc_id, "gid": gid, "name": canonical,
"phases": json.dumps(sorted_phases),
"pcounts": json.dumps(phase_counts),
"total": total,
})
mc_uuid = c.execute(text(
"SELECT id FROM master_controls WHERE master_control_id = :mcid"
), {"mcid": mc_id}).scalar()
for phase, controls in phases.items():
for ctrl_uuid, ctrl_id, action in controls:
c.execute(text("""
INSERT INTO master_control_members
(master_control_uuid, control_uuid, phase, action)
VALUES (CAST(:mc AS uuid), CAST(:ctrl AS uuid), :phase, :action)
"""), {"mc": str(mc_uuid), "ctrl": ctrl_uuid, "phase": phase, "action": action})
mem_count += 1
mc_count += 1
logger.info("Created %d new master controls with %d members", mc_count, mem_count)
def _embed_texts(texts: list[str]) -> np.ndarray | None:
    """Embed texts in batches of 64 with up to 3 attempts per batch; return None on failure."""
    import time

    result = np.zeros((len(texts), 1024), dtype=np.float32)
    batch_size = 64
    try:
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            for attempt in range(3):
                try:
                    with httpx.Client(timeout=httpx.Timeout(60.0, connect=10.0)) as client:
                        resp = client.post(f"{EMBEDDING_URL}/embed", json={"texts": batch})
                        resp.raise_for_status()
                        embs = resp.json().get("embeddings", [])
                    end = min(i + len(embs), len(texts))
                    result[i:end] = np.array(embs, dtype=np.float32)
                    break
                except Exception as e:
                    if attempt == 2:
                        # Give up instead of silently leaving all-zero rows for this batch
                        logger.error("Embed batch %d failed: %s", i, e)
                        raise
                    time.sleep(2)
        return result
    except Exception as e:
        logger.error("Embedding failed: %s", e)
        return None
if __name__ == "__main__":
main()
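The refinement script picks the number of sub-clusters from the group size with `max(4, min(len(members) // 15, 20))`. As a worked example of that formula, the sketch below wraps it in a hypothetical helper `choose_k` (the script computes it inline) and evaluates it for a few group sizes.

```python
def choose_k(n_members: int) -> int:
    """Roughly one sub-cluster per 15 members, clamped to the range 4..20."""
    return max(4, min(n_members // 15, 20))


for n in (20, 60, 90, 150, 300, 1000):
    print("%4d members -> k=%d" % (n, choose_k(n)))
```

Groups of up to 74 members get the floor of 4 sub-clusters, and anything with 300 or more members hits the cap of 20, so very large groups can still end up with large sub-groups after one refinement round.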
@@ -0,0 +1,164 @@
#!/usr/bin/env python3
"""
G-pre1 Step 2: Sub-cluster large object groups (>50 members) into k=4 sub-groups.
Reads existing object_groups, re-embeds members of large groups,
applies K-Means with k=4 per group, and writes sub-groups back.
Usage (inside container or with PYTHONPATH):
python3 /app/scripts/gpre1_subcluster.py
python3 /app/scripts/gpre1_subcluster.py --min-size 100 # only groups >100
python3 /app/scripts/gpre1_subcluster.py --sub-k 6 # 6 sub-clusters
"""
import argparse
import json
import logging
import os
import httpx
import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("gpre1-sub")
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db")
EMBEDDING_URL = "http://embedding-service:8087"
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--min-size", type=int, default=50, help="Min group size to sub-cluster")
parser.add_argument("--sub-k", type=int, default=4, help="Sub-clusters per group")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})
# Load large groups
with engine.connect() as c:
groups = c.execute(text(
"SELECT group_id, canonical_name, member_count, members "
"FROM object_groups WHERE member_count > :min ORDER BY member_count DESC"
), {"min": args.min_size}).fetchall()
logger.info("Found %d groups with >%d members to sub-cluster", len(groups), args.min_size)
# Find next available group_id
with engine.connect() as c:
max_gid = c.execute(text("SELECT COALESCE(MAX(group_id), 0) FROM object_groups")).scalar()
next_gid = max_gid + 1
total_sub_groups = 0
all_new_rows = []
groups_to_delete = []
for group_id, canonical_name, member_count, members_json in groups:
members = json.loads(members_json) if isinstance(members_json, str) else members_json
if len(members) < args.sub_k * 2:
logger.info(" Skip group %d (%s, %d members) — too small for k=%d",
group_id, canonical_name, len(members), args.sub_k)
continue
# Embed members
embeddings = _embed_batch(members)
if embeddings is None:
logger.error(" Failed to embed group %d (%s)", group_id, canonical_name)
continue
# Normalize for cosine
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
norms[norms == 0] = 1
normalized = embeddings / norms
# Sub-cluster
k = min(args.sub_k, len(members) // 2)
kmeans = MiniBatchKMeans(n_clusters=k, batch_size=min(100, len(members)),
max_iter=50, random_state=42)
labels = kmeans.fit_predict(normalized)
# Build sub-groups
sub_groups: dict[int, list[str]] = {}
for i, member in enumerate(members):
sub_groups.setdefault(int(labels[i]), []).append(member)
# Create new rows
for sub_id, sub_members in sub_groups.items():
sub_canonical = sub_members[0] # Most frequent would be better but we don't have freq here
all_new_rows.append({
"group_id": next_gid,
"canonical_name": sub_canonical,
"member_count": len(sub_members),
"members": json.dumps(sub_members),
"top_controls_count": 0,
"parent_group_id": group_id,
})
next_gid += 1
groups_to_delete.append(group_id)
total_sub_groups += len(sub_groups)
if len(groups_to_delete) % 50 == 0:
logger.info(" Processed %d/%d groups, %d sub-groups created",
len(groups_to_delete), len(groups), total_sub_groups)
logger.info("Sub-clustering complete: %d groups → %d sub-groups",
len(groups_to_delete), total_sub_groups)
# Stats
sub_sizes = [r["member_count"] for r in all_new_rows]
if sub_sizes:
logger.info(" Sub-group sizes: avg=%.1f, max=%d, min=%d",
sum(sub_sizes) / len(sub_sizes), max(sub_sizes), min(sub_sizes))
if args.dry_run:
logger.info("DRY RUN — not writing to DB")
for r in all_new_rows[:10]:
logger.info(" [%d] %s (%d members)", r["group_id"], r["canonical_name"], r["member_count"])
return
# Write to DB: delete old large groups, insert sub-groups
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
# Delete old large groups
for gid in groups_to_delete:
c.execute(text("DELETE FROM object_groups WHERE group_id = :gid"), {"gid": gid})
# Insert sub-groups
for r in all_new_rows:
c.execute(text("""
INSERT INTO object_groups (group_id, canonical_name, member_count, members, top_controls_count)
VALUES (:group_id, :canonical_name, :member_count, CAST(:members AS jsonb), :top_controls_count)
"""), r)
logger.info("Wrote %d sub-groups to DB (replaced %d large groups)", len(all_new_rows), len(groups_to_delete))
# Final stats
with engine.connect() as c:
total = c.execute(text("SELECT count(*) FROM object_groups")).scalar()
logger.info("Total groups in DB: %d", total)
def _embed_batch(texts: list[str]) -> np.ndarray | None:
"""Embed a list of texts, return numpy array."""
try:
all_emb = np.zeros((len(texts), 1024), dtype=np.float32)
batch_size = 64
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
with httpx.Client(timeout=httpx.Timeout(60.0, connect=10.0)) as client:
resp = client.post(f"{EMBEDDING_URL}/embed", json={"texts": batch})
resp.raise_for_status()
embs = resp.json().get("embeddings", [])
end = min(i + len(embs), len(texts))
all_emb[i:end] = np.array(embs, dtype=np.float32)
return all_emb
except Exception as e:
logger.error("Embedding failed: %s", e)
return None
if __name__ == "__main__":
main()
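The sub-clustering script names each sub-group after its first member and notes in a comment that the most frequent member would be a better choice, but frequencies are not available at that point. The sketch below shows one way the grouping could look if a frequency map were passed in. The function `build_sub_groups`, the `freqs` argument, and the sample data are all assumptions for illustration; the script itself has no such map.

```python
from collections import defaultdict


def build_sub_groups(members: list[str], labels: list[int],
                     freqs: dict[str, int]) -> list[dict]:
    """Group members by cluster label and name each group after its most frequent member."""
    grouped: dict[int, list[str]] = defaultdict(list)
    for member, label in zip(members, labels):
        grouped[int(label)].append(member)

    result = []
    for label in sorted(grouped):
        # Highest frequency first; members missing from freqs count as 0
        ordered = sorted(grouped[label], key=lambda m: -freqs.get(m, 0))
        result.append({
            "canonical_name": ordered[0],
            "member_count": len(ordered),
            "members": ordered,
        })
    return result


# Made-up members, K-Means labels and frequencies
members = ["log", "audit log", "backup", "backup copy"]
labels = [0, 0, 1, 1]
freqs = {"log": 3, "audit log": 12, "backup": 7, "backup copy": 2}

sub_groups = build_sub_groups(members, labels, freqs)
for g in sub_groups:
    print(g["canonical_name"], g["member_count"])
```

This mirrors how `store_results()` in the initial clustering script picks the canonical name, where the frequency map from `extract_objects()` is available.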
@@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
G-pre2: Build Master Controls from Object Groups + Lifecycle Phases.
Groups atomic controls by (object_group_id, phase) and creates
Master Controls for groups with >=2 distinct phases.
Usage:
python3 /app/scripts/gpre2_master_controls.py
python3 /app/scripts/gpre2_master_controls.py --min-phases 3
python3 /app/scripts/gpre2_master_controls.py --dry-run
"""
import argparse
import json
import logging
import os
from collections import defaultdict
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("gpre2")
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db")
# Canonical phase ordering for lifecycle chains
PHASE_ORDER = {
"scope": 0,
"definition": 1, "governance": 1,
"design": 2,
"implementation": 3, "configuration": 3,
"operation": 4, "training": 4,
"monitoring": 5,
"testing": 6,
"review": 7,
"assessment": 8, "remediation": 8,
"validation": 9,
"reporting": 10,
"evidence": 11,
}
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--min-phases", type=int, default=2, help="Min distinct phases for Master Control")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})
# Step 1: Build reverse index (object_token → group_id)
logger.info("Building object → group_id reverse index...")
object_to_group = {}
with engine.connect() as c:
groups = c.execute(text("SELECT group_id, canonical_name, members FROM object_groups")).fetchall()
for gid, canonical, members_json in groups:
members = json.loads(members_json) if isinstance(members_json, str) else members_json
for member in members:
object_to_group[member] = (gid, canonical)
logger.info("Reverse index: %d objects → %d groups", len(object_to_group), len(groups))
# Step 2: Load all controls with merge_group_hint
logger.info("Loading controls with merge_group_hint...")
with engine.connect() as c:
rows = c.execute(text("""
SELECT id, control_id,
generation_metadata->>'merge_group_hint' AS hint,
title
FROM canonical_controls
WHERE generation_metadata->>'merge_group_hint' IS NOT NULL
AND generation_metadata->>'merge_group_hint' != ''
AND release_state NOT IN ('deprecated', 'rejected')
""")).fetchall()
logger.info("Loaded %d controls with merge_group_hint", len(rows))
# Step 3: Parse and group by (group_id, phase)
# Structure: group_id → {phase → [(control_uuid, control_id, action, title)]}
group_phases: dict[int, dict[str, list]] = defaultdict(lambda: defaultdict(list))
group_names: dict[int, str] = {}
unmatched = 0
for uuid, control_id, hint, title in rows:
parts = hint.split(":", 2)
if len(parts) < 2:
continue
action = parts[0]
obj = parts[1]
phase = parts[2] if len(parts) > 2 else "implementation"
# Normalize object and find group
from services.control_dedup import normalize_object
normed = normalize_object(obj)
if normed in object_to_group:
gid, canonical = object_to_group[normed]
elif obj in object_to_group:
gid, canonical = object_to_group[obj]
else:
unmatched += 1
continue
group_phases[gid][phase].append((str(uuid), control_id, action, title))
group_names[gid] = canonical
logger.info("Grouped into %d object groups (%d controls unmatched to any group)",
len(group_phases), unmatched)
# Step 4: Create Master Controls (groups with >= min_phases distinct phases)
master_controls = []
master_members = []
mc_counter = 0
for gid, phases in group_phases.items():
if len(phases) < args.min_phases:
continue
mc_counter += 1
mc_id = "MC-%d" % gid
canonical = group_names.get(gid, "unknown")
# Sort phases by lifecycle order
sorted_phases = sorted(phases.keys(), key=lambda p: PHASE_ORDER.get(p, 99))
phase_counts = {p: len(ctrls) for p, ctrls in phases.items()}
total = sum(phase_counts.values())
master_controls.append({
"master_control_id": mc_id,
"object_group_id": gid,
"canonical_name": canonical,
"phases_covered": json.dumps(sorted_phases),
"phase_control_count": json.dumps(phase_counts),
"total_controls": total,
})
for phase, controls in phases.items():
for ctrl_uuid, ctrl_id, action, title in controls:
master_members.append({
"mc_id": mc_id,
"control_uuid": ctrl_uuid,
"phase": phase,
"action": action,
})
logger.info("Created %d Master Controls with %d members (min %d phases)",
len(master_controls), len(master_members), args.min_phases)
# Stats
if master_controls:
# Named control_counts to avoid reusing phase_counts, which is a per-MC dict in Step 4
control_counts = [mc["total_controls"] for mc in master_controls]
phases_per_mc = [len(json.loads(mc["phases_covered"])) for mc in master_controls]
logger.info(" Avg controls per MC: %.1f", sum(control_counts) / len(control_counts))
logger.info(" Avg phases per MC: %.1f", sum(phases_per_mc) / len(phases_per_mc))
logger.info(" Max controls in MC: %d", max(control_counts))
logger.info(" Max phases in MC: %d", max(phases_per_mc))
# Top 10
top10 = sorted(master_controls, key=lambda x: -x["total_controls"])[:10]
logger.info("\nTop 10 Master Controls:")
for mc in top10:
logger.info(" %s: %s (%d controls, phases: %s)",
mc["master_control_id"], mc["canonical_name"],
mc["total_controls"], mc["phases_covered"])
if args.dry_run:
logger.info("DRY RUN — not writing to DB")
return
# Step 5: Write to DB
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
c.execute(text("DELETE FROM master_control_members"))
c.execute(text("DELETE FROM master_controls"))
for mc in master_controls:
c.execute(text("""
INSERT INTO master_controls
(master_control_id, object_group_id, canonical_name,
phases_covered, phase_control_count, total_controls)
VALUES (:master_control_id, :object_group_id, :canonical_name,
CAST(:phases_covered AS jsonb), CAST(:phase_control_count AS jsonb),
:total_controls)
"""), mc)
# Get MC UUIDs for member inserts
mc_uuids = {}
for row in c.execute(text("SELECT id, master_control_id FROM master_controls")).fetchall():
mc_uuids[row[1]] = str(row[0])
for mem in master_members:
mc_uuid = mc_uuids.get(mem["mc_id"])
if not mc_uuid:
continue
c.execute(text("""
INSERT INTO master_control_members
(master_control_uuid, control_uuid, phase, action)
VALUES (CAST(:mc_uuid AS uuid), CAST(:control_uuid AS uuid), :phase, :action)
"""), {
"mc_uuid": mc_uuid,
"control_uuid": mem["control_uuid"],
"phase": mem["phase"],
"action": mem["action"],
})
logger.info("Wrote %d Master Controls + %d members to DB",
len(master_controls), len(master_members))
if __name__ == "__main__":
main()
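The grouping in Steps 3 and 4 of the script above can be sketched in isolation. This is a minimal, database-free illustration, not the script itself: the `PHASE_ORDER` values, the sample hints, and the helper names `parse_hint` and `group_by_phase` are assumptions made for the example, and `normalize_object` is left out.

```python
from collections import defaultdict

# Hypothetical lifecycle order; the real PHASE_ORDER is defined outside this excerpt.
PHASE_ORDER = {"planning": 0, "implementation": 1, "monitoring": 2}


def parse_hint(hint):
    """Split 'action:object[:phase]' into its parts; return None if malformed."""
    parts = hint.split(":", 2)
    if len(parts) < 2:
        return None
    phase = parts[2] if len(parts) > 2 else "implementation"
    return parts[0], parts[1], phase


def group_by_phase(hints, object_to_group, min_phases=2):
    """Return {group_id: phases in lifecycle order} for groups with enough phases."""
    group_phases = defaultdict(lambda: defaultdict(list))
    for hint in hints:
        parsed = parse_hint(hint)
        if parsed is None:
            continue
        action, obj, phase = parsed
        if obj not in object_to_group:
            continue  # unmatched object: skipped, as in the script
        group_phases[object_to_group[obj]][phase].append(action)
    return {
        gid: sorted(phases, key=lambda p: PHASE_ORDER.get(p, 99))
        for gid, phases in group_phases.items()
        if len(phases) >= min_phases
    }


# Sample data (invented for illustration)
hints = [
    "define:backup:planning",
    "implement:backup",            # no phase given, defaults to "implementation"
    "review:backup:monitoring",
    "define:firewall:planning",    # only one phase, so below min_phases
    "no-colon",                    # malformed, skipped
]
object_to_group = {"backup": 7, "firewall": 9}
result = group_by_phase(hints, object_to_group)
print(result)
```

Only group 7 qualifies here, because it spans three distinct phases while group 9 has one.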
@@ -0,0 +1,122 @@
"""F5 Validation: Verify DB-backed lookups match old hardcoded dicts."""
import pytest
class TestRegulationRegistryConsistency:
"""Ensure all old REGULATION_LICENSE_MAP entries are in the DB."""
def test_all_old_entries_in_db(self):
from services.control_generator import REGULATION_LICENSE_MAP
from scripts.f1_migrate_regulation_registry import build_rows
db_ids = {r["regulation_id"] for r in build_rows()}
for reg_id in REGULATION_LICENSE_MAP:
assert reg_id in db_ids, f"Missing from DB: {reg_id}"
def test_classify_regulation_matches_old(self):
"""DB-backed classify_regulation returns same rule as old dict."""
from services.control_generator import REGULATION_LICENSE_MAP
from services.regulation_registry import RegulationRegistry
from unittest.mock import patch, MagicMock
# Build mock DB with migration data
from scripts.f1_migrate_regulation_registry import build_rows
rows = build_rows()
mock_rows = [
(r["regulation_id"], r["regulation_name_de"], r["license_rule"],
r["license_type"], r.get("attribution"), r["source_type"],
r["jurisdiction"], r["status"])
for r in rows
]
reg = RegulationRegistry()
with patch("services.regulation_registry.SessionLocal") as mock_cls:
mock_session = MagicMock()
mock_result = MagicMock()
mock_result.fetchall.return_value = mock_rows
mock_session.execute.return_value = mock_result
mock_cls.return_value = mock_session
reg._load()
# Compare every entry
mismatches = []
for reg_id, info in REGULATION_LICENSE_MAP.items():
db_result = reg.classify_regulation(reg_id)
if db_result["rule"] != info["rule"]:
mismatches.append(f"{reg_id}: DB rule={db_result['rule']} vs dict rule={info['rule']}")
assert not mismatches, "Rule mismatches:\n" + "\n".join(mismatches)
class TestActionOntologyConsistency:
"""Ensure all old ACTION_TYPES entries are in the DB."""
def test_all_action_types_migrated(self):
from services.control_ontology import ACTION_TYPES
from scripts.f2_migrate_actions import build_action_types
db_names = {t["canonical_name"] for t in build_action_types()}
for action in ACTION_TYPES:
assert action in db_names, f"Missing action_type: {action}"
def test_all_aliases_migrated(self):
from services.control_ontology import ACTION_TYPES
from scripts.f2_migrate_actions import build_action_synonyms
db_synonyms = {s["synonym"] for s in build_action_synonyms() if s["pattern_type"] == "alias"}
missing = []
for action, info in ACTION_TYPES.items():
for alias in info.get("aliases", []):
if alias.lower() not in db_synonyms:
missing.append(f"{action}: {alias}")
assert not missing, "Missing aliases:\n" + "\n".join(missing)
def test_all_negative_patterns_migrated(self):
from services.control_ontology import _NEGATIVE_PATTERNS
from scripts.f2_migrate_actions import build_action_synonyms
db_patterns = {s["synonym"] for s in build_action_synonyms() if s["pattern_type"] == "negative_pattern"}
for pattern, _ in _NEGATIVE_PATTERNS:
assert pattern.lower() in db_patterns, f"Missing negative pattern: {pattern}"
class TestObjectSynonymsConsistency:
"""Ensure all old _OBJECT_SYNONYMS are in the DB."""
def test_all_objects_migrated(self):
from services.control_dedup import _OBJECT_SYNONYMS
from scripts.f3_migrate_objects import build_rows
db_synonyms = {r["synonym"] for r in build_rows()}
missing = []
for syn in _OBJECT_SYNONYMS:
if syn.lower() not in db_synonyms:
missing.append(syn)
assert not missing, "Missing object synonyms:\n" + "\n".join(missing)
class TestLLMEnrichmentQuality:
"""Basic quality checks on LLM-generated synonyms."""
def test_no_empty_synonyms_in_db(self):
"""All synonyms should have content."""
from scripts.f2_migrate_actions import build_action_synonyms
for s in build_action_synonyms():
assert len(s["synonym"].strip()) >= 2, f"Too short: {s}"
def test_no_duplicate_canonical_in_actions(self):
"""Each synonym should map to exactly one canonical action."""
from scripts.f2_migrate_actions import build_action_synonyms
synonyms = build_action_synonyms()
seen = {}
for s in synonyms:
key = (s["synonym"], s["language"], s["pattern_type"])
if key in seen:
assert seen[key] == s["canonical_action"], (
f"Duplicate synonym '{s['synonym']}' maps to both "
f"'{seen[key]}' and '{s['canonical_action']}'"
)
seen[key] = s["canonical_action"]
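The uniqueness check in the last test can also be written as a helper that collects every conflict instead of stopping at the first failing assert. The sketch below is an illustration under assumptions: `find_conflicts` and the sample rows are hypothetical, and only the four dictionary keys are taken from the test above.

```python
def find_conflicts(synonyms):
    """Return (synonym, first_action, other_action) for keys mapped to two actions."""
    seen = {}
    conflicts = []
    for s in synonyms:
        key = (s["synonym"], s["language"], s["pattern_type"])
        canonical = s["canonical_action"]
        if key in seen and seen[key] != canonical:
            conflicts.append((s["synonym"], seen[key], canonical))
        # Keep the first mapping so later conflicts are reported against it
        seen.setdefault(key, canonical)
    return conflicts


# Sample rows (invented for illustration)
rows = [
    {"synonym": "prüfen", "language": "de", "pattern_type": "alias", "canonical_action": "review"},
    {"synonym": "prüfen", "language": "de", "pattern_type": "alias", "canonical_action": "test"},
    {"synonym": "check", "language": "en", "pattern_type": "alias", "canonical_action": "review"},
    {"synonym": "check", "language": "en", "pattern_type": "alias", "canonical_action": "review"},
]
conflicts = find_conflicts(rows)
print(conflicts)
```

An exact duplicate that maps to the same action is not reported, which matches the behaviour of the test.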
+1 -2
@@ -490,9 +490,8 @@ services:
volumes:
- gitea_data:/var/lib/gitea
- gitea_config:/etc/gitea
-  - /etc/timezone:/etc/timezone:ro
-  - /etc/localtime:/etc/localtime:ro
environment:
+  TZ: "Europe/Berlin"
USER_UID: "1000"
USER_GID: "1000"
GITEA__database__DB_TYPE: postgres
+15 -3
@@ -34,7 +34,11 @@ mkdir -p /vault/certs
# Step 1: Enable PKI Secrets Engine (Root CA)
# ================================================
echo "Enabling Root CA PKI engine..."
-vault secrets enable -path=pki pki 2>/dev/null || echo "PKI engine already enabled"
+if ! vault secrets list -format=json 2>/dev/null | grep -q '"pki/"'; then
+    vault secrets enable -path=pki pki
+else
+    echo "PKI engine already enabled — skipping"
+fi
# Set max lease TTL to 10 years for root CA
vault secrets tune -max-lease-ttl=87600h pki
@@ -59,7 +63,11 @@ vault write pki/config/urls \
# Step 2: Enable PKI Secrets Engine (Intermediate CA)
# ================================================
echo "Enabling Intermediate CA PKI engine..."
-vault secrets enable -path=pki_int pki 2>/dev/null || echo "Intermediate PKI engine already enabled"
+if ! vault secrets list -format=json 2>/dev/null | grep -q '"pki_int/"'; then
+    vault secrets enable -path=pki_int pki
+else
+    echo "Intermediate PKI engine already enabled — skipping"
+fi
# Set max lease TTL to 5 years for intermediate
vault secrets tune -max-lease-ttl=43800h pki_int
@@ -142,7 +150,11 @@ EOF
# ================================================
echo "Creating AppRole for certificate management..."
-vault auth enable approle 2>/dev/null || echo "AppRole already enabled"
+if ! vault auth list -format=json 2>/dev/null | grep -q '"approle/"'; then
+    vault auth enable approle
+else
+    echo "AppRole already enabled — skipping"
+fi
# Create role for nginx certificate management
vault write auth/approle/role/breakpilot-nginx \
+6 -2
@@ -24,8 +24,12 @@ done
echo "Vault is ready. Initializing secrets..."
-# Enable KV v2 secrets engine at 'secret/' (usually enabled in dev mode)
-vault secrets enable -version=2 -path=secret kv 2>/dev/null || echo "KV engine already enabled"
+# Enable KV v2 secrets engine at 'secret/' (only if not already enabled)
+if ! vault secrets list -format=json 2>/dev/null | grep -q '"secret/"'; then
+    vault secrets enable -version=2 -path=secret kv
+else
+    echo "KV engine already enabled — skipping"
+fi
# ================================================
# API Keys (PLACEHOLDER - Replace in production!)
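The mount checks in these scripts grep the JSON listing for a quoted path such as `"pki/"`. As a rough sketch of the same idea, the check can be done on the parsed JSON keys, which avoids matching the path when it happens to appear inside some other string value. This assumes the listing is a JSON object keyed by mount path with a trailing slash, as the grep patterns above imply; `is_mounted` and the sample output are hypothetical.

```python
import json


def is_mounted(list_output, path):
    """True if `path` is a top-level key in a `-format=json` mount listing."""
    if not path.endswith("/"):
        path += "/"
    try:
        mounts = json.loads(list_output)
    except json.JSONDecodeError:
        # Empty or invalid output (e.g. the CLI call failed): treat as not mounted
        return False
    return isinstance(mounts, dict) and path in mounts


# Sample listing (invented for illustration, heavily abbreviated)
sample = json.dumps({
    "cubbyhole/": {"type": "cubbyhole"},
    "pki/": {"type": "pki", "description": 'see "pki_int/" for the intermediate CA'},
    "secret/": {"type": "kv"},
})
print(is_mounted(sample, "pki"), is_mounted(sample, "pki_int"))
```

In this sample a plain grep for `"pki_int/"` could match the description text, while the key lookup reports the mount as absent.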
+11
@@ -4,6 +4,7 @@ set -e
export VAULT_ADDR="http://vault:8200"
KEYS_FILE="/vault/data/init-keys.json"
+INIT_MARKER="/vault/data/.init-complete"
echo "=== Vault Init/Unseal ==="
echo "Waiting for Vault to be ready..."
@@ -39,6 +40,12 @@ chmod 600 /vault/data/root-token
echo "=== Vault ready (persistent file storage) ==="
+# Skip PKI + secrets init if already completed (prevents repeated mount-enable errors)
+if [ -f "$INIT_MARKER" ]; then
+    echo "PKI + secrets already initialized (marker: $INIT_MARKER). Skipping."
+    exit 0
+fi
# Run PKI init
if [ -f /vault/scripts/init-pki.sh ]; then
echo "Running PKI initialization..."
@@ -50,3 +57,7 @@ if [ -f /vault/scripts/init-secrets.sh ]; then
echo "Running secrets initialization..."
sh /vault/scripts/init-secrets.sh
fi
+# Mark initialization as complete
+touch "$INIT_MARKER"
+echo "Init marker written: $INIT_MARKER"
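The marker-file guard added in this script follows a common run-once pattern: skip if the marker exists, otherwise run the steps and write the marker last. A minimal Python sketch of that pattern is shown below; `run_once`, the step callables, and the temporary directory are assumptions for illustration and are not part of the repository.

```python
import tempfile
from pathlib import Path


def run_once(marker, steps):
    """Run `steps` unless `marker` exists; write the marker only after all succeed."""
    marker = Path(marker)
    if marker.exists():
        return False  # already initialized, nothing to do
    for step in steps:
        step()  # an exception here leaves the marker unwritten, so a rerun retries
    marker.touch()
    return True


# Demonstration in a throwaway directory (stand-in for /vault/data)
calls = []
with tempfile.TemporaryDirectory() as tmp:
    marker_path = Path(tmp) / ".init-complete"
    steps = [lambda: calls.append("pki"), lambda: calls.append("secrets")]
    first = run_once(marker_path, steps)
    second = run_once(marker_path, steps)
print(first, second, calls)
```

Writing the marker last mirrors the shell script: because it runs under `set -e`, a failing init step exits before `touch`, and the next start repeats the initialization.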