From 8510af46eb772c2b878cca3d7e5d84b0028cc5da Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 10 May 2026 15:08:15 +0200 Subject: [PATCH] =?UTF-8?q?feat(pipeline):=20MC=20Quality=20Overhaul=20?= =?UTF-8?q?=E2=80=94=2074.5%=20=E2=86=92=2092.8%=20accuracy,=205.3K=20?= =?UTF-8?q?=E2=86=92=2013.6K=20MCs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 0: Quality Audit script (Claude Sonnet, 1750 samples) Phase 1: Object ontology expanded 31 → 74 tokens with descriptions + boundaries Phase 2: 174K controls re-classified via Haiku (10 batches, $50) - Generic tokens removed (documentation, procedure, process) - L2 sub-topics added (108K + 64K controls) - Bad subtopics fixed (stakeholder_*, escalation fragments) Phase 3: Re-clustering K=18704 (37K objects → 16.7K groups) Phase 4: Direct MC generation from canonical tokens (gpre2_direct_mc.py) Phase 5: Regulation-source split (gpre3, dry-run tested) New features: - Tenant-isolated document upload API (rag-service) - BAuA crawler (Playwright, 131 PDFs downloaded) - OSHA Technical Manual crawler (23 chapters) - CE obligation extractor (6141 obligations from Qdrant) RAG ingestion: - 126 BAuA PDFs (TRBS/TRGS/ASR): 27,664 chunks - OSHA Technical Manual: 7,241 chunks - OSHA 1910 Subpart O (full): 745 chunks - EuGH C-588/21 P: 216 chunks - EU 2018/1725: 842 chunks Co-Authored-By: Claude Opus 4.6 (1M context) --- ...STRUCTION-controls-fuer-andere-sessions.md | 158 ++++++++ .../010_expanded_object_ontology.sql | 162 ++++++++ .../scripts/extract_ce_obligations.py | 214 +++++++++++ .../scripts/gpre0_add_subtopics.py | 289 ++++++++++++++ .../scripts/gpre0_apply_corrections.py | 52 +++ .../scripts/gpre0_fix_bad_subtopics.py | 153 ++++++++ .../scripts/gpre0_fix_generic_tokens.py | 284 ++++++++++++++ control-pipeline/scripts/gpre0_run_all.sh | 37 ++ .../scripts/gpre0_validate_hints.py | 351 ++++++++++++++++++ control-pipeline/scripts/gpre2_direct_mc.py | 214 +++++++++++ 
.../scripts/gpre3_regulation_split.py | 298 +++++++++++++++ .../scripts/gpre_quality_audit.py | 310 ++++++++++++++++ .../services/decomposition_pass.py | 125 ++++++- control-pipeline/services/embedding_utils.py | 84 +++++ legal-sources/osha/crawl_osha_otm.py | 79 ++++ rag-service/api/__init__.py | 2 + rag-service/api/tenant_documents.py | 289 ++++++++++++++ rag-service/minio_client_wrapper.py | 10 + rag-service/qdrant_client_wrapper.py | 68 ++++ 19 files changed, 3173 insertions(+), 6 deletions(-) create mode 100644 control-pipeline/INSTRUCTION-controls-fuer-andere-sessions.md create mode 100644 control-pipeline/migrations/010_expanded_object_ontology.sql create mode 100644 control-pipeline/scripts/extract_ce_obligations.py create mode 100644 control-pipeline/scripts/gpre0_add_subtopics.py create mode 100644 control-pipeline/scripts/gpre0_apply_corrections.py create mode 100644 control-pipeline/scripts/gpre0_fix_bad_subtopics.py create mode 100644 control-pipeline/scripts/gpre0_fix_generic_tokens.py create mode 100644 control-pipeline/scripts/gpre0_run_all.sh create mode 100644 control-pipeline/scripts/gpre0_validate_hints.py create mode 100644 control-pipeline/scripts/gpre2_direct_mc.py create mode 100644 control-pipeline/scripts/gpre3_regulation_split.py create mode 100644 control-pipeline/scripts/gpre_quality_audit.py create mode 100644 control-pipeline/services/embedding_utils.py create mode 100644 legal-sources/osha/crawl_osha_otm.py create mode 100644 rag-service/api/tenant_documents.py diff --git a/control-pipeline/INSTRUCTION-controls-fuer-andere-sessions.md b/control-pipeline/INSTRUCTION-controls-fuer-andere-sessions.md new file mode 100644 index 0000000..097bef1 --- /dev/null +++ b/control-pipeline/INSTRUCTION-controls-fuer-andere-sessions.md @@ -0,0 +1,158 @@ +# Controls nutzen — Anleitung für andere Sessions + +**Stand:** 2026-05-07, wird laufend aktualisiert +**Repo:** breakpilot-core (~/Projekte/breakpilot-core) + +--- + +## Was sind die Controls? 
+ +174.497 atomare Compliance-Controls in der Datenbank. Jeder Control ist eine **einzelne prüfbare Anforderung** aus einer Rechtsquelle (DSGVO, NIS2, NIST, AI Act, etc.). + +### Beispiel + +``` +Control-ID: AUTH-2956-A14 +Titel: "Implementierung von Multi-Faktor-Authentifizierung prüfen" +Objective: "Sicherstellen, dass MFA korrekt implementiert ist..." +Merge-Key: "verify:multi_factor_auth:testing" +Severity: high +``` + +## Wo liegen die Controls? + +### Datenbank (PostgreSQL auf Mac Mini) + +```sql +-- Alle Controls abfragen +SELECT id, control_id, title, objective, severity, + source_citation, -- Rechtsquelle (JSON) + generation_metadata->>'merge_group_hint' AS merge_key +FROM compliance.canonical_controls +WHERE release_state NOT IN ('deprecated', 'rejected'); +``` + +**Verbindung:** +```bash +# Vom MacBook: +ssh macmini "/usr/local/bin/docker exec bp-core-postgres psql -U breakpilot -d breakpilot_db" + +# Oder via Control-Pipeline Container: +ssh macmini "/usr/local/bin/docker exec bp-core-control-pipeline curl -sf http://127.0.0.1:8098/..." +``` + +### API (Port 8098, nur via Docker exec erreichbar) + +```bash +# Master Controls auflisten +ssh macmini "/usr/local/bin/docker exec bp-core-control-pipeline \ + curl -sf 'http://127.0.0.1:8098/v1/master-controls?limit=50&sort=total_controls'" + +# Master Control Detail mit allen Membern +ssh macmini "/usr/local/bin/docker exec bp-core-control-pipeline \ + curl -sf 'http://127.0.0.1:8098/v1/master-controls/MC-8292'" +``` + +## Struktur der Controls + +### merge_group_hint (Schlüsselfeld!) 
+ +Jeder Control hat einen `merge_group_hint` im Format `action:object:phase`: + +``` +implement:encryption:implementation +define:access_control:definition +monitor:network_security:monitoring +report:supervisory_authority:reporting +``` + +**74 kanonische Object-Tokens** (Stand 2026-05-07): + +| Kategorie | Tokens | +|-----------|--------| +| **Security** | multi_factor_auth, password_policy, credentials, session_management, privileged_access, access_control, encryption, transport_encryption, key_management, certificate_management, network_security, network_segmentation, firewall, vpn, remote_access, monitoring, audit_logging, siem, alerting, compliance_audit, vulnerability, patch_management, backup, disaster_recovery, physical_security, secure_development, api_security, input_validation, container_security, logging_configuration | +| **Data Protection** | personal_data, sensitive_data, health_data, consent, data_subject_rights, data_retention, data_transfer, data_breach_notification, dpia, data_processing_agreement, privacy_by_design, data_processing_register, data_classification, cookie_consent, video_surveillance | +| **Governance** | policy, procedure, process, training, awareness, incident, risk_management, third_party_management, change_management, documentation, records_management, compliance_reporting, asset_management, human_resources_security | +| **Regulatory** | supervisory_authority, certification, product_safety, ai_system, financial_reporting, aml, whistleblowing, consumer_protection, ecommerce, telecommunications, medical_device, payment_services, critical_infrastructure, supply_chain_due_diligence, sustainability_reporting | + +### Rechtsquellen (source_citation) + +Die **Parent-Controls** (nicht die atomaren!) 
haben `source_citation`: + +```sql +-- Controls mit Rechtsquelle finden +SELECT cc.control_id, cc.title, + pc.source_citation->>'source' AS regulation, + pc.source_citation->>'article' AS article +FROM compliance.canonical_controls cc +JOIN compliance.canonical_controls pc ON pc.id = cc.parent_control_uuid +WHERE pc.source_citation IS NOT NULL + AND pc.source_citation->>'source' LIKE '%DSGVO%'; +``` + +148 verschiedene Rechtsquellen (DSGVO, NIS2, NIST, OWASP, BSI, TKG, etc.) + +## Controls filtern (Use Cases) + +### Beispiel: Alle DSGVO Art. 13 Controls (für DSI-Prüfung) + +```sql +SELECT cc.control_id, cc.title, cc.objective, + cc.generation_metadata->>'merge_group_hint' AS merge_key, + pc.source_citation->>'article' AS article +FROM compliance.canonical_controls cc +JOIN compliance.canonical_controls pc ON pc.id = cc.parent_control_uuid +WHERE pc.source_citation->>'source' = 'DSGVO (EU) 2016/679' + AND pc.source_citation->>'article' LIKE '%13%' + AND cc.release_state NOT IN ('deprecated', 'rejected') +ORDER BY cc.control_id; +``` + +### Beispiel: Alle Encryption-Controls + +```sql +SELECT control_id, title, objective +FROM compliance.canonical_controls +WHERE generation_metadata->>'merge_group_hint' LIKE '%:encryption:%' + AND release_state NOT IN ('deprecated', 'rejected'); +``` + +### Beispiel: Controls nach Object-Token filtern + +```sql +-- Alle Controls zu einem bestimmten Thema +SELECT control_id, title, + generation_metadata->>'merge_group_hint' AS merge_key +FROM compliance.canonical_controls +WHERE generation_metadata->>'merge_group_hint' LIKE '%:data_retention:%' + AND release_state NOT IN ('deprecated', 'rejected'); +``` + +## Wichtige Tabellen + +| Tabelle | Rows | Beschreibung | +|---------|------|-------------| +| `compliance.canonical_controls` | ~294K | Alle Controls (Rich + Atomic) | +| `compliance.master_controls` | ~5.329 | Gruppierte Master Controls | +| `compliance.master_control_members` | ~172K | Zuordnung Control → MC | +| 
`compliance.object_ontology` | 74 | Kanonische Object-Definitionen | +| `compliance.regulation_registry` | 223 | Rechtsquellen-Register | + +## Was gerade passiert (2026-05-07) + +**Phase 2 läuft:** Alle 174K Controls werden per Claude Haiku re-klassifiziert. Die `merge_group_hint` werden von frei-form LLM-Objekten auf 74 kanonische Tokens normalisiert. Danach: +- Phase 3: Re-Clustering (gpre1 mit K=20000) +- Phase 4: Neue Master Controls (gpre2) +- Phase 5: Regulation-Source-Split (gpre3) + +**NICHT ÄNDERN:** `canonical_controls`, `master_controls`, `object_ontology` Tabellen werden aktiv bearbeitet. + +## DB-Zugang Quick Reference + +```bash +# Quick Query (eine Zeile) +ssh macmini "/usr/local/bin/docker exec bp-core-postgres psql -U breakpilot -d breakpilot_db -c \"SELECT count(*) FROM compliance.canonical_controls\"" + +# Interaktive Session +ssh macmini "/usr/local/bin/docker exec -it bp-core-postgres psql -U breakpilot -d breakpilot_db" +``` diff --git a/control-pipeline/migrations/010_expanded_object_ontology.sql b/control-pipeline/migrations/010_expanded_object_ontology.sql new file mode 100644 index 0000000..9ecf9e5 --- /dev/null +++ b/control-pipeline/migrations/010_expanded_object_ontology.sql @@ -0,0 +1,162 @@ +-- Migration 010: Expanded Object Ontology +-- Expands from 31 to 74 canonical object tokens with clear semantic boundaries. +-- Each token has a description to prevent ambiguous classification. +-- +-- IMPORTANT: This migration ADDS new tokens. Existing synonyms are preserved. 
+ +SET search_path TO compliance, public; + +-- Add description column to object_synonyms if not exists +DO $$ BEGIN + ALTER TABLE object_synonyms ADD COLUMN IF NOT EXISTS description TEXT; +EXCEPTION WHEN duplicate_column THEN NULL; +END $$; + +-- New table: canonical object definitions with clear boundaries +CREATE TABLE IF NOT EXISTS object_ontology ( + canonical_token VARCHAR(100) PRIMARY KEY, + category VARCHAR(50) NOT NULL, -- security, data_protection, governance, regulatory, technical + description_de TEXT NOT NULL, -- German description for LLM prompts + description_en TEXT NOT NULL, -- English description + NOT_confused_with TEXT, -- Explicit disambiguation + examples TEXT, -- Example controls that belong here + created_at TIMESTAMPTZ DEFAULT NOW() +); + +-- ═══════════════════════════════════════════════════════════════ +-- SECURITY & TECHNICAL +-- ═══════════════════════════════════════════════════════════════ + +-- Authentication & Identity +INSERT INTO object_ontology VALUES +('multi_factor_auth', 'security', 'Multi-Faktor-Authentifizierung (2FA/MFA)', 'Multi-factor authentication', 'NOT password_policy (Passwortregeln) oder session_management (Sitzungen)', 'MFA implementieren, 2FA-Pflicht, Authentifizierungsfaktoren'), +('password_policy', 'security', 'Passwortrichtlinien und -komplexität', 'Password policies and complexity', 'NOT credentials (allg. 
Zugangsdaten) oder multi_factor_auth (MFA)', 'Passwortlänge, Komplexität, Rotation, Passwort-Historie'), +('credentials', 'security', 'Zugangsdaten-Verwaltung (Tokens, API-Keys, Secrets)', 'Credential management', 'NOT password_policy (Passwortregeln) oder key_management (kryptografisch)', 'API-Key-Rotation, Token-Verwaltung, Secret Storage'), +('session_management', 'security', 'Sitzungsverwaltung (Session Timeout, Token-Lifecycle)', 'Session management', 'NOT multi_factor_auth (Login) oder access_control (Berechtigungen)', 'Session Timeout, Token-Invalidierung, Concurrent Sessions'), +('privileged_access', 'security', 'Verwaltung privilegierter Zugriffe (Admin, Root)', 'Privileged access management', 'NOT access_control (allg. Zugriffskontrolle)', 'Admin-Konten, Root-Zugriff, PAM, Just-in-Time-Access'), +('access_control', 'security', 'Allgemeine Zugriffskontrolle (RBAC, Berechtigungen)', 'Access control (RBAC, permissions)', 'NOT privileged_access (Admin) oder authentication (Login)', 'Rollenbasierte Zugriffskontrolle, Berechtigungsvergabe, Least Privilege') +ON CONFLICT (canonical_token) DO UPDATE SET description_de = EXCLUDED.description_de, description_en = EXCLUDED.description_en, NOT_confused_with = EXCLUDED.NOT_confused_with; + +-- Encryption & Cryptography +INSERT INTO object_ontology VALUES +('encryption', 'security', 'Verschlüsselung at-rest (Datenverschlüsselung)', 'Encryption at rest', 'NOT transport_encryption (in-transit) oder key_management (Schlüssel)', 'AES-256, Festplattenverschlüsselung, DB-Verschlüsselung'), +('transport_encryption', 'security', 'Transportverschlüsselung (TLS, HTTPS)', 'Transport encryption (TLS)', 'NOT encryption (at-rest)', 'TLS 1.3, HTTPS, mTLS, Zertifikats-Pinning'), +('key_management', 'security', 'Kryptografische Schlüsselverwaltung', 'Cryptographic key management', 'NOT credentials (API-Keys) oder certificate_management (Zertifikate)', 'Key Rotation, HSM, Key Escrow, Schlüsselerzeugung'), +('certificate_management', 
'security', 'Zertifikatsverwaltung (PKI, X.509)', 'Certificate management (PKI)', 'NOT key_management (Schlüssel) oder encryption (Verschlüsselung)', 'X.509-Zertifikate, PKI, Zertifikatsrückruf, CA-Verwaltung') +ON CONFLICT (canonical_token) DO UPDATE SET description_de = EXCLUDED.description_de, description_en = EXCLUDED.description_en, NOT_confused_with = EXCLUDED.NOT_confused_with; + +-- Network Security +INSERT INTO object_ontology VALUES +('network_security', 'security', 'Allgemeine Netzwerksicherheit', 'General network security', 'NOT network_segmentation (Segmentierung) oder firewall (Regeln)', 'Netzwerk-Hardening, Port-Management, DNS-Sicherheit'), +('network_segmentation', 'security', 'Netzwerksegmentierung (VLANs, Zonen)', 'Network segmentation', 'NOT network_security (allg.) oder firewall (Regeln)', 'VLANs, DMZ, Micro-Segmentation, Zero Trust Network'), +('firewall', 'security', 'Firewall-Regeln und -Verwaltung', 'Firewall rules and management', 'NOT network_security (allg.)', 'WAF, Firewall-Regeln, Ingress/Egress, Whitelist'), +('vpn', 'security', 'VPN-Konfiguration und -Verwaltung', 'VPN configuration', NULL, 'IPSec, WireGuard, Site-to-Site VPN'), +('remote_access', 'security', 'Fernzugriff und Remote-Arbeit', 'Remote access', 'NOT vpn (Technologie)', 'Remote Desktop, Bastion Hosts, Jump Server') +ON CONFLICT (canonical_token) DO UPDATE SET description_de = EXCLUDED.description_de, description_en = EXCLUDED.description_en, NOT_confused_with = EXCLUDED.NOT_confused_with; + +-- Monitoring & Logging (CRITICAL: clear boundaries!) 
+INSERT INTO object_ontology VALUES +('monitoring', 'security', 'Kontinuierliche Echtzeit-Überwachung von Systemen/Metriken', 'Continuous real-time monitoring of systems', 'NOT audit_logging (Protokollierung), NOT training (Schulung), NOT procedure (Verfahren), NOT risk_assessment (Bewertung)', 'System-Health-Monitoring, Verfügbarkeitsüberwachung, Performance-Monitoring, Anomalie-Erkennung in Echtzeit'), +('audit_logging', 'security', 'Protokollierung und Audit-Trail (Nachvollziehbarkeit)', 'Audit logging and trail', 'NOT monitoring (Echtzeit-Überwachung), NOT compliance_audit (Prüfungen)', 'Log-Aufzeichnung, Audit Trail, Zeitstempel, Nachvollziehbarkeit, Protokollierung von Zugriffen'), +('siem', 'security', 'Security Information and Event Management', 'SIEM', 'NOT monitoring (allg.) oder audit_logging (Protokollierung)', 'SIEM-Korrelation, Security Events, Log-Aggregation'), +('alerting', 'security', 'Benachrichtigungen und Meldepflichten bei Sicherheitsereignissen', 'Security alerting and notification obligations', 'NOT monitoring (Überwachung) oder incident (Vorfallsbehandlung)', 'Sicherheitsmeldungen, Breach Notification, Benachrichtigungspflichten'), +('compliance_audit', 'governance', 'Compliance-Prüfungen und externe Audits', 'Compliance audits and external reviews', 'NOT audit_logging (technische Protokollierung), NOT monitoring (Überwachung)', 'Externe Prüfung, Jahresabschlussprüfung, Zertifizierungsaudit, Lieferanten-Audit') +ON CONFLICT (canonical_token) DO UPDATE SET description_de = EXCLUDED.description_de, description_en = EXCLUDED.description_en, NOT_confused_with = EXCLUDED.NOT_confused_with; + +-- Vulnerability & Patch Management +INSERT INTO object_ontology VALUES +('vulnerability', 'security', 'Schwachstellenmanagement und -scanning', 'Vulnerability management', 'NOT patch_management (Updates)', 'Vulnerability Scanning, CVE-Tracking, Penetration Testing'), +('patch_management', 'security', 'Software-Updates und Patch-Verwaltung', 'Patch 
management', 'NOT vulnerability (Scanning)', 'Patch-Zyklus, Update-Policy, Hotfix-Prozess') +ON CONFLICT (canonical_token) DO UPDATE SET description_de = EXCLUDED.description_de, description_en = EXCLUDED.description_en, NOT_confused_with = EXCLUDED.NOT_confused_with; + +-- Backup & Recovery +INSERT INTO object_ontology VALUES +('backup', 'security', 'Datensicherung und Backup-Strategien', 'Backup strategies', 'NOT disaster_recovery (Wiederherstellung)', 'Backup-Rotation, Offsite-Backup, Backup-Verschlüsselung'), +('disaster_recovery', 'security', 'Notfallwiederherstellung und Business Continuity', 'Disaster recovery', 'NOT backup (Datensicherung) oder incident (Vorfälle)', 'DR-Plan, RTO/RPO, Failover, Business Continuity') +ON CONFLICT (canonical_token) DO UPDATE SET description_de = EXCLUDED.description_de, description_en = EXCLUDED.description_en, NOT_confused_with = EXCLUDED.NOT_confused_with; + +-- ═══════════════════════════════════════════════════════════════ +-- DATA PROTECTION (CRITICAL: clear boundaries!) +-- ═══════════════════════════════════════════════════════════════ + +INSERT INTO object_ontology VALUES +('personal_data', 'data_protection', 'Verarbeitung personenbezogener Daten (DSGVO-Grundsätze)', 'Personal data processing principles', 'NOT sensitive_data (besondere Kategorien), NOT data_subject_rights (Betroffenenrechte), NOT consent (Einwilligung)', 'Datenminimierung, Zweckbindung, Speicherbegrenzung, Rechtmäßigkeit der Verarbeitung'), +('sensitive_data', 'data_protection', 'Besondere Kategorien personenbezogener Daten (Art. 9 DSGVO)', 'Special categories of personal data', 'NOT personal_data (allg.), NOT health_data (Gesundheit)', 'Biometrische Daten, ethnische Herkunft, politische Meinungen, Gewerkschaftszugehörigkeit'), +('health_data', 'data_protection', 'Gesundheitsdaten und Medizindaten', 'Health and medical data', 'NOT sensitive_data (allg. 
besondere Kategorien)', 'Patientendaten, Medizinprodukte-Daten, klinische Daten'), +('consent', 'data_protection', 'Einwilligungsmanagement', 'Consent management', 'NOT data_subject_rights (andere Betroffenenrechte)', 'Einwilligung einholen, Widerruf, Opt-In, Consent-Banner'), +('data_subject_rights', 'data_protection', 'Betroffenenrechte (Auskunft, Löschung, Portabilität)', 'Data subject rights (access, erasure, portability)', 'NOT consent (Einwilligung), NOT personal_data (Verarbeitung)', 'Auskunftsrecht, Recht auf Löschung, Datenportabilität, Widerspruchsrecht'), +('data_retention', 'data_protection', 'Aufbewahrungsfristen und Löschkonzept', 'Data retention and deletion', 'NOT backup (technische Sicherung)', 'Löschfristen, Aufbewahrungspflichten, Löschkonzept, Archivierung'), +('data_transfer', 'data_protection', 'Internationale Datenübermittlung (Drittländer, SCC)', 'International data transfer', 'NOT data_processing (Verarbeitung)', 'Drittlandtransfer, Standardvertragsklauseln, Angemessenheitsbeschluss, BCR'), +('data_breach_notification', 'data_protection', 'Meldung von Datenschutzverletzungen (Art. 33/34 DSGVO)', 'Data breach notification', 'NOT incident (allg. Sicherheitsvorfälle), NOT alerting (techn. Alerts)', 'Breach-Meldung an Aufsichtsbehörde, Benachrichtigung Betroffener, 72-Stunden-Frist'), +('dpia', 'data_protection', 'Datenschutz-Folgenabschätzung (Art. 35 DSGVO)', 'Data protection impact assessment', NULL, 'DSFA, Schwellwertanalyse, Risikobewertung für Betroffene'), +('data_processing_agreement', 'data_protection', 'Auftragsverarbeitung (Art. 28 DSGVO)', 'Data processing agreements', NULL, 'AVV, Auftragsverarbeiter, Sub-Auftragsverarbeiter, TOMs'), +('privacy_by_design', 'data_protection', 'Datenschutz durch Technikgestaltung (Art. 25 DSGVO)', 'Privacy by design and default', NULL, 'Privacy by Default, Datenminimierung in der Architektur'), +('data_processing_register', 'data_protection', 'Verzeichnis von Verarbeitungstätigkeiten (Art. 
30 DSGVO)', 'Records of processing activities', NULL, 'VVT, Verarbeitungsverzeichnis') +ON CONFLICT (canonical_token) DO UPDATE SET description_de = EXCLUDED.description_de, description_en = EXCLUDED.description_en, NOT_confused_with = EXCLUDED.NOT_confused_with; + +-- ═══════════════════════════════════════════════════════════════ +-- GOVERNANCE & ORGANIZATION +-- ═══════════════════════════════════════════════════════════════ + +INSERT INTO object_ontology VALUES +('policy', 'governance', 'Richtlinien und Leitlinien ERSTELLEN/DEFINIEREN', 'Creating/defining policies', 'NOT procedure (Verfahrensablauf), NOT compliance_audit (Prüfung)', 'Sicherheitsrichtlinie erstellen, Policy-Framework definieren, Leitlinie verabschieden'), +('procedure', 'governance', 'Verfahren und Prozessabläufe DEFINIEREN/DOKUMENTIEREN', 'Defining/documenting procedures', 'NOT incident (Vorfallsbehandlung), NOT process (laufender Betrieb)', 'Verfahrensanweisung, Ablaufbeschreibung, Standardprozess definieren'), +('process', 'governance', 'Laufende betriebliche Prozesse AUSFÜHREN', 'Executing operational processes', 'NOT procedure (Definition), NOT monitoring (Überwachung)', 'Betriebsprozess, Geschäftsprozess, Workflow-Ausführung'), +('training', 'governance', 'Schulung und Weiterbildung DURCHFÜHREN', 'Training and education', 'NOT awareness (Sensibilisierung), NOT monitoring (Überwachung!)', 'Mitarbeiterschulung, Zertifizierungskurs, Pflichtunterweisung'), +('awareness', 'governance', 'Sicherheitsbewusstsein und Sensibilisierung', 'Security awareness', 'NOT training (formale Schulung)', 'Phishing-Simulation, Awareness-Kampagne, Sicherheitskultur'), +('incident', 'governance', 'Sicherheitsvorfälle BEHANDELN (Incident Response)', 'Incident response and handling', 'NOT alerting (Benachrichtigung), NOT data_breach_notification (DSGVO-Meldung)', 'Incident Response Plan, Vorfallsanalyse, Containment, Recovery, Lessons Learned'), +('risk_management', 'governance', 'Risikomanagement und -bewertung', 
'Risk management and assessment', 'NOT vulnerability (techn. Schwachstellen), NOT monitoring (Überwachung)', 'Risikobewertung, Risikobehandlung, Risikoakzeptanz, Risikomatrix'), +('third_party_management', 'governance', 'Lieferanten- und Drittanbieter-Management', 'Third-party and vendor management', 'NOT data_processing_agreement (AVV)', 'Lieferantenbewertung, Vendor Risk Assessment, Supply Chain Security'), +('change_management', 'governance', 'Änderungsmanagement', 'Change management', 'NOT patch_management (Updates)', 'Change Request, Change Advisory Board, Rollback-Verfahren'), +('documentation', 'governance', 'Allgemeine Dokumentationspflichten', 'General documentation requirements', 'NOT audit_logging (technische Logs), NOT data_processing_register (VVT)', 'Betriebshandbuch, Systemdokumentation, Verfahrensdokumentation'), +('records_management', 'governance', 'Akten- und Unterlagenverwaltung', 'Records management', 'NOT data_retention (Löschfristen)', 'Archivierung, Aktenführung, Aufbewahrungspflichten nach HGB/AO'), +('compliance_reporting', 'governance', 'Compliance-Berichterstattung', 'Compliance reporting', 'NOT alerting (techn. 
Alerts), NOT supervisory_authority (Behördenkommunikation)', 'Compliance-Bericht, Management-Reporting, KPI-Tracking'), +('asset_management', 'governance', 'IT-Asset-Verwaltung und Inventar', 'IT asset management', NULL, 'Asset-Inventar, CMDB, Hardware-Lifecycle, Software-Inventar'), +('physical_security', 'security', 'Physische Sicherheit und Zutrittskontrolle', 'Physical security and access', NULL, 'Zutrittskontrolle, Videoüberwachung (physisch), Serverraum-Sicherheit'), +('human_resources_security', 'governance', 'Personalsicherheit', 'HR security', 'NOT training (Schulung)', 'Background-Checks, Geheimhaltungsvereinbarungen, Onboarding/Offboarding') +ON CONFLICT (canonical_token) DO UPDATE SET description_de = EXCLUDED.description_de, description_en = EXCLUDED.description_en, NOT_confused_with = EXCLUDED.NOT_confused_with; + +-- ═══════════════════════════════════════════════════════════════ +-- REGULATORY SPECIFIC +-- ═══════════════════════════════════════════════════════════════ + +INSERT INTO object_ontology VALUES +('supervisory_authority', 'regulatory', 'Kommunikation mit Aufsichtsbehörden', 'Supervisory authority communication', 'NOT compliance_reporting (interne Berichte)', 'Meldung an BaFin, Abstimmung mit DPA, behördliche Anfragen'), +('certification', 'regulatory', 'Zertifizierung und Konformitätsbewertung', 'Certification and conformity assessment', 'NOT compliance_audit (Prüfung), NOT personal_data (Datenschutz)', 'CE-Kennzeichnung, ISO-Zertifizierung, Konformitätserklärung'), +('product_safety', 'regulatory', 'Produktsicherheit und Marktüberwachung', 'Product safety and market surveillance', 'NOT certification (Zertifizierung)', 'Rückrufmanagement, Sicherheitsbewertung, RAPEX-Meldung'), +('ai_system', 'regulatory', 'KI-System-Regulierung (AI Act)', 'AI system regulation', NULL, 'KI-Risikobewertung, Hochrisiko-KI, Transparenzpflichten, FRIA'), +('financial_reporting', 'regulatory', 'Finanzberichterstattung und Rechnungslegung', 'Financial reporting 
and accounting', NULL, 'Jahresabschluss, HGB-Pflichten, IFRS, Buchführung'), +('aml', 'regulatory', 'Geldwäscheprävention und KYC', 'Anti-money laundering and KYC', NULL, 'KYC, Verdachtsmeldung, PEP-Prüfung, Transaktionsmonitoring'), +('whistleblowing', 'regulatory', 'Hinweisgeberschutz und Meldekanäle', 'Whistleblower protection', NULL, 'Hinweisgebersystem, Meldekanal, Hinweisgeberschutzgesetz'), +('consumer_protection', 'regulatory', 'Verbraucherschutz und AGB', 'Consumer protection', NULL, 'AGB-Prüfung, Widerrufsrecht, Informationspflichten, Preistransparenz'), +('ecommerce', 'regulatory', 'E-Commerce-Pflichten (Impressum, Fernabsatz)', 'E-commerce obligations', NULL, 'Impressumspflicht, Fernabsatzrecht, Online-Handel-Pflichten'), +('telecommunications', 'regulatory', 'Telekommunikationsregulierung', 'Telecommunications regulation', NULL, 'TKG-Pflichten, Vorratsdatenspeicherung, Notruf'), +('medical_device', 'regulatory', 'Medizinprodukte-Regulierung (MDR)', 'Medical device regulation', NULL, 'UDI, klinische Bewertung, Post-Market Surveillance'), +('payment_services', 'regulatory', 'Zahlungsdienste-Regulierung (PSD2)', 'Payment services regulation', NULL, 'Starke Kundenauthentifizierung, PSD2-Compliance, Open Banking'), +('critical_infrastructure', 'regulatory', 'KRITIS und NIS2-Pflichten', 'Critical infrastructure (NIS2)', NULL, 'KRITIS-Meldepflichten, NIS2-Maßnahmen, Mindeststandards'), +('supply_chain_due_diligence', 'regulatory', 'Lieferkettensorgfaltspflicht (LkSG)', 'Supply chain due diligence', 'NOT third_party_management (allg. Lieferanten)', 'Menschenrechts-Due-Diligence, Umwelt-Sorgfaltspflicht, LkSG-Bericht'), +('sustainability_reporting', 'regulatory', 'Nachhaltigkeitsberichterstattung (CSRD)', 'Sustainability reporting', NULL, 'ESG-Reporting, CSRD, Nachhaltigkeitsbericht'), +('cookie_consent', 'regulatory', 'Cookie-Consent und Tracking (TDDDG/ePrivacy)', 'Cookie consent and tracking', 'NOT consent (allg. 
Einwilligung)', 'Cookie-Banner, Tracking-Einwilligung, TDDDG §25'), +('video_surveillance', 'regulatory', 'Videoüberwachung (datenschutzrechtlich)', 'Video surveillance (data protection)', 'NOT physical_security (physische Sicherheit), NOT monitoring (IT-Monitoring)', 'Kamera-Überwachung, Speicherfristen, Kennzeichnungspflicht') +ON CONFLICT (canonical_token) DO UPDATE SET description_de = EXCLUDED.description_de, description_en = EXCLUDED.description_en, NOT_confused_with = EXCLUDED.NOT_confused_with; + +-- ═══════════════════════════════════════════════════════════════ +-- APPLICATION SECURITY +-- ═══════════════════════════════════════════════════════════════ + +INSERT INTO object_ontology VALUES +('secure_development', 'technical', 'Sichere Softwareentwicklung (SDLC)', 'Secure software development lifecycle', NULL, 'Secure Coding, Code Review, SAST/DAST, DevSecOps'), +('api_security', 'technical', 'API-Sicherheit', 'API security', NULL, 'API-Authentifizierung, Rate Limiting, Input Validation'), +('input_validation', 'technical', 'Eingabevalidierung und Output Encoding', 'Input validation and output encoding', NULL, 'XSS-Prävention, SQL-Injection-Schutz, Parametervalidierung'), +('container_security', 'technical', 'Container- und Cloud-Sicherheit', 'Container and cloud security', NULL, 'Docker-Hardening, Kubernetes-Security, Image-Scanning'), +('logging_configuration', 'technical', 'Log-Konfiguration und -Format', 'Log configuration and format', 'NOT audit_logging (Nachvollziehbarkeit), NOT monitoring (Überwachung)', 'Log-Format, Log-Rotation, Log-Shipping, Structured Logging'), +('data_classification', 'governance', 'Datenklassifizierung und -kennzeichnung', 'Data classification and labeling', 'NOT sensitive_data (besondere Kategorien)', 'Vertraulichkeitsstufen, Datenklassifizierung, Labeling') +ON CONFLICT (canonical_token) DO UPDATE SET description_de = EXCLUDED.description_de, description_en = EXCLUDED.description_en, NOT_confused_with = 
#!/usr/bin/env python3
# File: control-pipeline/scripts/extract_ce_obligations.py
"""
Extract CE-relevant obligations from TRBS/TRGS/ASR/OSHA chunks in Qdrant.

Searches for MUSS/SOLL patterns in chunk texts and classifies them.
Output: JSON file with structured obligations for the CE session.

Usage:
    python3 /app/scripts/extract_ce_obligations.py
    python3 /app/scripts/extract_ce_obligations.py --output /tmp/ce_obligations.json
"""

import argparse
import json
import logging
import os
import re
from collections import Counter
from pathlib import Path
from typing import Optional

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("ce-obligations")

QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant:6333")
COLLECTION = "bp_compliance_ce"
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
LLM_MODEL = "qwen3.5:35b-a3b"

# Obligation patterns (DE + EN)
OBLIGATION_PATTERNS = re.compile(
    r"(muss|müssen|hat\s+[\w\s]*zu\s|ist\s+[\w\s]*sicherzustellen|"
    r"ist\s+verpflichtet|sind\s+verpflichtet|darf\s+nicht|"
    r"shall|must|required\s+to|is\s+required|shall\s+not)",
    re.IGNORECASE,
)

# CE relevance keywords
CE_KEYWORDS = re.compile(
    r"(maschine|schutzeinrichtung|gefährdung|quetsch|scher|stoß|"
    r"schneid|fang|einzug|absturz|druck|explosion|brand|"
    r"elektrisch|spannung|erdung|schutzleiter|not-halt|"
    r"betriebsanleitung|kennzeichnung|prüfung|prüfpflicht|"
    r"instandhaltung|wartung|sicherheitsabstand|"
    r"schutzmaßnahme|persönliche schutzausrüstung|psa|"
    r"machine|guard|hazard|crush|shear|cut|entangle|"
    r"lockout|tagout|electrical|grounding|emergency stop|"
    r"safety distance|protective device|ppe|inspection)",
    re.IGNORECASE,
)

# Keyword pattern -> hazard category. Dict order is the match priority:
# the first pattern found in the text wins.
HAZARD_CATEGORIES = {
    "quetsch|crush|squeeze": "mechanical_crushing",
    "schneid|cut": "mechanical_cutting",
    "fang|einzug|entangle|draw": "mechanical_entanglement",
    "absturz|fall": "fall_hazard",
    "explosion|ex-bereich|atex": "explosion_hazard",
    "brand|fire|feuer": "fire_hazard",
    "elektrisch|electrical|spannung|voltage": "electrical_hazard",
    "lärm|noise|schall": "noise_hazard",
    "gefahrstoff|hazardous substance|chemical": "chemical_hazard",
    "ergonomie|ergonomic|heben|lift": "ergonomic_hazard",
    "temperatur|heat|hitze|kälte|cold": "thermal_hazard",
    "strahlung|radiation|laser": "radiation_hazard",
    "not-halt|emergency stop|e-stop": "emergency_stop",
    "lockout|tagout|loto": "lockout_tagout",
    "kennzeichnung|label|marking|sign": "safety_marking",
    "prüfung|inspection|test": "inspection_requirement",
    "instandhaltung|maintenance|wartung": "maintenance",
    "schutzeinrichtung|guard|protective device": "protective_device",
    "betriebsanleitung|instruction|manual": "operating_instructions",
    "druck|pressure|behälter|vessel|kessel|boiler": "pressure_hazard",
}

# Source-based overrides: TRGS docs about chemicals/storage
# should never be classified as mechanical hazards
_CHEMICAL_SOURCES = re.compile(
    r"trgs\s*(5[0-9]{2}|7[0-9]{2}|9[0-9]{2}|4[0-9]{2}|6[0-9]{2})",
    re.IGNORECASE,
)

# Compiled once instead of on every classified chunk; order follows the dict.
_HAZARD_RULES = [
    (re.compile(pattern, re.IGNORECASE), category)
    for pattern, category in HAZARD_CATEGORIES.items()
]
_TRGS_OVERRIDES = [
    (re.compile(r"explosion|ex-bereich|atex|zündfähig", re.IGNORECASE), "explosion_hazard"),
    (re.compile(r"druck|pressure|behälter|vessel", re.IGNORECASE), "pressure_hazard"),
    (re.compile(r"brand|fire|feuer", re.IGNORECASE), "fire_hazard"),
]
_MUSS_WORDS = re.compile(r"muss|müssen|shall|must|required", re.IGNORECASE)
_SOLL_WORDS = re.compile(r"soll|sollte|should", re.IGNORECASE)
_RELEVANT_SOURCES = ("trbs", "trgs", "asr", "osha")


def _classify_hazard(text: str, source: str) -> str:
    """Return the hazard category for *text*, with source-aware overrides.

    Chunks from a TRGS 4xx-7xx/9xx source are only ever classified as
    explosion, pressure, fire or (by default) chemical hazards. Everything
    else gets the first matching HAZARD_CATEGORIES entry, or "general".
    """
    if _CHEMICAL_SOURCES.search(source):
        for pattern, category in _TRGS_OVERRIDES:
            if pattern.search(text):
                return category
        return "chemical_hazard"

    for pattern, category in _HAZARD_RULES:
        if pattern.search(text):
            return category
    return "general"


def obligation_type(text: str) -> str:
    """Return "MUSS" or "SOLL" for an obligation text.

    Mandatory wording wins over advisory wording; texts that matched
    OBLIGATION_PATTERNS through another phrase (e.g. "ist verpflichtet")
    count as "MUSS".
    """
    if _MUSS_WORDS.search(text):
        return "MUSS"
    if _SOLL_WORDS.search(text):
        return "SOLL"
    return "MUSS"


def extract_obligation(payload: dict, source_filter: Optional[str] = None) -> Optional[dict]:
    """Turn one Qdrant payload into an obligation record, or None if it is filtered out.

    Args:
        payload: Point payload; reads "source", "filename", "chunk_text",
            "section" and "paragraph".
        source_filter: Optional case-insensitive substring the source must
            contain in addition to the TRBS/TRGS/ASR/OSHA check.

    Returns:
        The obligation dict, or None when the source is not relevant or the
        text carries no obligation wording / no CE keyword.
    """
    # `or` instead of dict defaults: a key that is present but null must not crash .lower().
    source = payload.get("source") or payload.get("filename") or ""
    text = payload.get("chunk_text") or ""

    source_lower = source.lower()
    if not any(key in source_lower for key in _RELEVANT_SOURCES):
        return None
    if source_filter and source_filter.lower() not in source_lower:
        return None
    if not OBLIGATION_PATTERNS.search(text) or not CE_KEYWORDS.search(text):
        return None

    hazard = _classify_hazard(text, source)
    return {
        "source": source,
        "section": payload.get("section", ""),
        "paragraph": payload.get("paragraph", ""),
        "obligation_text": text.strip()[:500],
        "hazard_category": hazard,
        "obligation_type": obligation_type(text),
        "ce_relevance": "high" if hazard != "general" else "medium",
        "filename": payload.get("filename", ""),
    }


def scroll_chunks(source_filter: Optional[str] = None) -> list[dict]:
    """Scroll through the Qdrant collection and collect all CE obligations.

    Args:
        source_filter: Optional case-insensitive substring; when given, only
            sources containing it are kept. None keeps the previous behaviour.

    Returns:
        One obligation dict per matching chunk (see extract_obligation).

    Raises:
        httpx.HTTPStatusError: If Qdrant answers with an error status. An
            error body would otherwise look like an empty collection.
    """
    # Imported here so the pure classification helpers stay importable
    # without the HTTP client installed.
    import httpx

    chunks = []
    offset = None
    batch = 100
    report_step = 500
    next_report = report_step

    while True:
        scroll_body = {
            "limit": batch,
            "with_payload": True,
            "with_vector": False,
        }
        if offset is not None:
            scroll_body["offset"] = offset

        resp = httpx.post(
            f"{QDRANT_URL}/collections/{COLLECTION}/points/scroll",
            json=scroll_body,
            timeout=30.0,
        )
        resp.raise_for_status()
        result = resp.json().get("result") or {}
        points = result.get("points") or []
        next_offset = result.get("next_page_offset")

        for pt in points:
            obligation = extract_obligation(pt.get("payload") or {}, source_filter)
            if obligation is not None:
                chunks.append(obligation)

        # Report each time another `report_step` obligations have been found;
        # the old `len(chunks) % 500 == 0` test fired on every page while the
        # count was still zero and only by coincidence afterwards.
        if len(chunks) >= next_report:
            logger.info(" Scanned... %d obligations found so far", len(chunks))
            next_report = (len(chunks) // report_step + 1) * report_step

        if next_offset is None or not points:
            break
        offset = next_offset

    return chunks


def main():
    """Scan the collection, log per-source/per-hazard statistics, write the JSON file."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--output", default="/tmp/ce_obligations.json")
    args = parser.parse_args()

    logger.info("Scanning %s for CE obligations...", COLLECTION)
    obligations = scroll_chunks()

    logger.info("Found %d CE-relevant obligations", len(obligations))

    # Stats
    by_source = Counter(o["source"][:30] for o in obligations)
    by_hazard = Counter(o["hazard_category"] for o in obligations)

    logger.info("\nBy source:")
    for src, cnt in by_source.most_common(20):
        logger.info(" %4d %s", cnt, src)

    logger.info("\nBy hazard category:")
    for cat, cnt in by_hazard.most_common():
        logger.info(" %4d %s", cnt, cat)

    # ensure_ascii=False emits raw umlauts, so the file encoding must be
    # explicit rather than whatever the container locale provides.
    Path(args.output).write_text(
        json.dumps(obligations, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    logger.info("\nSaved to %s", args.output)


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
# File: control-pipeline/scripts/gpre0_add_subtopics.py
"""
Add L2 sub-topics to broad tokens. Instead of just "incident",
produces "incident:response", "incident:detection", etc.

Only processes tokens with >500 controls AND <90% audit accuracy.

Usage:
    python3 /app/scripts/gpre0_add_subtopics.py --dry-run
    python3 /app/scripts/gpre0_add_subtopics.py
"""

import argparse
import json
import logging
import os
import re
import time
from collections import defaultdict
from pathlib import Path

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("gpre0-subtopics")

# NOTE(review): the fallback URL embeds a password; prefer requiring DATABASE_URL.
DB_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_MODEL = "claude-haiku-4-5-20251001"
ANTHROPIC_URL = "https://api.anthropic.com/v1/messages"
CHECKPOINT_DIR = Path("/tmp/gpre0_subtopic_checkpoints")

# Tokens that are too broad — need L2 sub-topics
BROAD_TOKENS = {
    # Round 1 (already done)
    "risk_management", "policy", "audit_logging", "incident",
    "access_control", "compliance_audit", "asset_management",
    "key_management", "third_party_management", "monitoring",
    "financial_reporting", "data_classification", "change_management",
    "alerting", "multi_factor_auth", "api_security",
    "certificate_management", "human_resources_security",
    "training", "data_processing_agreement", "data_processing_register",
    "consumer_protection", "input_validation", "vulnerability",
    "dpia", "data_breach_notification", "backup",
    "supply_chain_due_diligence", "awareness",
    "privacy_by_design", "credentials", "logging_configuration",
    # Round 2 (remaining large tokens)
    "supervisory_authority", "certification", "secure_development",
    "product_safety", "personal_data", "data_subject_rights", "consent",
    "ai_system", "encryption", "data_retention", "disaster_recovery",
    "data_transfer", "aml", "transport_encryption", "network_security",
    "physical_security", "medical_device", "patch_management",
    "cookie_consent", "video_surveillance", "network_segmentation",
    "telecommunications", "privileged_access", "session_management",
    "password_policy", "governance", "whistleblowing", "payment_services",
    "health_data", "sensitive_data", "ecommerce", "sustainability_reporting",
    "critical_infrastructure", "regulatory",
}

SYSTEM_PROMPT = """Du bist ein Compliance-Spezialist. Jeder Control hat bereits ein Hauptthema (L1 Token).
Deine Aufgabe: Bestimme ein SPEZIFISCHES Sub-Thema (L2) innerhalb des Hauptthemas.

Das L2 Sub-Thema soll den KONKRETEN Aspekt beschreiben. Verwende kurze, klare englische Bezeichnungen.

Beispiele:
- L1=incident, Titel="Incident Response Plan erstellen" → L2="response_plan"
- L1=incident, Titel="Sicherheitsvorfälle erkennen" → L2="detection"
- L1=incident, Titel="Recovery nach Vorfall dokumentieren" → L2="recovery"
- L1=incident, Titel="Forensische Analyse durchführen" → L2="forensics"
- L1=risk_management, Titel="Risikobewertung durchführen" → L2="assessment"
- L1=risk_management, Titel="Risikominderungsmaßnahmen umsetzen" → L2="treatment"
- L1=risk_management, Titel="Restrisiko akzeptieren" → L2="acceptance"
- L1=access_control, Titel="Rollenbasierte Zugriffskontrolle" → L2="rbac"
- L1=access_control, Titel="Zugriffsrechte regelmäßig prüfen" → L2="access_review"
- L1=access_control, Titel="Identitätsmanagement implementieren" → L2="identity_management"
- L1=monitoring, Titel="Systemverfügbarkeit überwachen" → L2="availability"
- L1=monitoring, Titel="Sicherheitsereignisse überwachen" → L2="security_events"
- L1=policy, Titel="Datenschutzrichtlinie erstellen" → L2="data_protection"
- L1=policy, Titel="Acceptable Use Policy definieren" → L2="acceptable_use"
- L1=policy, Titel="Passwortrichtlinie festlegen" → L2="password"
- L1=financial_reporting, Titel="Jahresabschluss erstellen" → L2="annual_accounts"
- L1=financial_reporting, Titel="Steuererklärung einreichen" → L2="tax"
- L1=alerting, Titel="Datenpanne an Behörde melden" → L2="breach_notification"
- L1=alerting, Titel="Sicherheitswarnung eskalieren" → L2="escalation"

REGELN:
- L2 soll 1-3 Wörter sein, snake_case
- L2 soll SPEZIFISCH sein (nicht das L1 wiederholen)
- Verwende konsistente L2-Bezeichnungen für ähnliche Controls

Antworte NUR als JSON-Array: [{"id":"...","l2":"subtopic"}, ...]"""

_L2_RE = re.compile(r"[a-z0-9]+(?:_[a-z0-9]+)*")


def parse_json_array(content: str) -> list[dict]:
    """Extract the outermost JSON array from an LLM reply, keeping only objects.

    Returns [] when no bracketed span exists. Non-dict entries are dropped so
    callers can safely use ``.get`` on every item.

    Raises:
        json.JSONDecodeError: If the bracketed span is not valid JSON.
    """
    start = content.find("[")
    end = content.rfind("]") + 1
    if start < 0 or end <= start:
        return []
    parsed = json.loads(content[start:end])
    return [item for item in parsed if isinstance(item, dict)]


def normalize_l2(raw) -> str:
    """Return *raw* as a snake_case sub-topic, or "" if it cannot be used.

    The value ends up inside the colon-delimited merge_group_hint, so anything
    that is not plain ``[a-z0-9_]`` after trimming, lower-casing and replacing
    spaces/hyphens is rejected rather than written to the DB.
    """
    if not isinstance(raw, str):
        return ""
    candidate = re.sub(r"[\s\-]+", "_", raw.strip().lower())
    return candidate if _L2_RE.fullmatch(candidate) else ""


def with_subtopic(hint: str, l2: str) -> str:
    """Rewrite ``action:L1:phase`` into ``action:L1_L2:phase``.

    Missing parts fall back to "unknown" (L1) and "implementation" (phase).
    """
    parts = hint.split(":", 2)
    action = parts[0] if parts else "implement"
    l1 = parts[1] if len(parts) > 1 else "unknown"
    phase = parts[2] if len(parts) > 2 else "implementation"
    return f"{action}:{l1}_{l2}:{phase}"


def call_claude(controls_batch: list[dict]) -> tuple[list[dict], dict]:
    """Send one batch to Claude for L2 sub-topic assignment.

    Never retries: a failed batch is logged and returned as ``([], {})`` so
    the caller counts it as skipped. After a 429 the function sleeps 60s
    before returning, to let the rate limit recover.

    Returns:
        (result objects with "id"/"l2", API usage dict).
    """
    import httpx  # function-scope so the helpers import without the HTTP client

    items = [
        f'- id="{c["control_id"]}" '
        f'L1="{c["current_object"]}" '
        f't="{c["title"]}" '
        f'o="{c["objective"][:80]}"'
        for c in controls_batch
    ]
    prompt = "Bestimme L2 Sub-Topics:\n" + "\n".join(items)

    headers = {
        "x-api-key": ANTHROPIC_API_KEY,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    payload = {
        "model": ANTHROPIC_MODEL,
        "max_tokens": 1500,
        "temperature": 0.0,
        "system": SYSTEM_PROMPT,
        "messages": [{"role": "user", "content": prompt}],
    }

    try:
        resp = httpx.post(
            ANTHROPIC_URL, headers=headers, json=payload, timeout=45.0
        )
        resp.raise_for_status()
        data = resp.json()
        content = data.get("content", [{}])[0].get("text", "")
        usage = data.get("usage", {})
        return parse_json_array(content), usage
    except httpx.TimeoutException:
        logger.error("TIMEOUT — skipping")
        return [], {}
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 429:
            logger.warning("Rate limited — waiting 60s")
            time.sleep(60)
        else:
            logger.error("API error %d", e.response.status_code)
        return [], {}
    except Exception as e:
        logger.error("Failed: %s", e)
        return [], {}


def save_corrections(corrections: list[dict]) -> Path:
    """Write the corrections collected so far to the checkpoint file and return its path."""
    CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
    corr_file = CHECKPOINT_DIR / "corrections_subtopics.json"
    corr_file.write_text(json.dumps(corrections), encoding="utf-8")
    return corr_file


def main():
    """Tag every control of a broad L1 token with an L2 sub-topic and update the DB."""
    from sqlalchemy import create_engine, text

    parser = argparse.ArgumentParser()
    parser.add_argument("--batch-size", type=int, default=20)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    # Without a key every request fails and the run would "finish" with
    # everything skipped.
    if not ANTHROPIC_API_KEY:
        logger.error("ANTHROPIC_API_KEY is not set — aborting")
        raise SystemExit(1)

    engine = create_engine(
        DB_URL, connect_args={"options": "-c search_path=compliance,public"}
    )

    # LIKE patterns are passed as bound values: they contain ':tok:' and
    # text() scans its SQL string for ':name' placeholders.
    like_params = {
        f"tok{idx}": f"%:{tok}:%" for idx, tok in enumerate(sorted(BROAD_TOKENS))
    }
    like_clauses = " OR ".join(
        f"cc.generation_metadata->>'merge_group_hint' LIKE :{name}"
        for name in like_params
    )

    with engine.connect() as c:
        rows = c.execute(text(f"""
            SELECT cc.id, cc.control_id, cc.title,
                   COALESCE(cc.objective, '') as objective,
                   cc.generation_metadata->>'merge_group_hint' as hint
            FROM canonical_controls cc
            WHERE cc.generation_metadata->>'merge_group_hint' IS NOT NULL
              AND cc.release_state NOT IN ('deprecated', 'rejected')
              AND ({like_clauses})
        """), like_params).fetchall()

    controls = []
    for uuid, cid, title, objective, hint in rows:
        parts = hint.split(":", 2) if hint else []
        obj = parts[1] if len(parts) > 1 else ""
        # Exact match: LIKE treats '_' as a wildcard and may over-select.
        if obj in BROAD_TOKENS:
            controls.append({
                "uuid": str(uuid), "control_id": cid,
                "title": title or "", "objective": objective or "",
                "current_hint": hint, "current_object": obj,
            })

    logger.info("Found %d controls in broad tokens to add L2 sub-topics", len(controls))

    # Process
    total_tagged = 0
    total_skipped = 0
    total_input_tokens = 0
    total_output_tokens = 0
    corrections = []
    l2_stats: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))

    for i in range(0, len(controls), args.batch_size):
        batch = controls[i:i + args.batch_size]
        results, usage = call_claude(batch)

        total_input_tokens += usage.get("input_tokens", 0)
        total_output_tokens += usage.get("output_tokens", 0)

        if not results:
            total_skipped += len(batch)
            continue

        result_map = {r.get("id", ""): r for r in results}
        for ctrl in batch:
            l2 = normalize_l2(result_map.get(ctrl["control_id"], {}).get("l2", ""))
            if not l2:
                total_skipped += 1
                continue

            total_tagged += 1
            # New format: action:L1_L2:phase
            corrections.append({
                "uuid": ctrl["uuid"],
                "old_hint": ctrl["current_hint"],
                "new_hint": with_subtopic(ctrl["current_hint"], l2),
            })
            l2_stats[ctrl["current_object"]][l2] += 1

        processed = min(i + args.batch_size, len(controls))
        if processed % 5000 < args.batch_size or processed >= len(controls):
            logger.info(
                "Progress: %d/%d (tagged=%d skip=%d)",
                processed, len(controls), total_tagged, total_skipped,
            )
            # Checkpoint paid-for results so a later crash does not lose them.
            save_corrections(corrections)

        time.sleep(0.3)

    # Report
    cost_in = total_input_tokens / 1_000_000 * 0.80
    cost_out = total_output_tokens / 1_000_000 * 4.00
    logger.info("\n" + "=" * 60)
    logger.info("SUBTOPIC REPORT")
    logger.info("=" * 60)
    logger.info("Total: %d | Tagged: %d | Skipped: %d", len(controls), total_tagged, total_skipped)
    logger.info("Cost: $%.2f (Haiku)", cost_in + cost_out)

    # Show L2 distribution per L1
    for l1, subs in sorted(l2_stats.items()):
        top_subs = sorted(subs.items(), key=lambda x: -x[1])[:10]
        logger.info("\n%s (%d unique L2):", l1, len(subs))
        for l2, cnt in top_subs:
            logger.info(" %4d %s_%s", cnt, l1, l2)

    # Save corrections
    corr_file = save_corrections(corrections)
    logger.info("\nSaved %d corrections to %s", len(corrections), corr_file)

    if args.dry_run:
        logger.info("DRY RUN — not updating DB")
        return

    if corrections:
        logger.info("Applying %d corrections...", len(corrections))
        with engine.begin() as c:
            c.execute(text("SET search_path TO compliance, public"))
            for corr in corrections:
                c.execute(text("""
                    UPDATE canonical_controls
                    SET generation_metadata = jsonb_set(
                        generation_metadata,
                        '{merge_group_hint}',
                        to_jsonb(CAST(:new_hint AS text))
                    )
                    WHERE id = CAST(:uuid AS uuid)
                """), {"uuid": corr["uuid"], "new_hint": corr["new_hint"]})
        logger.info("Done. %d hints updated.", len(corrections))


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
# File: control-pipeline/scripts/gpre0_apply_corrections.py
"""Apply saved corrections from JSON file to DB (crash recovery)."""

import argparse
import json
import logging
import os
from pathlib import Path

from sqlalchemy import create_engine, text

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("apply-corrections")

# NOTE(review): the fallback URL embeds a password; prefer requiring DATABASE_URL.
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db")


def main():
    """Replay a corrections JSON file against canonical_controls.

    The file is a list of objects with "uuid", "old_hint" and "new_hint".
    All updates run in one transaction. With --dry-run only the first ten
    corrections are printed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("file", help="Path to corrections JSON file")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    corrections = json.loads(Path(args.file).read_text(encoding="utf-8"))
    logger.info("Loaded %d corrections from %s", len(corrections), args.file)

    if args.dry_run:
        for c in corrections[:10]:
            logger.info(" %s: %s → %s", c["uuid"][:8], c["old_hint"], c["new_hint"])
        logger.info("DRY RUN — not applying")
        return

    engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})
    applied = 0
    missing = 0
    with engine.begin() as c:
        c.execute(text("SET search_path TO compliance, public"))
        for corr in corrections:
            result = c.execute(text("""
                UPDATE canonical_controls
                SET generation_metadata = jsonb_set(
                    generation_metadata,
                    '{merge_group_hint}',
                    to_jsonb(CAST(:new_hint AS text))
                )
                WHERE id = CAST(:uuid AS uuid)
            """), {"uuid": corr["uuid"], "new_hint": corr["new_hint"]})
            # Count rows actually changed: an UPDATE whose UUID matches
            # nothing succeeds silently and must not be reported as applied.
            if result.rowcount:
                applied += 1
            else:
                missing += 1
    logger.info("Applied %d corrections.", applied)
    if missing:
        logger.warning("%d corrections matched no control (unknown uuid).", missing)


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
# File: control-pipeline/scripts/gpre0_fix_bad_subtopics.py
"""Fix bad L2 subtopics: stakeholder_*, escalation fragments, *_approval*, *_documentation.

Usage:
    python3 /app/scripts/gpre0_fix_bad_subtopics.py --dry-run
    python3 /app/scripts/gpre0_fix_bad_subtopics.py
"""

import argparse
import json
import logging
import os
import re
import time
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("fix-subtopics")

# NOTE(review): the fallback URL embeds a password; prefer requiring DATABASE_URL.
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_URL = "https://api.anthropic.com/v1/messages"

SYSTEM_PROMPT = """Du klassifizierst Controls mit einem L1_L2 Token. Das L2 soll den KONKRETEN fachlichen Aspekt beschreiben.

VERBOTENE L2-Wörter (zu generisch):
- stakeholder (zu vage — WER sind die Stakeholder? WAS wird getan?)
- documentation (ist eine Handlung, kein Thema)
- approval (ist eine Handlung)
- communication (zu vage)

Stattdessen SPEZIFISCH:
- "stakeholder_notification" bei Behördenmeldung → "authority_reporting"
- "stakeholder_consultation" bei DSFA → "impact_consultation"
- "stakeholder_engagement" bei Training → "participant_selection"
- "escalation_procedure" → "severity_classification" oder "response_plan"
- "access_documentation" → "access_policy" oder "permission_matrix"
- "approval_process" → "authorization_workflow" oder "sign_off"

L2 = 1-3 Wörter, snake_case, FACHLICH SPEZIFISCH.

Antworte NUR als JSON-Array: [{"id":"...","token":"L1_L2"}, ...]"""

_TOKEN_RE = re.compile(r"[a-z0-9]+(?:_[a-z0-9]+)*")
# Rejected anywhere in the token (the original check).
_FORBIDDEN_SUBSTRINGS = ("stakeholder", "approval")
# The prompt forbids these too. Matched per snake_case word so that
# "telecommunications" is not mistaken for "communication".
_FORBIDDEN_WORDS = frozenset({"documentation", "communication"})


def parse_json_array(content: str) -> list[dict]:
    """Extract the outermost JSON array from an LLM reply, keeping only objects.

    Returns [] when no bracketed span exists.

    Raises:
        json.JSONDecodeError: If the bracketed span is not valid JSON.
    """
    start = content.find("[")
    end = content.rfind("]") + 1
    if start < 0 or end <= start:
        return []
    parsed = json.loads(content[start:end])
    return [item for item in parsed if isinstance(item, dict)]


def is_acceptable_token(token, current_object: str) -> bool:
    """Return True if *token* may replace *current_object* in a hint.

    Rejects empty, non-string and unchanged tokens, anything that is not
    snake_case (it is written into the colon-delimited hint), and every
    L2 word SYSTEM_PROMPT forbids.
    """
    if not isinstance(token, str) or not token or token == current_object:
        return False
    if not _TOKEN_RE.fullmatch(token):
        return False
    if any(word in token for word in _FORBIDDEN_SUBSTRINGS):
        return False  # Still bad
    return _FORBIDDEN_WORDS.isdisjoint(token.split("_"))


def replace_object(hint: str, new_token: str) -> str:
    """Return *hint* (``action:object:phase``) with its object part replaced."""
    parts = hint.split(":", 2)
    action = parts[0] if parts else "implement"
    phase = parts[2] if len(parts) > 2 else "implementation"
    return f"{action}:{new_token}:{phase}"


def main():
    """Re-classify controls whose hint carries a known-bad L2 and update the DB.

    Corrections are always written to /tmp/corrections_bad_subtopics.json.
    With --dry-run the DB is left untouched.
    """
    import httpx
    from sqlalchemy import create_engine, text

    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    if not ANTHROPIC_API_KEY:
        logger.error("ANTHROPIC_API_KEY is not set — aborting")
        raise SystemExit(1)

    engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})

    with engine.connect() as c:
        rows = c.execute(text("""
            SELECT cc.id, cc.control_id, cc.title,
                   COALESCE(cc.objective, '') as objective,
                   cc.generation_metadata->>'merge_group_hint' as hint
            FROM canonical_controls cc
            WHERE cc.release_state NOT IN ('deprecated', 'rejected')
              AND cc.generation_metadata->>'merge_group_hint' IS NOT NULL
              AND (
                cc.generation_metadata->>'merge_group_hint' LIKE '%stakeholder%'
                OR cc.generation_metadata->>'merge_group_hint' LIKE '%_escalation_%'
                OR cc.generation_metadata->>'merge_group_hint' LIKE '%_approval_%'
                OR cc.generation_metadata->>'merge_group_hint' LIKE '%response_time%'
                OR cc.generation_metadata->>'merge_group_hint' LIKE '%machine_re%'
                OR cc.generation_metadata->>'merge_group_hint' LIKE '%management_app%'
              )
        """)).fetchall()

    controls = []
    for uuid, cid, title, objective, hint in rows:
        parts = hint.split(":", 2) if hint else []
        controls.append({
            "uuid": str(uuid), "control_id": cid,
            "title": title or "", "objective": objective or "",
            "current_hint": hint,
            "current_object": parts[1] if len(parts) > 1 else "",
        })

    logger.info("Found %d controls with bad subtopics to fix", len(controls))

    headers = {
        "x-api-key": ANTHROPIC_API_KEY,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }

    corrections = []
    total_fixed = 0
    batch_size = 20

    for i in range(0, len(controls), batch_size):
        batch = controls[i:i + batch_size]
        items = [
            f'- id="{c["control_id"]}" cur="{c["current_object"]}" t="{c["title"]}" o="{c["objective"][:80]}"'
            for c in batch
        ]

        try:
            resp = httpx.post(ANTHROPIC_URL, headers=headers, json={
                "model": "claude-haiku-4-5-20251001",
                "max_tokens": 1500, "temperature": 0.0,
                "system": SYSTEM_PROMPT,
                "messages": [{"role": "user", "content": "Fix:\n" + "\n".join(items)}],
            }, timeout=45.0)
            resp.raise_for_status()
            content = resp.json().get("content", [{}])[0].get("text", "")
            results = parse_json_array(content)
        except Exception as e:
            # Deliberately best-effort: a failed batch is logged and skipped.
            logger.error("Batch %d failed: %s", i, e)
            continue

        result_map = {r.get("id", ""): r for r in results}
        for ctrl in batch:
            new_token = result_map.get(ctrl["control_id"], {}).get("token", "")
            if not is_acceptable_token(new_token, ctrl["current_object"]):
                continue

            corrections.append({
                "uuid": ctrl["uuid"],
                "old_hint": ctrl["current_hint"],
                "new_hint": replace_object(ctrl["current_hint"], new_token),
            })
            total_fixed += 1

        if (i + batch_size) % 200 < batch_size:
            logger.info("Progress: %d/%d (fixed=%d)", min(i + batch_size, len(controls)), len(controls), total_fixed)
        time.sleep(0.3)

    logger.info("Fixed: %d of %d controls", total_fixed, len(controls))

    # Save + apply
    Path("/tmp/corrections_bad_subtopics.json").write_text(json.dumps(corrections), encoding="utf-8")

    if args.dry_run:
        logger.info("DRY RUN — not updating DB")
        return

    if corrections:
        logger.info("Applying %d corrections...", len(corrections))
        with engine.begin() as c:
            c.execute(text("SET search_path TO compliance, public"))
            for corr in corrections:
                c.execute(text("""
                    UPDATE canonical_controls
                    SET generation_metadata = jsonb_set(
                        generation_metadata,
                        '{merge_group_hint}',
                        to_jsonb(CAST(:new_hint AS text))
                    )
                    WHERE id = CAST(:uuid AS uuid)
                """), {"uuid": corr["uuid"], "new_hint": corr["new_hint"]})
        logger.info("Done.")


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
# File: control-pipeline/scripts/gpre0_fix_generic_tokens.py
"""
Fix generic tokens: Re-classify controls that were assigned to
action-based tokens (documentation, procedure, process, etc.)
instead of topic-based tokens.

Runs sequentially in 5 batches. NO retry on timeout.

Usage:
    python3 /app/scripts/gpre0_fix_generic_tokens.py --dry-run
    python3 /app/scripts/gpre0_fix_generic_tokens.py
"""

import argparse
import json
import logging
import os
import re
import time
from collections import defaultdict
from pathlib import Path

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("gpre0-fix-generic")

# NOTE(review): the fallback URL embeds a password; prefer requiring DATABASE_URL.
DB_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_MODEL = "claude-haiku-4-5-20251001"
ANTHROPIC_URL = "https://api.anthropic.com/v1/messages"
CHECKPOINT_DIR = Path("/tmp/gpre0_fix_checkpoints")

# Tokens that are ACTION-based, not TOPIC-based → must be re-classified
FORBIDDEN_TOKENS = {
    "documentation", "procedure", "process",
    "compliance_reporting", "records_management",
}

SYSTEM_PROMPT = """Du bist ein Compliance-Klassifizierer. Ordne jeden Control dem THEMA zu, nicht der Handlung.

KRITISCH: Die Tokens "documentation", "procedure", "process", "compliance_reporting",
"records_management" sind VERBOTEN. Klassifiziere nach dem INHALTLICHEN THEMA.

Beispiele:
- "Risikobewertung dokumentieren" → risk_management (NICHT documentation)
- "Incident-Verfahren definieren" → incident (NICHT procedure)
- "Verschlüsselungsprozess implementieren" → encryption (NICHT process)
- "Audit-Ergebnisse berichten" → compliance_audit (NICHT compliance_reporting)
- "Datenschutz-Unterlagen verwalten" → personal_data (NICHT records_management)
- "Löschkonzept dokumentieren" → data_retention (NICHT documentation)
- "Zertifizierungsverfahren definieren" → certification (NICHT procedure)
- "Schulungsprozess durchführen" → training (NICHT process)

ERLAUBTE TOKENS:

SECURITY: multi_factor_auth, password_policy, credentials, session_management,
privileged_access, access_control, encryption, transport_encryption,
key_management, certificate_management, network_security, network_segmentation,
firewall, vpn, remote_access, monitoring, audit_logging, siem, alerting,
compliance_audit, vulnerability, patch_management, backup, disaster_recovery,
physical_security, secure_development, api_security, input_validation,
container_security, logging_configuration

DATA_PROTECTION: personal_data, sensitive_data, health_data, consent,
data_subject_rights, data_retention, data_transfer, data_breach_notification,
dpia, data_processing_agreement, privacy_by_design, data_processing_register,
data_classification, cookie_consent, video_surveillance

GOVERNANCE: policy, training, awareness, incident, risk_management,
third_party_management, change_management, asset_management,
human_resources_security

REGULATORY: supervisory_authority, certification, product_safety, ai_system,
financial_reporting, aml, whistleblowing, consumer_protection, ecommerce,
telecommunications, medical_device, payment_services, critical_infrastructure,
supply_chain_due_diligence, sustainability_reporting

Antworte NUR als JSON-Array: [{"id":"...","token":"...","conf":0.9}, ...]"""

_TOKEN_RE = re.compile(r"[a-z0-9]+(?:_[a-z0-9]+)*")


def parse_json_array(content: str) -> list[dict]:
    """Extract the outermost JSON array from an LLM reply, keeping only objects.

    Returns [] when no bracketed span exists.

    Raises:
        json.JSONDecodeError: If the bracketed span is not valid JSON.
    """
    start = content.find("[")
    end = content.rfind("]") + 1
    if start < 0 or end <= start:
        return []
    parsed = json.loads(content[start:end])
    return [item for item in parsed if isinstance(item, dict)]


def is_valid_token(token) -> bool:
    """Return True if *token* is a usable replacement.

    It must be a snake_case string (it is written into the colon-delimited
    hint) and must not be one of FORBIDDEN_TOKENS.
    """
    if not isinstance(token, str) or not _TOKEN_RE.fullmatch(token):
        return False
    return token not in FORBIDDEN_TOKENS


def replace_object(hint: str, new_token: str) -> str:
    """Return *hint* (``action:object:phase``) with its object part replaced."""
    parts = hint.split(":", 2)
    action = parts[0] if parts else "implement"
    phase = parts[2] if len(parts) > 2 else "implementation"
    return f"{action}:{new_token}:{phase}"


def call_claude(controls_batch: list[dict]) -> tuple[list[dict], dict]:
    """Send one batch to Claude. NO retry on timeout.

    A failed batch is logged and returned as ``([], {})``. After a 429 the
    function sleeps 60s before returning.

    Returns:
        (result objects with "id"/"token"/"conf", API usage dict).
    """
    import httpx  # function-scope so the helpers import without the HTTP client

    items = [
        f'- id="{c["control_id"]}" '
        f'cur="{c["current_object"]}" '
        f't="{c["title"]}" '
        f'o="{c["objective"][:100]}"'
        for c in controls_batch
    ]
    prompt = "Klassifiziere nach THEMA (nicht Handlung):\n" + "\n".join(items)

    headers = {
        "x-api-key": ANTHROPIC_API_KEY,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    payload = {
        "model": ANTHROPIC_MODEL,
        "max_tokens": 1500,
        "temperature": 0.0,
        "system": SYSTEM_PROMPT,
        "messages": [{"role": "user", "content": prompt}],
    }

    try:
        resp = httpx.post(
            ANTHROPIC_URL, headers=headers, json=payload, timeout=45.0
        )
        resp.raise_for_status()
        data = resp.json()
        content = data.get("content", [{}])[0].get("text", "")
        usage = data.get("usage", {})
        return parse_json_array(content), usage
    except httpx.TimeoutException:
        logger.error("TIMEOUT — skipping batch")
        return [], {}
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 429:
            logger.warning("Rate limited — waiting 60s")
            time.sleep(60)
        else:
            logger.error("API error %d", e.response.status_code)
        return [], {}
    except Exception as e:
        logger.error("Failed: %s", e)
        return [], {}


def main():
    """Re-classify all controls whose hint object is a forbidden generic token."""
    from sqlalchemy import create_engine, text

    parser = argparse.ArgumentParser()
    parser.add_argument("--batch-size", type=int, default=20)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    if not ANTHROPIC_API_KEY:
        logger.error("ANTHROPIC_API_KEY is not set — aborting")
        raise SystemExit(1)

    engine = create_engine(
        DB_URL, connect_args={"options": "-c search_path=compliance,public"}
    )

    # Load only controls with forbidden tokens. The filter is derived from
    # FORBIDDEN_TOKENS (one source of truth) and passed as bound values,
    # because the patterns contain ':tok:' and text() scans its SQL string
    # for ':name' placeholders.
    like_params = {
        f"tok{idx}": f"%:{tok}:%" for idx, tok in enumerate(sorted(FORBIDDEN_TOKENS))
    }
    like_clauses = " OR ".join(
        f"cc.generation_metadata->>'merge_group_hint' LIKE :{name}"
        for name in like_params
    )
    with engine.connect() as c:
        rows = c.execute(text(f"""
            SELECT cc.id, cc.control_id, cc.title,
                   COALESCE(cc.objective, '') as objective,
                   cc.generation_metadata->>'merge_group_hint' as hint
            FROM canonical_controls cc
            WHERE cc.generation_metadata->>'merge_group_hint' IS NOT NULL
              AND cc.release_state NOT IN ('deprecated', 'rejected')
              AND ({like_clauses})
        """), like_params).fetchall()

    controls = []
    for uuid, cid, title, objective, hint in rows:
        parts = hint.split(":", 2) if hint else []
        controls.append({
            "uuid": str(uuid), "control_id": cid,
            "title": title or "", "objective": objective or "",
            "current_hint": hint,
            "current_object": parts[1] if len(parts) > 1 else hint,
        })

    logger.info("Found %d controls with forbidden tokens to re-classify", len(controls))

    # Process
    total_fixed = 0
    total_kept = 0
    total_skipped = 0
    total_input_tokens = 0
    total_output_tokens = 0
    corrections = []
    change_stats: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))

    for i in range(0, len(controls), args.batch_size):
        batch = controls[i:i + args.batch_size]
        results, usage = call_claude(batch)

        total_input_tokens += usage.get("input_tokens", 0)
        total_output_tokens += usage.get("output_tokens", 0)

        if not results:
            total_skipped += len(batch)
            continue

        result_map = {r.get("id", ""): r for r in results}
        for ctrl in batch:
            new_token = result_map.get(ctrl["control_id"], {}).get("token", "")
            old_obj = ctrl["current_object"]
            # Missing, malformed, forbidden or unchanged answers leave the hint as-is.
            if not is_valid_token(new_token) or new_token == old_obj:
                total_kept += 1
                continue

            total_fixed += 1
            corrections.append({
                "uuid": ctrl["uuid"],
                "old_hint": ctrl["current_hint"],
                "new_hint": replace_object(ctrl["current_hint"], new_token),
            })
            change_stats[old_obj][new_token] += 1

        processed = min(i + args.batch_size, len(controls))
        if processed % 2000 < args.batch_size or processed >= len(controls):
            logger.info(
                "Progress: %d/%d (fixed=%d kept=%d skip=%d)",
                processed, len(controls), total_fixed, total_kept, total_skipped,
            )

        time.sleep(0.3)

    # Report
    cost_in = total_input_tokens / 1_000_000 * 0.80
    cost_out = total_output_tokens / 1_000_000 * 4.00
    logger.info("\n" + "=" * 60)
    logger.info("GENERIC TOKEN FIX REPORT")
    logger.info("=" * 60)
    logger.info("Total: %d controls", len(controls))
    logger.info("Fixed: %d", total_fixed)
    logger.info("Kept: %d (LLM also chose forbidden → kept as-is)", total_kept)
    logger.info("Skipped: %d", total_skipped)
    logger.info("Cost: $%.2f (Haiku)", cost_in + cost_out)

    logger.info("\nTop changes:")
    flat = [
        (cnt, old, new)
        for old, news in change_stats.items()
        for new, cnt in news.items()
    ]
    for cnt, old, new in sorted(flat, reverse=True)[:30]:
        logger.info(" %4d × %s → %s", cnt, old, new)

    # Save corrections
    CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
    corr_file = CHECKPOINT_DIR / "corrections_generic_fix.json"
    corr_file.write_text(json.dumps(corrections), encoding="utf-8")
    logger.info("Saved %d corrections to %s", len(corrections), corr_file)

    if args.dry_run:
        logger.info("DRY RUN — not updating DB")
        return

    if corrections:
        logger.info("Applying %d corrections...", len(corrections))
        with engine.begin() as c:
            c.execute(text("SET search_path TO compliance, public"))
            for corr in corrections:
                c.execute(text("""
                    UPDATE canonical_controls
                    SET generation_metadata = jsonb_set(
                        generation_metadata,
                        '{merge_group_hint}',
                        to_jsonb(CAST(:new_hint AS text))
                    )
                    WHERE id = CAST(:uuid AS uuid)
                """), {"uuid": corr["uuid"], "new_hint": corr["new_hint"]})
        logger.info("Done. %d hints corrected.", len(corrections))


if __name__ == "__main__":
    main()
corr["new_hint"]}) + logger.info("Done. %d hints corrected.", len(corrections)) + + +if __name__ == "__main__": + main() diff --git a/control-pipeline/scripts/gpre0_run_all.sh b/control-pipeline/scripts/gpre0_run_all.sh new file mode 100644 index 0000000..f9abdc0 --- /dev/null +++ b/control-pipeline/scripts/gpre0_run_all.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# Run all 10 batches sequentially. Safe: if one fails, the rest don't run. +# Each batch saves corrections to JSON before applying to DB. +# +# Usage: bash /app/scripts/gpre0_run_all.sh +# bash /app/scripts/gpre0_run_all.sh 5 # start from batch 5 + +set -e + +START=${1:-1} +TOTAL=10 + +echo "=== Starting from batch $START of $TOTAL ===" + +for i in $(seq $START $TOTAL); do + echo "" + echo "================================================================" + echo " BATCH $i/$TOTAL — $(date)" + echo "================================================================" + + PYTHONPATH=/app python3 /app/scripts/gpre0_validate_hints.py \ + --batch-id $i \ + --total-batches $TOTAL \ + --batch-size 20 + + EXIT_CODE=$? + if [ $EXIT_CODE -ne 0 ]; then + echo "BATCH $i FAILED with exit code $EXIT_CODE" + echo "Resume with: bash /app/scripts/gpre0_run_all.sh $i" + exit $EXIT_CODE + fi + + echo "BATCH $i DONE — $(date)" +done + +echo "" +echo "ALL $TOTAL BATCHES COMPLETE!" diff --git a/control-pipeline/scripts/gpre0_validate_hints.py b/control-pipeline/scripts/gpre0_validate_hints.py new file mode 100644 index 0000000..e81f052 --- /dev/null +++ b/control-pipeline/scripts/gpre0_validate_hints.py @@ -0,0 +1,351 @@ +#!/usr/bin/env python3 +""" +Phase 2: Validate and correct merge_group_hints using Claude Haiku. + +Re-classifies each control's object token against the expanded ontology +(74 canonical tokens). Corrects wrong hints in the DB. + +SAFETY: Split into 4 batches. NEVER retries on timeout (double-billing!). +Writes checkpoint after each API call for safe resume. 
+ +Usage: + python3 /app/scripts/gpre0_validate_hints.py --batch-id 1 --dry-run + python3 /app/scripts/gpre0_validate_hints.py --batch-id 1 + python3 /app/scripts/gpre0_validate_hints.py --batch-id 2 + python3 /app/scripts/gpre0_validate_hints.py --batch-id 3 + python3 /app/scripts/gpre0_validate_hints.py --batch-id 4 +""" + +import argparse +import json +import logging +import os +import time +from collections import defaultdict +from pathlib import Path + +import httpx +from sqlalchemy import create_engine, text + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s" +) +logger = logging.getLogger("gpre0-validate") + +DB_URL = os.getenv( + "DATABASE_URL", + "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db", +) +ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") +ANTHROPIC_MODEL = "claude-haiku-4-5-20251001" +ANTHROPIC_URL = "https://api.anthropic.com/v1/messages" +CHECKPOINT_DIR = Path("/tmp/gpre0_checkpoints") + +SYSTEM_PROMPT = """Du bist ein Compliance-Klassifizierer. Ordne jeden Control GENAU EINEM Token zu. + +REGEL: Waehle IMMER den naechstbesten Token aus der Liste. OTHER nur wenn ABSOLUT +kein Token auch nur entfernt passt (<1% der Faelle). Im Zweifel: den breitesten +passenden Token waehlen (z.B. "policy" fuer Governance-Dokumente, "procedure" fuer +Ablauf-Definitionen, "risk_management" fuer Bewertungen). 
+ +TOKENS: + +SECURITY: multi_factor_auth, password_policy, credentials, session_management, +privileged_access, access_control, encryption, transport_encryption, +key_management, certificate_management, network_security, network_segmentation, +firewall, vpn, remote_access, monitoring (NUR Echtzeit-Systemueberwachung), +audit_logging (Protokollierung/Audit Trail), siem, alerting (Meldepflichten), +compliance_audit (externe Pruefungen), vulnerability, patch_management, +backup, disaster_recovery, physical_security, secure_development, +api_security, input_validation, container_security, logging_configuration + +DATA_PROTECTION: personal_data (DSGVO-Verarbeitung), sensitive_data (Art.9), +health_data, consent, data_subject_rights, data_retention, data_transfer, +data_breach_notification, dpia, data_processing_agreement, privacy_by_design, +data_processing_register, data_classification, cookie_consent, video_surveillance + +GOVERNANCE: policy (Richtlinie definieren), procedure (Verfahren definieren), +process (Betriebsprozess ausfuehren), training (Schulung), awareness, +incident (Vorfallsbehandlung), risk_management, third_party_management, +change_management, documentation, records_management, compliance_reporting, +asset_management, human_resources_security + +REGULATORY: supervisory_authority, certification (Zertifizierung/Konformitaet), +product_safety, ai_system, financial_reporting, aml, whistleblowing, +consumer_protection, ecommerce, telecommunications, medical_device, +payment_services, critical_infrastructure, supply_chain_due_diligence, +sustainability_reporting + +ABGRENZUNGEN: +- monitoring = NUR Echtzeit-Systemueberwachung, NICHT Audit/Schulung/Bewertung +- audit_logging = Protokollierung, NICHT externe Pruefung (→ compliance_audit) +- procedure = Verfahren DEFINIEREN, NICHT Vorfaelle behandeln (→ incident) +- personal_data = DSGVO-Verarbeitung, NICHT Zertifizierung (→ certification) +- alerting = Meldepflichten, NICHT Vorfallsbehandlung (→ incident) + 
+Antworte NUR als JSON-Array: [{"id":"...","token":"...","conf":0.9}, ...] +KEIN weiterer Text. Nur das Array.""" + + +def call_claude(controls_batch: list[dict]) -> tuple[list[dict], dict]: + """Send batch to Claude. NO RETRY on timeout (double-billing risk!).""" + items = [] + for c in controls_batch: + items.append( + f'- id="{c["control_id"]}" ' + f'cur="{c["current_object"]}" ' + f't="{c["title"]}" ' + f'o="{c["objective"][:100]}"' + ) + + prompt = "Klassifiziere:\n" + "\n".join(items) + + headers = { + "x-api-key": ANTHROPIC_API_KEY, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + } + payload = { + "model": ANTHROPIC_MODEL, + "max_tokens": 1500, + "temperature": 0.0, + "system": SYSTEM_PROMPT, + "messages": [{"role": "user", "content": prompt}], + } + + try: + resp = httpx.post( + ANTHROPIC_URL, headers=headers, json=payload, timeout=45.0 + ) + resp.raise_for_status() + data = resp.json() + content = data.get("content", [{}])[0].get("text", "") + usage = data.get("usage", {}) + start = content.find("[") + end = content.rfind("]") + 1 + if start >= 0 and end > start: + return json.loads(content[start:end]), usage + logger.warning("No JSON array in response") + return [], usage + except httpx.TimeoutException: + # CRITICAL: Do NOT retry! Log and skip. 
+ logger.error("TIMEOUT — skipping batch (NOT retrying to avoid double-billing)") + return [], {} + except httpx.HTTPStatusError as e: + if e.response.status_code == 429: + logger.warning("Rate limited — waiting 60s then skipping") + time.sleep(60) + else: + logger.error("API error %d — skipping batch", e.response.status_code) + return [], {} + except Exception as e: + logger.error("Request failed — skipping: %s", e) + return [], {} + + +def load_checkpoint(batch_id: int) -> int: + """Load last processed index for this batch.""" + cp_file = CHECKPOINT_DIR / f"batch_{batch_id}.json" + if cp_file.exists(): + data = json.loads(cp_file.read_text()) + return data.get("last_index", 0) + return 0 + + +def save_checkpoint(batch_id: int, last_index: int, stats: dict): + """Save progress checkpoint.""" + CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True) + cp_file = CHECKPOINT_DIR / f"batch_{batch_id}.json" + cp_file.write_text(json.dumps({ + "batch_id": batch_id, + "last_index": last_index, + **stats, + })) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--batch-id", type=int, required=True) + parser.add_argument("--total-batches", type=int, default=10) + parser.add_argument("--batch-size", type=int, default=20) + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--resume", action="store_true", + help="Resume from checkpoint") + args = parser.parse_args() + + engine = create_engine( + DB_URL, connect_args={"options": "-c search_path=compliance,public"} + ) + + # Load ALL control IDs ordered deterministically, then select quarter + with engine.connect() as c: + all_ids = c.execute(text(""" + SELECT cc.id + FROM canonical_controls cc + WHERE cc.generation_metadata->>'merge_group_hint' IS NOT NULL + AND cc.generation_metadata->>'merge_group_hint' != '' + AND cc.release_state NOT IN ('deprecated', 'rejected') + ORDER BY cc.id + """)).fetchall() + + total = len(all_ids) + chunk = total // args.total_batches + start_idx = 
(args.batch_id - 1) * chunk + end_idx = total if args.batch_id == args.total_batches else args.batch_id * chunk + batch_ids = [str(r[0]) for r in all_ids[start_idx:end_idx]] + + logger.info("Batch %d/%d: controls %d-%d (%d controls of %d total)", + args.batch_id, args.total_batches, start_idx, end_idx, len(batch_ids), total) + + # Load full data for this batch + id_list = ",".join(f"'{uid}'" for uid in batch_ids) + with engine.connect() as c: + rows = c.execute(text(f""" + SELECT cc.id, cc.control_id, cc.title, + COALESCE(cc.objective, '') as objective, + cc.generation_metadata->>'merge_group_hint' as hint + FROM canonical_controls cc + WHERE cc.id IN ({id_list}) + ORDER BY cc.id + """)).fetchall() + + controls = [] + for uuid, cid, title, objective, hint in rows: + parts = hint.split(":", 2) if hint else [] + controls.append({ + "uuid": str(uuid), "control_id": cid, + "title": title or "", "objective": objective or "", + "current_hint": hint, "current_object": parts[1] if len(parts) > 1 else hint, + }) + + # Resume from checkpoint? 
+ start_from = 0 + if args.resume: + start_from = load_checkpoint(args.batch_id) + if start_from > 0: + logger.info("Resuming from index %d", start_from) + + # Process + total_same = 0 + total_changed = 0 + total_other = 0 + total_skipped = 0 + total_input_tokens = 0 + total_output_tokens = 0 + corrections: list[dict] = [] + change_stats: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) + + for i in range(start_from, len(controls), args.batch_size): + batch = controls[i:i + args.batch_size] + results, usage = call_claude(batch) + + total_input_tokens += usage.get("input_tokens", 0) + total_output_tokens += usage.get("output_tokens", 0) + + if not results: + total_skipped += len(batch) + save_checkpoint(args.batch_id, i + args.batch_size, { + "same": total_same, "changed": total_changed, + "other": total_other, "skipped": total_skipped, + }) + continue + + result_map = {r.get("id", ""): r for r in results} + for ctrl in batch: + r = result_map.get(ctrl["control_id"], {}) + new_token = r.get("token", "") + if not new_token: + total_skipped += 1 + continue + + old_obj = ctrl["current_object"] + if new_token == "OTHER": + total_other += 1 + elif new_token == old_obj: + total_same += 1 + else: + total_changed += 1 + parts = ctrl["current_hint"].split(":", 2) + action = parts[0] if parts else "implement" + phase = parts[2] if len(parts) > 2 else "implementation" + corrections.append({ + "uuid": ctrl["uuid"], + "old_hint": ctrl["current_hint"], + "new_hint": f"{action}:{new_token}:{phase}", + }) + change_stats[old_obj][new_token] += 1 + + # Checkpoint every batch + save_checkpoint(args.batch_id, i + args.batch_size, { + "same": total_same, "changed": total_changed, + "other": total_other, "skipped": total_skipped, + }) + + processed = min(i + args.batch_size, len(controls)) + if processed % 1000 < args.batch_size or processed >= len(controls): + logger.info( + "Batch %d: %d/%d (same=%d changed=%d other=%d skip=%d)", + args.batch_id, processed, 
len(controls), + total_same, total_changed, total_other, total_skipped, + ) + + time.sleep(0.3) + + # Report + cost_in = total_input_tokens / 1_000_000 * 0.80 # Haiku + cost_out = total_output_tokens / 1_000_000 * 4.00 # Haiku + total_cost = cost_in + cost_out + total_proc = total_same + total_changed + total_other + + logger.info("\n" + "=" * 60) + logger.info("BATCH %d REPORT", args.batch_id) + logger.info("=" * 60) + logger.info("Processed: %d | Skipped: %d", total_proc, total_skipped) + logger.info("Same: %d (%.1f%%)", total_same, total_same / max(total_proc, 1) * 100) + logger.info("Changed: %d (%.1f%%)", total_changed, total_changed / max(total_proc, 1) * 100) + logger.info("OTHER: %d (%.1f%%)", total_other, total_other / max(total_proc, 1) * 100) + logger.info("Cost: $%.2f (Haiku)", total_cost) + logger.info("Cost/ctrl: $%.5f", total_cost / max(total_proc, 1)) + + # Top changes + flat = [] + for old, news in change_stats.items(): + for new, cnt in news.items(): + flat.append((cnt, old, new)) + logger.info("\nTop Changes:") + for cnt, old, new in sorted(flat, reverse=True)[:20]: + logger.info(" %4d × %s → %s", cnt, old, new) + + # Always save corrections to file (recovery safety) + corr_file = CHECKPOINT_DIR / f"corrections_batch_{args.batch_id}.json" + if corrections: + CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True) + corr_file.write_text(json.dumps(corrections)) + logger.info("Saved %d corrections to %s", len(corrections), corr_file) + + if args.dry_run: + logger.info("\nDRY RUN — not updating DB") + return + + # Apply corrections in single transaction + if corrections: + logger.info("\nApplying %d corrections...", len(corrections)) + with engine.begin() as c: + c.execute(text("SET search_path TO compliance, public")) + for corr in corrections: + c.execute(text(""" + UPDATE canonical_controls + SET generation_metadata = jsonb_set( + generation_metadata, + '{merge_group_hint}', + to_jsonb(CAST(:new_hint AS text)) + ) + WHERE id = CAST(:uuid AS uuid) + 
"""), {"uuid": corr["uuid"], "new_hint": corr["new_hint"]}) + logger.info("Done. %d hints corrected.", len(corrections)) + else: + logger.info("No corrections needed.") + + +if __name__ == "__main__": + main() diff --git a/control-pipeline/scripts/gpre2_direct_mc.py b/control-pipeline/scripts/gpre2_direct_mc.py new file mode 100644 index 0000000..be19476 --- /dev/null +++ b/control-pipeline/scripts/gpre2_direct_mc.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python3 +""" +G-pre2 v2: Build Master Controls directly from canonical tokens. + +No K-Means needed — Phase 2 already normalized merge_group_hints +to 74 canonical tokens. Each token = one object group. + +Groups controls by (canonical_token, phase) and creates MCs +for tokens with >=2 distinct phases. + +Usage: + python3 /app/scripts/gpre2_direct_mc.py --dry-run + python3 /app/scripts/gpre2_direct_mc.py --min-phases 2 +""" + +import argparse +import json +import logging +import os +from collections import defaultdict + +from sqlalchemy import create_engine, text + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s" +) +logger = logging.getLogger("gpre2-direct") + +DB_URL = os.getenv( + "DATABASE_URL", + "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db", +) + +PHASE_ORDER = { + "scope": 0, "definition": 1, "governance": 1, + "design": 2, "implementation": 3, "configuration": 3, + "operation": 4, "training": 4, "monitoring": 5, + "testing": 6, "review": 7, "assessment": 8, "remediation": 8, + "validation": 9, "reporting": 10, "evidence": 11, +} + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--min-phases", type=int, default=2) + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + engine = create_engine( + DB_URL, connect_args={"options": "-c search_path=compliance,public"} + ) + + # Step 1: Load all controls with merge_group_hint + logger.info("Loading controls...") + with engine.connect() as c: + 
rows = c.execute(text(""" + SELECT id, control_id, + generation_metadata->>'merge_group_hint' AS hint + FROM canonical_controls + WHERE generation_metadata->>'merge_group_hint' IS NOT NULL + AND generation_metadata->>'merge_group_hint' != '' + AND release_state NOT IN ('deprecated', 'rejected') + """)).fetchall() + + logger.info("Loaded %d controls", len(rows)) + + # Step 2: Group by (object_token, phase) + token_phases: dict[str, dict[str, list]] = defaultdict( + lambda: defaultdict(list) + ) + + for uuid, control_id, hint in rows: + parts = hint.split(":", 2) + if len(parts) < 2: + continue + action = parts[0] + obj = parts[1] + phase = parts[2] if len(parts) > 2 else "implementation" + token_phases[obj][phase].append((str(uuid), control_id, action)) + + logger.info("Found %d unique object tokens", len(token_phases)) + + # Step 3: Create Master Controls + master_controls = [] + master_members = [] + + for token, phases in token_phases.items(): + if len(phases) < args.min_phases: + continue + + sorted_phases = sorted( + phases.keys(), key=lambda p: PHASE_ORDER.get(p, 99) + ) + phase_counts = {p: len(ctrls) for p, ctrls in phases.items()} + total = sum(phase_counts.values()) + + master_controls.append({ + "canonical_name": token, + "phases_covered": json.dumps(sorted_phases), + "phase_control_count": json.dumps(phase_counts), + "total_controls": total, + }) + + for phase, controls in phases.items(): + for ctrl_uuid, ctrl_id, action in controls: + master_members.append({ + "canonical_name": token, + "control_uuid": ctrl_uuid, + "phase": phase, + "action": action, + }) + + logger.info( + "Created %d Master Controls with %d members (min %d phases)", + len(master_controls), len(master_members), args.min_phases, + ) + + # Stats + if master_controls: + counts = [mc["total_controls"] for mc in master_controls] + phases_per = [ + len(json.loads(mc["phases_covered"])) for mc in master_controls + ] + logger.info(" Avg controls/MC: %.1f", sum(counts) / len(counts)) + 
logger.info(" Max controls/MC: %d", max(counts)) + logger.info(" Avg phases/MC: %.1f", sum(phases_per) / len(phases_per)) + logger.info(" Max phases/MC: %d", max(phases_per)) + + # Size distribution + logger.info("\n Size distribution:") + logger.info(" ≤10: %d", sum(1 for c in counts if c <= 10)) + logger.info(" 11-50: %d", sum(1 for c in counts if 11 <= c <= 50)) + logger.info(" 51-200: %d", sum(1 for c in counts if 51 <= c <= 200)) + logger.info(" 201-500: %d", sum(1 for c in counts if 201 <= c <= 500)) + logger.info(" 501-2K: %d", sum(1 for c in counts if 501 <= c <= 2000)) + logger.info(" >2K: %d", sum(1 for c in counts if c > 2000)) + + # Top 15 + top = sorted(master_controls, key=lambda x: -x["total_controls"])[:15] + logger.info("\n Top 15 Master Controls:") + for mc in top: + logger.info( + " %6d %s (%d phases)", + mc["total_controls"], + mc["canonical_name"], + len(json.loads(mc["phases_covered"])), + ) + + if args.dry_run: + logger.info("\nDRY RUN — not writing to DB") + return + + # Step 4: Write to DB + with engine.begin() as c: + c.execute(text("SET search_path TO compliance, public")) + c.execute(text("DELETE FROM master_control_members")) + c.execute(text("DELETE FROM master_controls")) + + # Get next object_group_id + max_gid = c.execute( + text("SELECT COALESCE(MAX(group_id), 0) FROM object_groups") + ).scalar() + next_gid = max_gid + 1 + + mc_uuids = {} + for mc in master_controls: + gid = next_gid + next_gid += 1 + mc_id = f"MC-{gid}" + + c.execute(text(""" + INSERT INTO master_controls + (master_control_id, object_group_id, canonical_name, + phases_covered, phase_control_count, total_controls) + VALUES (:mcid, :gid, :name, + CAST(:phases AS jsonb), + CAST(:pcounts AS jsonb), :total) + """), { + "mcid": mc_id, "gid": gid, + "name": mc["canonical_name"], + "phases": mc["phases_covered"], + "pcounts": mc["phase_control_count"], + "total": mc["total_controls"], + }) + + mc_uuid = c.execute(text( + "SELECT id FROM master_controls WHERE 
master_control_id = :mcid" + ), {"mcid": mc_id}).scalar() + mc_uuids[mc["canonical_name"]] = str(mc_uuid) + + # Insert members + mem_count = 0 + for mem in master_members: + mc_uuid = mc_uuids.get(mem["canonical_name"]) + if not mc_uuid: + continue + c.execute(text(""" + INSERT INTO master_control_members + (master_control_uuid, control_uuid, phase, action) + VALUES (CAST(:mc AS uuid), CAST(:ctrl AS uuid), + :phase, :action) + """), { + "mc": mc_uuid, + "ctrl": mem["control_uuid"], + "phase": mem["phase"], + "action": mem["action"], + }) + mem_count += 1 + + logger.info("Wrote %d MCs + %d members to DB", len(master_controls), mem_count) + + +if __name__ == "__main__": + main() diff --git a/control-pipeline/scripts/gpre3_regulation_split.py b/control-pipeline/scripts/gpre3_regulation_split.py new file mode 100644 index 0000000..36a4dc1 --- /dev/null +++ b/control-pipeline/scripts/gpre3_regulation_split.py @@ -0,0 +1,298 @@ +#!/usr/bin/env python3 +""" +G-pre3: Split large Master Controls by regulation source. + +For each MC with >200 controls: +1. Load member controls with parent's source_citation->>'source' +2. Group by regulation source +3. Sources with >= MIN_SOURCE_SIZE → new sub-MC +4. Small sources → merge into "mixed" bucket +5. UNKNOWN (no source_citation) → sub-cluster by embedding if >MAX_MC +6. 
Delete original large MC, create new sub-MCs + +Usage: + python3 /app/scripts/gpre3_regulation_split.py --dry-run + python3 /app/scripts/gpre3_regulation_split.py --min-source 15 --max-mc 100 +""" + +import argparse +import json +import logging +import os +import re +from collections import defaultdict + +from sqlalchemy import create_engine, text + +from services.embedding_utils import subcluster_controls + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s" +) +logger = logging.getLogger("gpre3") + +DB_URL = os.getenv( + "DATABASE_URL", + "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db", +) + +# ── Source key normalization ──────────────────────────────────────── +# fmt: off +_SOURCE_SHORT: dict[str, str] = { + "DSGVO (EU) 2016/679": "dsgvo", "NIS2-Richtlinie (EU) 2022/2555": "nis2", + "KI-Verordnung (EU) 2024/1689": "ai_act", "Cyber Resilience Act (CRA)": "cra", + "Digital Services Act (DSA)": "dsa", "Digital Markets Act (DMA)": "dma", + "Digital Operational Resilience Act": "dora", "Data Governance Act (DGA)": "dga", + "Data Act": "data_act", "Maschinenverordnung (EU) 2023/1230": "machinery_reg", + "Medizinprodukteverordnung (EU) 2017/745 (MDR)": "mdr", + "European Health Data Space": "ehds", "European Accessibility Act": "eaa", + "EU Cybersecurity Act": "eu_csa", "EU Blue Guide 2022": "eu_blue_guide", + "EU-US Data Privacy Framework": "eu_us_dpf", "Markets in Crypto-Assets (MiCA)": "mica", + "Standardvertragsklauseln (SCC)": "scc", "ePrivacy-Richtlinie": "eprivacy", + "Batterieverordnung (EU) 2023/1542": "battery_reg", + "Bundesdatenschutzgesetz (BDSG)": "bdsg", + "BSI-Gesetz (BSIG 2025, NIS2-Umsetzung)": "bsig", + "BSI-Kritisverordnung (BSI-KritisV)": "bsi_kritisv", + "Geldwaeschegesetz (GwG)": "gwg", "Hinweisgeberschutzgesetz (HinSchG)": "hinschg", + "Lieferkettensorgfaltspflichtengesetz (LkSG)": "lksg", + "KRITIS-Dachgesetz (KRITISDachG)": "kritisdachg", + "NIST SP 800-53 Rev. 
5": "nist_800_53", "NIST Cybersecurity Framework 2.0": "nist_csf", + "NIST Privacy Framework 1.0": "nist_privacy", + "NIST SP 800-207 (Zero Trust)": "nist_zero_trust", + "NIST SP 800-218 (SSDF)": "nist_ssdf", "NIST SP 800-63-3": "nist_800_63", + "NIST AI Risk Management Framework": "nist_ai_rmf", + "NISTIR 8259A IoT Security": "nist_iot", + "OWASP Top 10 (2021)": "owasp_top10", "OWASP API Security Top 10 (2023)": "owasp_api", + "OWASP ASVS 4.0": "owasp_asvs", "OWASP SAMM 2.0": "owasp_samm", + "OWASP MASVS 2.0": "owasp_masvs", "OWASP Mobile Top 10": "owasp_mobile", + "ENISA": "enisa", "TDDDG": "tdddg", "TKG": "tkg", "TMG": "tmg", + "BGB": "bgb", "UWG": "uwg", "UrhG": "urhg", + "BAIT (BaFin 2024)": "bait", "VAIT (BaFin 2022)": "vait", + "AML-Verordnung": "aml_reg", "Zahlungsdiensterichtlinie 2": "psd2", + "Telekommunikationsgesetz Oesterreich": "at_tkg", + "Österreichisches Datenschutzgesetz (DSG)": "at_dsg", + "Allgemeines Gleichbehandlungsgesetz (AGG)": "agg", + "Aktiengesetz (AktG)": "aktg", "Handelsgesetzbuch (HGB)": "hgb", + "GmbH-Gesetz (GmbHG)": "gmbhg", "Insolvenzordnung (InsO)": "inso", + "Gewerbeordnung (GewO)": "gewo", "Abgabenordnung (AO)": "ao", +} +# fmt: on + + +def source_to_key(source: str) -> str: + """Normalize regulation source name to a short slug key.""" + if source in _SOURCE_SHORT: + return _SOURCE_SHORT[source] + s = source.lower() + s = re.sub(r"\(.*?\)", "", s) + s = re.sub(r"[^a-z0-9äöüß]+", "_", s) + s = re.sub(r"_+", "_", s).strip("_") + return s[:40] if s else "unknown" + + +# ── Main ─────────────────────────────────────────────────────────── +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--min-source", type=int, default=15, + help="Min controls per source for own sub-MC") + parser.add_argument("--max-mc", type=int, default=100, + help="Max controls per sub-MC before sub-clustering") + parser.add_argument("--threshold", type=int, default=200, + help="Only split MCs with more than N controls") + 
parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + engine = create_engine( + DB_URL, connect_args={"options": "-c search_path=compliance,public"} + ) + + # Step 1: Find large master controls + with engine.connect() as c: + large_mcs = c.execute(text(""" + SELECT mc.id, mc.master_control_id, mc.object_group_id, + mc.canonical_name, mc.total_controls + FROM master_controls mc + WHERE mc.total_controls > :threshold + ORDER BY mc.total_controls DESC + """), {"threshold": args.threshold}).fetchall() + + logger.info("Found %d MCs with >%d controls", len(large_mcs), args.threshold) + if not large_mcs: + return + + # Step 2: Build split plans + all_splits = [] + for mc_uuid, mc_id, og_id, canonical, total in large_mcs: + plan = _build_split_plan(engine, mc_uuid, mc_id, og_id, canonical, total, args) + all_splits.append(plan) + + total_new = sum(len(sp["sub_groups"]) for sp in all_splits) + total_covered = sum( + sum(len(sg["controls"]) for sg in sp["sub_groups"]) for sp in all_splits + ) + logger.info("SUMMARY: %d large MCs → %d sub-MCs (%d controls)", len(all_splits), total_new, total_covered) + + if args.dry_run: + logger.info("DRY RUN — not writing to DB") + return + + _write_splits(engine, all_splits) + + +def _build_split_plan(engine, mc_uuid, mc_id, og_id, canonical, total, args) -> dict: + """Build a regulation-source split plan for one large MC.""" + logger.info("\n━━━ %s: %s (%d controls) ━━━", mc_id, canonical, total) + + with engine.connect() as c: + members = c.execute(text(""" + SELECT mcm.control_uuid, mcm.phase, mcm.action, + cc.control_id, cc.title, + COALESCE(pc.source_citation->>'source', 'UNKNOWN') AS src + FROM master_control_members mcm + JOIN canonical_controls cc ON cc.id = mcm.control_uuid + LEFT JOIN canonical_controls pc ON pc.id = cc.parent_control_uuid + WHERE mcm.master_control_uuid = CAST(:mc_uuid AS uuid) + """), {"mc_uuid": str(mc_uuid)}).fetchall() + + by_source: dict[str, list[dict]] = defaultdict(list) 
+ for ctrl_uuid, phase, action, cid, title, src in members: + by_source[src].append({ + "control_uuid": str(ctrl_uuid), "phase": phase, + "action": action, "control_id": cid, "title": title, + }) + + sorted_sources = sorted(by_source.items(), key=lambda x: -len(x[1])) + for src, ctrls in sorted_sources[:8]: + logger.info(" %4d %s", len(ctrls), src) + if len(sorted_sources) > 8: + logger.info(" ... +%d more sources", len(sorted_sources) - 8) + + plan = {"mc_uuid": str(mc_uuid), "mc_id": mc_id, "og_id": og_id, + "canonical": canonical, "total": total, "sub_groups": []} + + own_mc_sources = [] + mixed_controls = [] + for src, ctrls in sorted_sources: + if src == "UNKNOWN": + continue + if len(ctrls) >= args.min_source: + own_mc_sources.append((src, ctrls)) + else: + mixed_controls.extend(ctrls) + + unknown_controls = by_source.get("UNKNOWN", []) + + # (a) Named regulation sub-MCs + for src, ctrls in own_mc_sources: + key = source_to_key(src) + name = f"{canonical}_{key}" + _add_subgroups(plan, name, src, ctrls, args.max_mc) + + # (b) Mixed small-source bucket + if mixed_controls: + _add_subgroups(plan, f"{canonical}_mixed", "mixed", mixed_controls, args.max_mc) + + # (c) UNKNOWN bucket + if unknown_controls: + _add_subgroups(plan, f"{canonical}_general", "general", unknown_controls, args.max_mc) + + logger.info(" → %d sub-groups:", len(plan["sub_groups"])) + for sg in sorted(plan["sub_groups"], key=lambda x: -len(x["controls"])): + logger.info(" %4d %s", len(sg["controls"]), sg["name"]) + + return plan + + +def _add_subgroups(plan: dict, name: str, source: str, + controls: list[dict], max_mc: int): + """Add controls as one or more sub-groups to the plan.""" + if len(controls) <= max_mc: + plan["sub_groups"].append({"name": name, "source": source, "controls": controls}) + else: + clusters = subcluster_controls(controls, max_mc) + for i, cluster in enumerate(clusters): + sub_name = f"{name}_{i+1}" if len(clusters) > 1 else name + plan["sub_groups"].append({"name": 
sub_name, "source": source, "controls": cluster}) + + +def _write_splits(engine, splits: list[dict]): + """Apply split plan: delete old MCs, create new object_groups + MCs.""" + with engine.begin() as c: + c.execute(text("SET search_path TO compliance, public")) + max_gid = c.execute( + text("SELECT COALESCE(MAX(group_id), 0) FROM object_groups") + ).scalar() + next_gid = max_gid + 1 + total_mc = 0 + total_mem = 0 + + for sp in splits: + c.execute(text( + "DELETE FROM master_control_members " + "WHERE master_control_uuid = CAST(:u AS uuid)" + ), {"u": sp["mc_uuid"]}) + c.execute(text( + "DELETE FROM master_controls WHERE id = CAST(:u AS uuid)" + ), {"u": sp["mc_uuid"]}) + logger.info("Deleted %s (%s)", sp["mc_id"], sp["canonical"]) + + for sg in sp["sub_groups"]: + if not sg["controls"]: + continue + gid = next_gid + next_gid += 1 + + members_list = list({ctrl["control_id"] for ctrl in sg["controls"]}) + c.execute(text(""" + INSERT INTO object_groups + (group_id, canonical_name, member_count, members, top_controls_count) + VALUES (:gid, :name, :cnt, CAST(:members AS jsonb), 0) + """), {"gid": gid, "name": sg["name"], "cnt": len(members_list), + "members": json.dumps(members_list)}) + + by_phase: dict[str, list[dict]] = defaultdict(list) + for ctrl in sg["controls"]: + by_phase[ctrl["phase"]].append(ctrl) + + sorted_phases = sorted(by_phase.keys()) + phase_counts = {p: len(v) for p, v in by_phase.items()} + mc_id = f"MC-{gid}" + + c.execute(text(""" + INSERT INTO master_controls + (master_control_id, object_group_id, canonical_name, + phases_covered, phase_control_count, total_controls) + VALUES (:mcid, :gid, :name, + CAST(:phases AS jsonb), CAST(:pcounts AS jsonb), :total) + """), {"mcid": mc_id, "gid": gid, "name": sg["name"], + "phases": json.dumps(sorted_phases), + "pcounts": json.dumps(phase_counts), + "total": sum(phase_counts.values())}) + + mc_uuid = c.execute(text( + "SELECT id FROM master_controls WHERE master_control_id = :mcid" + ), {"mcid": 
mc_id}).scalar() + + for ctrl in sg["controls"]: + c.execute(text(""" + INSERT INTO master_control_members + (master_control_uuid, control_uuid, phase, action) + VALUES (CAST(:mc AS uuid), CAST(:ctrl AS uuid), :phase, :action) + """), {"mc": str(mc_uuid), "ctrl": ctrl["control_uuid"], + "phase": ctrl["phase"], "action": ctrl["action"]}) + total_mem += 1 + total_mc += 1 + + logger.info("Created %d new MCs with %d members", total_mc, total_mem) + + with engine.connect() as c: + stats = c.execute(text(""" + SELECT count(*), count(CASE WHEN total_controls > 200 THEN 1 END), + AVG(total_controls)::int + FROM compliance.master_controls + """)).fetchone() + logger.info("Final: %d MCs, %d still >200, avg %d controls/MC", stats[0], stats[1], stats[2]) + + +if __name__ == "__main__": + main() diff --git a/control-pipeline/scripts/gpre_quality_audit.py b/control-pipeline/scripts/gpre_quality_audit.py new file mode 100644 index 0000000..de00f46 --- /dev/null +++ b/control-pipeline/scripts/gpre_quality_audit.py @@ -0,0 +1,310 @@ +#!/usr/bin/env python3 +""" +Phase 0: Quality Audit for Master Control Assignments. + +Uses Claude Sonnet to validate whether controls are correctly assigned +to their Master Controls. Samples controls from large and small MCs. 
+ +Usage: + python3 /app/scripts/gpre_quality_audit.py + python3 /app/scripts/gpre_quality_audit.py --large-sample 50 --small-sample 10 + python3 /app/scripts/gpre_quality_audit.py --mc MC-8292 # single MC +""" + +import argparse +import json +import logging +import os +import random +import time +from collections import defaultdict + +import httpx +from sqlalchemy import create_engine, text + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s" +) +logger = logging.getLogger("quality-audit") + +DB_URL = os.getenv( + "DATABASE_URL", + "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db", +) +ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") +ANTHROPIC_MODEL = os.getenv("AUDIT_MODEL", "claude-sonnet-4-20250514") +ANTHROPIC_URL = "https://api.anthropic.com/v1/messages" + +SYSTEM_PROMPT = """Du bist ein Compliance-Experte der prüft ob Controls korrekt zu Master Controls zugeordnet sind. + +Für jeden Control beantworte: +1. MATCH: Gehört dieser Control thematisch zum Master Control Topic? +2. CONFIDENCE: Wie sicher bist du? (0.0-1.0) +3. REASON: Kurze Begründung (max 1 Satz) +4. SUGGESTED_TOPIC: Falls MATCH=false, welches Topic wäre korrekt? 
def call_claude(controls_batch: list[dict], mc_topic: str) -> tuple[list[dict], dict]:
    """Send a batch of controls to Claude for validation.

    Args:
        controls_batch: Control dicts with ``control_id``, ``title``,
            ``objective``, ``phase`` and ``action``.
        mc_topic: Canonical name of the Master Control being audited.

    Returns:
        ``(results, usage)``: the parsed JSON array (only dict entries are
        kept) and the ``usage`` object from the API response. Both are empty
        when the request failed or no JSON array was found.
    """
    items = [
        f"- Control '{c['control_id']}': "
        f"Titel=\"{c['title']}\", "
        f"Objective=\"{(c['objective'] or '')[:150]}...\", "
        f"Phase={c['phase']}, Action={c['action']}"
        for c in controls_batch
    ]

    prompt = (
        f"Master Control Topic: \"{mc_topic}\"\n\n"
        f"Prüfe diese {len(controls_batch)} Controls:\n\n"
        + "\n".join(items)
        + "\n\nAntwort als JSON-Array mit Feldern: "
        "control_id, match (bool), confidence (float), reason (str), "
        "suggested_topic (str, nur wenn match=false)."
    )

    headers = {
        "x-api-key": ANTHROPIC_API_KEY,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    payload = {
        "model": ANTHROPIC_MODEL,
        "max_tokens": 2048,
        "temperature": 0.1,
        "system": SYSTEM_PROMPT,
        "messages": [{"role": "user", "content": prompt}],
    }

    max_attempts = 3
    for attempt in range(max_attempts):
        is_last = attempt == max_attempts - 1
        try:
            resp = httpx.post(
                ANTHROPIC_URL,
                headers=headers,
                json=payload,
                timeout=60.0,
            )
            resp.raise_for_status()
            data = resp.json()
            # "content" may be present but empty; avoid IndexError on [0].
            blocks = data.get("content") or [{}]
            content = blocks[0].get("text", "")
            usage = data.get("usage", {})

            # Parse JSON from response: take the outermost [...] span.
            start = content.find("[")
            end = content.rfind("]") + 1
            if start >= 0 and end > start:
                parsed = json.loads(content[start:end])
                # Callers use .get() on each entry; drop anything else.
                return [r for r in parsed if isinstance(r, dict)], usage
            logger.warning("No JSON array in response: %s", content[:200])
            return [], usage
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 429:
                logger.error("API error: %s", e)
                return [], {}
            if is_last:
                # No further attempt follows, so waiting would be wasted.
                logger.warning("Rate limited, giving up after %d attempts", max_attempts)
                break
            wait = 30 * (attempt + 1)
            logger.warning("Rate limited, waiting %ds...", wait)
            time.sleep(wait)
        except Exception as e:
            logger.error("Request failed (attempt %d): %s", attempt + 1, e)
            if not is_last:
                time.sleep(5)
    return [], {}
def main():
    """Run the quality audit: sample controls per MC and let Claude judge them.

    Loads either the single MC given by ``--mc`` or all MCs with more than
    200 controls plus a random sample of MCs with 10-200 controls, samples
    member controls from each, sends them to ``call_claude`` in batches and
    prints the aggregated report.

    Only verdicts that carry an explicit boolean ``match`` are counted;
    anything else is logged and left out of the accuracy figures.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--large-sample", type=int, default=50,
                        help="Controls to sample per large MC")
    parser.add_argument("--small-sample", type=int, default=10,
                        help="Controls to sample per small MC")
    parser.add_argument("--small-mc-count", type=int, default=50,
                        help="Number of small MCs to audit")
    parser.add_argument("--mc", type=str, default=None,
                        help="Audit a single MC by ID (e.g., MC-8292)")
    parser.add_argument("--batch-size", type=int, default=10,
                        help="Controls per API call")
    args = parser.parse_args()
    if args.batch_size < 1:
        # range() below would raise on 0 and silently do nothing on negatives.
        parser.error("--batch-size must be >= 1")

    engine = create_engine(
        DB_URL, connect_args={"options": "-c search_path=compliance,public"}
    )

    # Load MCs to audit
    with engine.connect() as c:
        if args.mc:
            mcs = c.execute(text("""
                SELECT id, master_control_id, canonical_name, total_controls
                FROM master_controls WHERE master_control_id = :mc
            """), {"mc": args.mc}).fetchall()
        else:
            # Large MCs (>200) + random small MCs
            large = c.execute(text("""
                SELECT id, master_control_id, canonical_name, total_controls
                FROM master_controls WHERE total_controls > 200
                ORDER BY total_controls DESC
            """)).fetchall()

            small = c.execute(text("""
                SELECT id, master_control_id, canonical_name, total_controls
                FROM master_controls WHERE total_controls BETWEEN 10 AND 200
                ORDER BY RANDOM() LIMIT :cnt
            """), {"cnt": args.small_mc_count}).fetchall()

            mcs = list(large) + list(small)

    logger.info("Auditing %d Master Controls", len(mcs))

    # Results tracking
    total_checked = 0
    total_match = 0
    total_mismatch = 0
    total_input_tokens = 0
    total_output_tokens = 0
    mc_results: dict[str, dict] = {}
    all_mismatches: list[dict] = []

    for mc_uuid, mc_id, canonical, total in mcs:
        is_large = total > 200
        sample_size = args.large_sample if is_large else args.small_sample

        # Sample controls
        with engine.connect() as c:
            controls = c.execute(text("""
                SELECT mcm.control_uuid, mcm.phase, mcm.action,
                       cc.control_id, cc.title,
                       COALESCE(cc.objective, '') as objective
                FROM master_control_members mcm
                JOIN canonical_controls cc ON cc.id = mcm.control_uuid
                WHERE mcm.master_control_uuid = CAST(:mc AS uuid)
                ORDER BY RANDOM()
                LIMIT :n
            """), {"mc": str(mc_uuid), "n": sample_size}).fetchall()

        if not controls:
            continue

        control_dicts = [
            {"control_uuid": str(r[0]), "phase": r[1], "action": r[2],
             "control_id": r[3], "title": r[4] or "", "objective": r[5] or ""}
            for r in controls
        ]

        logger.info("\n%s: %s (%d total, sampling %d)",
                    mc_id, canonical, total, len(control_dicts))

        mc_match = 0
        mc_mismatch = 0

        # Process in batches
        for i in range(0, len(control_dicts), args.batch_size):
            batch = control_dicts[i:i + args.batch_size]
            results, usage = call_claude(batch, canonical)

            total_input_tokens += usage.get("input_tokens", 0)
            total_output_tokens += usage.get("output_tokens", 0)

            for r in results:
                verdict = r.get("match") if isinstance(r, dict) else None
                if not isinstance(verdict, bool):
                    # Counting a missing verdict as a match would inflate
                    # the accuracy figure.
                    logger.warning("Skipping result without boolean 'match': %r", r)
                    continue

                total_checked += 1
                if verdict:
                    mc_match += 1
                    total_match += 1
                else:
                    mc_mismatch += 1
                    total_mismatch += 1
                    all_mismatches.append({
                        "mc_id": mc_id,
                        "mc_topic": canonical,
                        "control_id": r.get("control_id", "?"),
                        "confidence": r.get("confidence", 0),
                        "reason": r.get("reason", ""),
                        "suggested_topic": r.get("suggested_topic", ""),
                    })

            # Rate limit
            time.sleep(1)

        judged = mc_match + mc_mismatch
        accuracy = mc_match / judged if judged > 0 else 1.0
        mc_results[mc_id] = {
            "canonical": canonical, "total": total,
            "checked": judged,
            "match": mc_match, "mismatch": mc_mismatch,
            "accuracy": accuracy,
        }
        logger.info("  → %d/%d correct (%.1f%%)",
                    mc_match, judged, accuracy * 100)

    # Final report
    _print_report(mc_results, all_mismatches, total_checked, total_match,
                  total_mismatch, total_input_tokens, total_output_tokens)
def _print_report(mc_results, mismatches, checked, match, mismatch,
                  input_tok, output_tok):
    """Print the quality audit report.

    Args:
        mc_results: Per-MC stats keyed by MC id (``canonical``, ``total``,
            ``checked``, ``match``, ``mismatch``, ``accuracy``).
        mismatches: Mismatch records (``mc_id``, ``mc_topic``,
            ``control_id``, ``confidence``, ``reason``, ``suggested_topic``).
        checked: Total number of judged controls.
        match: Number judged correct.
        mismatch: Number judged wrong.
        input_tok: Accumulated API input tokens.
        output_tok: Accumulated API output tokens.
    """

    def _confidence(entry) -> float:
        # The value comes straight from LLM output and may be null or a
        # string; a failed sort here would lose the whole report.
        try:
            return float(entry.get("confidence") or 0)
        except (TypeError, ValueError):
            return 0.0

    logger.info("\n" + "=" * 70)
    logger.info("QUALITY AUDIT REPORT")
    logger.info("=" * 70)
    logger.info("Total controls checked: %d", checked)
    logger.info("Correct assignments: %d (%.1f%%)",
                match, match / max(checked, 1) * 100)
    logger.info("Wrong assignments: %d (%.1f%%)",
                mismatch, mismatch / max(checked, 1) * 100)

    # Cost estimate
    cost_input = input_tok / 1_000_000 * 3.0  # Sonnet input: $3/MTok
    cost_output = output_tok / 1_000_000 * 15.0  # Sonnet output: $15/MTok
    logger.info("\nAPI Usage: %d input + %d output tokens",
                input_tok, output_tok)
    logger.info("Estimated cost: $%.2f", cost_input + cost_output)

    # Per-MC breakdown (worst first)
    logger.info("\n--- Per-MC Accuracy (worst first) ---")
    for mc in sorted(mc_results.values(), key=lambda x: x["accuracy"]):
        flag = "❌" if mc["accuracy"] < 0.9 else "⚠️" if mc["accuracy"] < 0.95 else "✅"
        logger.info("  %s %s (%s): %d/%d = %.1f%% [total: %d]",
                    flag, mc["canonical"][:30].ljust(30),
                    "large" if mc["total"] > 200 else "small",
                    mc["match"], mc["checked"],
                    mc["accuracy"] * 100, mc["total"])

    # Mismatches, highest confidence first
    if mismatches:
        logger.info("\n--- Mismatches (all %d) ---", len(mismatches))
        for m in sorted(mismatches, key=_confidence, reverse=True):
            logger.info("  %s in %s (%s) → should be '%s': %s",
                        m["control_id"], m["mc_id"], m["mc_topic"],
                        m["suggested_topic"], m["reason"])

    # Size-class breakdown
    large_mcs = [m for m in mc_results.values() if m["total"] > 200]
    small_mcs = [m for m in mc_results.values() if m["total"] <= 200]

    if large_mcs:
        lg_acc = sum(m["match"] for m in large_mcs) / max(sum(m["checked"] for m in large_mcs), 1)
        logger.info("\nLarge MCs (>200): %.1f%% accuracy (%d MCs)",
                    lg_acc * 100, len(large_mcs))
    if small_mcs:
        sm_acc = sum(m["match"] for m in small_mcs) / max(sum(m["checked"] for m in small_mcs), 1)
        logger.info("Small MCs (≤200): %.1f%% accuracy (%d MCs)",
                    sm_acc * 100, len(small_mcs))
MERGE-KEY: Erzeuge im JSON-Output ein zusaetzliches Feld "merge_key" mit dem Format: "action_type:normalized_object:control_phase" + + WICHTIG: Waehle normalized_object NUR aus dieser Liste kanonischer Tokens: + SECURITY: multi_factor_auth, password_policy, credentials, session_management, + privileged_access, access_control, encryption, transport_encryption, + key_management, certificate_management, network_security, network_segmentation, + firewall, vpn, remote_access, monitoring, audit_logging, siem, alerting, + compliance_audit, vulnerability, patch_management, backup, disaster_recovery, + physical_security, secure_development, api_security, input_validation, + container_security, logging_configuration + DATA_PROTECTION: personal_data, sensitive_data, health_data, consent, + data_subject_rights, data_retention, data_transfer, data_breach_notification, + dpia, data_processing_agreement, privacy_by_design, data_processing_register, + data_classification, cookie_consent, video_surveillance + GOVERNANCE: policy, procedure, process, training, awareness, incident, + risk_management, third_party_management, change_management, documentation, + records_management, compliance_reporting, asset_management, + human_resources_security + REGULATORY: supervisory_authority, certification, product_safety, ai_system, + financial_reporting, aml, whistleblowing, consumer_protection, ecommerce, + telecommunications, medical_device, payment_services, critical_infrastructure, + supply_chain_due_diligence, sustainability_reporting + + Wenn KEIN Token passt: "OTHER:kurzbeschreibung" (z.B. 
"OTHER:battery_recycling") + + ABGRENZUNGEN (haeufige Fehler vermeiden!): + - monitoring = NUR kontinuierliche Echtzeit-Ueberwachung von Systemen + - audit_logging = Protokollierung, Audit Trail, Nachvollziehbarkeit + - compliance_audit = externe Pruefungen, Zertifizierungsaudits + - training = Schulungen DURCHFUEHREN (nicht "ueberwachen") + - procedure = Verfahren DEFINIEREN (nicht Incident-Behandlung) + - incident = Sicherheitsvorfaelle BEHANDELN + - alerting = Meldepflichten und Benachrichtigungen + - personal_data = DSGVO-Verarbeitungsgrundsaetze (nicht Zertifizierung!) + - certification = Zertifizierung/Konformitaet (nicht Datenschutz) + Beispiele: - - "implement:api_rate_limiting:implementation" - - "define:access_control_policy:definition" - - "monitor:third_party_vulnerabilities:monitoring" - - "test:authentication_mechanism:testing" + - "implement:multi_factor_auth:implementation" + - "define:access_control:definition" + - "monitor:network_security:monitoring" + - "test:vulnerability:testing" - "report:supervisory_authority:reporting" + - "implement:audit_logging:implementation" (NICHT monitoring!) + - "define:incident:definition" (Incident-Verfahren, NICHT procedure!) + - "train:training:operation" (Schulung, NICHT monitoring!) 8. APPLICABILITY + SCANNER: Bestimme fuer jedes Control: - applicability: Unter welchen Bedingungen gilt dieses Control? 
@@ -2472,6 +2510,81 @@ def _ensure_list(val) -> list: return [] +# Canonical object tokens from object_ontology (loaded once) +_CANONICAL_OBJECTS: set[str] | None = None + + +def _load_canonical_objects() -> set[str]: + """Load canonical tokens from DB, fallback to hardcoded set.""" + global _CANONICAL_OBJECTS + if _CANONICAL_OBJECTS is not None: + return _CANONICAL_OBJECTS + try: + from db.session import get_engine + from sqlalchemy import text + engine = get_engine() + with engine.connect() as c: + rows = c.execute(text( + "SELECT canonical_token FROM compliance.object_ontology" + )).fetchall() + _CANONICAL_OBJECTS = {r[0] for r in rows} + except Exception: + _CANONICAL_OBJECTS = set() + if not _CANONICAL_OBJECTS: + _CANONICAL_OBJECTS = { + "multi_factor_auth", "password_policy", "credentials", + "session_management", "privileged_access", "access_control", + "encryption", "transport_encryption", "key_management", + "certificate_management", "network_security", + "network_segmentation", "firewall", "vpn", "remote_access", + "monitoring", "audit_logging", "siem", "alerting", + "compliance_audit", "vulnerability", "patch_management", + "backup", "disaster_recovery", "personal_data", + "sensitive_data", "consent", "data_subject_rights", + "data_retention", "data_transfer", "data_breach_notification", + "dpia", "data_processing_agreement", "privacy_by_design", + "policy", "procedure", "process", "training", "awareness", + "incident", "risk_management", "third_party_management", + "change_management", "documentation", "supervisory_authority", + "certification", "product_safety", "ai_system", "aml", + "critical_infrastructure", "medical_device", + } + return _CANONICAL_OBJECTS + + +def _validate_merge_key(merge_key: str) -> str: + """Validate merge_key object against canonical ontology. + + Returns the merge_key (possibly corrected). Logs warnings for + unknown objects so they can be tracked. 
+ """ + parts = merge_key.split(":", 2) + if len(parts) < 2: + return merge_key + + action, obj = parts[0], parts[1] + phase = parts[2] if len(parts) > 2 else "implementation" + + # Accept OTHER: prefix (LLM signaling unknown object) + if obj.startswith("OTHER:"): + return merge_key + + # Check against canonical ontology + canonical = _load_canonical_objects() + if obj in canonical: + return merge_key + + # Try normalize_object() as fallback + from services.control_dedup import normalize_object + normed = normalize_object(obj) + if normed in canonical: + return f"{action}:{normed}:{phase}" + + # Unknown object — log and keep as-is (will be clustered by embedding) + logger.debug("merge_key unknown object: %s (normed: %s)", obj, normed) + return merge_key + + # --------------------------------------------------------------------------- # Decomposition Pass # --------------------------------------------------------------------------- @@ -3025,10 +3138,10 @@ class DecompositionPass: evidence_type=parsed.get("evidence_type", ""), provides_context=_ensure_list(parsed.get("provides_context", [])), ) - # Store merge_key from LLM output in metadata + # Store merge_key from LLM output in metadata — with validation llm_merge_key = parsed.get("merge_key", "") if llm_merge_key: - atomic.merge_group_hint = llm_merge_key + atomic.merge_group_hint = _validate_merge_key(llm_merge_key) atomic.parent_control_uuid = obl["parent_uuid"] atomic.obligation_candidate_id = obl["candidate_id"] diff --git a/control-pipeline/services/embedding_utils.py b/control-pipeline/services/embedding_utils.py new file mode 100644 index 0000000..2c9f257 --- /dev/null +++ b/control-pipeline/services/embedding_utils.py @@ -0,0 +1,84 @@ +"""Shared embedding + sub-clustering utilities for the control pipeline.""" + +import logging +import os +from collections import defaultdict + +import httpx +import numpy as np +from sklearn.cluster import MiniBatchKMeans + +logger = logging.getLogger(__name__) + 
+EMBEDDING_URL = os.getenv( + "EMBEDDING_SERVICE_URL", "http://embedding-service:8087" +) + + +def embed_texts(texts: list[str]) -> np.ndarray | None: + """Embed texts via the embedding-service in batches of 64.""" + try: + result = np.zeros((len(texts), 1024), dtype=np.float32) + batch_size = 64 + for i in range(0, len(texts), batch_size): + batch = texts[i : i + batch_size] + for attempt in range(3): + try: + with httpx.Client( + timeout=httpx.Timeout(60.0, connect=10.0) + ) as client: + resp = client.post( + f"{EMBEDDING_URL}/embed", json={"texts": batch} + ) + resp.raise_for_status() + embs = resp.json().get("embeddings", []) + end = min(i + len(embs), len(texts)) + result[i:end] = np.array(embs, dtype=np.float32) + break + except Exception as e: + if attempt == 2: + logger.error("Embed batch %d failed: %s", i, e) + import time + time.sleep(2) + return result + except Exception as e: + logger.error("Embedding failed: %s", e) + return None + + +def subcluster_controls( + controls: list[dict], target_size: int = 50 +) -> list[list[dict]]: + """Sub-cluster controls by embedding similarity. + + Returns a list of clusters. Falls back to naive chunking + if embedding fails. 
def subcluster_controls(
    controls: list[dict], target_size: int = 50
) -> list[list[dict]]:
    """Split controls into sub-clusters by embedding similarity.

    Inputs no larger than ``target_size`` come back as a single cluster.
    Larger inputs are embedded (title, or control_id when the title is
    empty), L2-normalised and grouped with MiniBatchKMeans. If embedding
    fails, the controls are chunked sequentially into slices of
    ``target_size``.
    """
    n_controls = len(controls)
    if n_controls <= target_size:
        return [controls]

    labels_src = []
    for item in controls:
        labels_src.append(item.get("title", "") or item.get("control_id", ""))

    vectors = embed_texts(labels_src)
    if vectors is None:
        chunks = []
        for offset in range(0, n_controls, target_size):
            chunks.append(controls[offset : offset + target_size])
        return chunks

    # Unit-normalise; zero vectors keep a divisor of 1 to avoid NaNs.
    lengths = np.linalg.norm(vectors, axis=1, keepdims=True)
    lengths[lengths == 0] = 1
    unit_vectors = vectors / lengths

    n_clusters = max(2, min(n_controls // target_size, 30))
    model = MiniBatchKMeans(
        n_clusters=n_clusters,
        batch_size=min(100, n_controls),
        max_iter=50,
        random_state=42,
    )
    assignment = model.fit_predict(unit_vectors)

    # Group by label, keeping the order in which labels first appear.
    grouped: dict[int, list[dict]] = {}
    for position, item in enumerate(controls):
        grouped.setdefault(int(assignment[position]), []).append(item)
    return list(grouped.values())
def main():
    """Crawl the OSHA Technical Manual and save every chapter as HTML.

    Collects chapter links from ``{BASE}/otm``, downloads each chapter page
    into ``OUTPUT_DIR`` (skipping files that already exist) and writes a
    registry of all chapters to ``otm_registry.json`` next to this script.
    """
    OUTPUT_DIR.mkdir(exist_ok=True)
    registry = []

    with sync_playwright() as p:
        # NOTE(review): headless=False needs a display; presumably chosen on
        # purpose for this site — confirm before changing.
        browser = p.chromium.launch(headless=False)
        try:
            page = browser.new_page()

            # Step 1: Get all chapter URLs
            page.goto(f"{BASE}/otm", timeout=30000)
            time.sleep(5)

            chapters = []
            seen = set()
            for link in page.query_selector_all('a[href*="/otm/"]'):
                href = link.get_attribute("href") or ""
                title = (link.inner_text() or "").strip()
                if href and "chapter" in href and href not in seen and title:
                    seen.add(href)
                    chapters.append({"url": href, "title": title})

            logger.info("Found %d chapters", len(chapters))

            # Step 2: Download each chapter
            for idx, ch in enumerate(chapters, start=1):
                url = ch["url"] if ch["url"].startswith("http") else BASE + ch["url"]
                slug = ch["url"].replace("/otm/", "").replace("/", "_")
                outfile = OUTPUT_DIR / f"{slug}.html"

                logger.info("[%d/%d] %s", idx, len(chapters), ch["title"][:60])

                if outfile.exists():
                    logger.info("  Already exists, skipping")
                    ch["local_path"] = str(outfile)
                    registry.append(ch)
                    continue

                try:
                    page.goto(url, timeout=30000)
                    time.sleep(3)
                    content = page.content()
                    # Explicit UTF-8: the platform default encoding may not
                    # be able to represent every character in the page.
                    outfile.write_text(content, encoding="utf-8")
                    ch["local_path"] = str(outfile)
                    logger.info("  Saved: %s (%.1f KB)", outfile.name, len(content) / 1024)
                except Exception as e:
                    logger.error("  Failed: %s", e)
                    ch["local_path"] = None

                registry.append(ch)
                time.sleep(1)
        finally:
            # Close the browser even if link discovery or the loop raises.
            browser.close()

    reg_file = Path(__file__).parent / "otm_registry.json"
    # ensure_ascii=False emits raw non-ASCII characters, so pin the encoding.
    reg_file.write_text(
        json.dumps(registry, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    ok = sum(1 for r in registry if r.get("local_path"))
    logger.info("Done: %d/%d chapters saved", ok, len(registry))
0000000..357bf90 --- /dev/null +++ b/rag-service/api/tenant_documents.py @@ -0,0 +1,289 @@ +""" +Tenant-isolated document upload, listing, and deletion. + +Each tenant gets their own Qdrant collection (bp_docs_tenant_{short_id}). +Documents are stored in MinIO under tenant-specific paths. +No data crosses tenant boundaries. + +Endpoints: + POST /api/v1/tenant/documents - Upload + process PDF + GET /api/v1/tenant/documents - List tenant's documents + DELETE /api/v1/tenant/documents/{doc_id} - Delete document + vectors + GET /api/v1/tenant/documents/{doc_id}/status - Processing status +""" + +import json +import logging +import uuid +from typing import Optional + +from fastapi import APIRouter, File, Form, HTTPException, Header, Request, UploadFile +from pydantic import BaseModel + +from api.auth import optional_jwt_auth +from embedding_client import embedding_client +from html_utils import decode_html_bytes, looks_like_html, strip_html +from minio_client_wrapper import minio_wrapper +from qdrant_client_wrapper import qdrant_wrapper + +logger = logging.getLogger("rag-service.api.tenant-documents") + +router = APIRouter(prefix="/api/v1/tenant/documents") + +VECTOR_DIM = 1024 # bge-m3 dimension +MAX_FILE_SIZE = 50 * 1024 * 1024 # 50 MB +ALLOWED_TYPES = {"application/pdf", "text/html", "text/plain"} +PDF_MAGIC = b"%PDF" + + +def _collection_name(tenant_id: str) -> str: + """Derive tenant-specific Qdrant collection name.""" + short = tenant_id.replace("-", "")[:12] + return f"bp_docs_tenant_{short}" + + +def _storage_path(tenant_id: str, document_id: str, filename: str) -> str: + """Derive tenant-isolated storage path.""" + short = tenant_id.replace("-", "")[:12] + return f"tenant_docs/{short}/{document_id}/{filename}" + + +def _extract_tenant_id( + request: Request, + x_tenant_id: Optional[str] = Header(None), +) -> str: + """Extract tenant ID from header. 
def _extract_tenant_id(
    request: Request,
    x_tenant_id: Optional[str] = Header(None),
) -> str:
    """Extract tenant ID from header. Required for all tenant endpoints.

    The value is client-supplied and later becomes part of a Qdrant
    collection name and a storage prefix, so only ASCII letters, digits,
    ``-`` and ``_`` are accepted (up to 128 characters).

    Raises:
        HTTPException: 400 if the header is missing or malformed.
    """
    tid = (x_tenant_id or request.headers.get("x-tenant-id", "")).strip()
    if not tid:
        raise HTTPException(status_code=400, detail="X-Tenant-ID header required")

    well_formed = (
        len(tid) <= 128
        and tid.isascii()
        and all(ch.isalnum() or ch in "-_" for ch in tid)
    )
    if not well_formed:
        raise HTTPException(status_code=400, detail="Invalid X-Tenant-ID header")
    return tid
@router.post("", response_model=DocumentResponse)
async def upload_tenant_document(
    request: Request,
    file: UploadFile = File(...),
    x_tenant_id: Optional[str] = Header(None),
    chunk_size: int = Form(default=512),
    chunk_overlap: int = Form(default=50),
    metadata_json: Optional[str] = Form(default=None),
):
    """Upload a document, process it, and index in tenant-specific collection.

    Steps: validate the upload, store the raw file in MinIO, extract text,
    chunk, embed, and index the chunks in the tenant's Qdrant collection.

    Raises:
        HTTPException: 400 for an empty file, a ``.pdf`` without PDF magic
            bytes, non-object ``metadata_json``, no extractable text or zero
            chunks; 413 when the file exceeds ``MAX_FILE_SIZE``; 500 when
            storage or text extraction fails.
    """
    optional_jwt_auth(request)
    tenant_id = _extract_tenant_id(request, x_tenant_id)

    # Read + validate
    file_bytes = await file.read()
    if len(file_bytes) == 0:
        raise HTTPException(status_code=400, detail="Empty file")
    if len(file_bytes) > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail=f"File too large (max {MAX_FILE_SIZE // 1024 // 1024} MB)")

    filename = file.filename or "document.pdf"
    content_type = file.content_type or "application/octet-stream"

    # PDF magic bytes check
    if filename.lower().endswith(".pdf") and not file_bytes[:4].startswith(PDF_MAGIC):
        raise HTTPException(status_code=400, detail="File claims to be PDF but magic bytes don't match")

    # Parse extra metadata before anything is stored, so a rejected request
    # leaves nothing behind.
    extra_metadata: dict = {}
    if metadata_json:
        try:
            parsed_metadata = json.loads(metadata_json)
        except json.JSONDecodeError:
            # Malformed metadata is ignored rather than failing the upload.
            logger.warning("Ignoring malformed metadata_json for tenant %s", tenant_id[:8])
        else:
            if not isinstance(parsed_metadata, dict):
                raise HTTPException(status_code=400, detail="metadata_json must be a JSON object")
            extra_metadata = parsed_metadata

    document_id = str(uuid.uuid4())
    collection = _collection_name(tenant_id)
    object_name = _storage_path(tenant_id, document_id, filename)

    # Ensure collection exists
    await qdrant_wrapper.create_collection(collection, VECTOR_DIM)

    # Store in MinIO
    try:
        await minio_wrapper.upload_document(
            object_name=object_name,
            data=file_bytes,
            content_type=content_type,
            metadata={"document_id": document_id, "tenant_id": tenant_id},
        )
    except Exception as exc:
        logger.error("MinIO upload failed for tenant %s: %s", tenant_id, exc)
        raise HTTPException(status_code=500, detail="Storage failed")

    # Extract text
    try:
        text = await _extract_text(file_bytes, filename, content_type)
    except Exception as exc:
        logger.error("Text extraction failed: %s", exc)
        raise HTTPException(status_code=500, detail=f"Text extraction failed: {exc}")

    if not text or not text.strip():
        raise HTTPException(status_code=400, detail="No text could be extracted")

    # Chunk
    chunk_result = await embedding_client.chunk_text(
        text=text, strategy="recursive",
        chunk_size=chunk_size, overlap=chunk_overlap,
    )
    chunks = chunk_result.chunks
    chunks_meta = chunk_result.chunks_with_metadata

    if not chunks:
        raise HTTPException(status_code=400, detail="Chunking produced zero chunks")

    # Embed
    embeddings = await embedding_client.generate_embeddings(chunks)

    # Build payloads with tenant isolation
    _STRUCT_FIELDS = ("section", "section_title", "paragraph", "paragraph_num", "page")
    payloads = []
    for i, chunk in enumerate(chunks):
        payload = {
            # Client metadata goes first so the system fields below always
            # win; it must never overwrite tenant_id or document_id.
            **extra_metadata,
            "document_id": document_id,
            "tenant_id": tenant_id,
            "filename": filename,
            # Stored so the document listing can report the size.
            "file_size": len(file_bytes),
            "chunk_index": i,
            "chunk_text": chunk,
        }
        if i < len(chunks_meta):
            for field in _STRUCT_FIELDS:
                value = chunks_meta[i].get(field)
                if value is not None and value != "":
                    payload[field] = value
        payloads.append(payload)

    # Index in tenant collection
    indexed = await qdrant_wrapper.index_documents(
        collection=collection, vectors=embeddings, payloads=payloads,
    )

    logger.info(
        "Tenant %s: uploaded %s (%d chunks, %d vectors) to %s",
        tenant_id[:8], filename, len(chunks), indexed, collection,
    )

    return DocumentResponse(
        id=document_id, filename=filename,
        file_size=len(file_bytes), status="indexed",
        chunk_count=len(chunks), collection=collection,
    )
@router.get("", response_model=DocumentListResponse)
async def list_tenant_documents(
    request: Request,
    x_tenant_id: Optional[str] = Header(None),
):
    """List all documents for this tenant.

    Any error from the vector store yields an empty list; the error is
    logged so an outage can be told apart from a tenant without documents.
    """
    optional_jwt_auth(request)
    tenant_id = _extract_tenant_id(request, x_tenant_id)

    collection = _collection_name(tenant_id)

    try:
        # Get unique document_ids from Qdrant
        docs = await qdrant_wrapper.get_unique_documents(collection)
    except Exception as exc:
        logger.warning("Listing documents in %s failed: %s", collection, exc)
        docs = []

    return DocumentListResponse(documents=docs, total=len(docs))


@router.delete("/{doc_id}")
async def delete_tenant_document(
    doc_id: str,
    request: Request,
    x_tenant_id: Optional[str] = Header(None),
):
    """Delete a document and all its vectors from tenant collection.

    Both deletions are attempted independently. The response reports
    ``deleted: True`` when at least one backend succeeded (with
    ``warnings`` for the other) and ``deleted: False`` when both failed.
    """
    optional_jwt_auth(request)
    tenant_id = _extract_tenant_id(request, x_tenant_id)

    collection = _collection_name(tenant_id)
    errors = []

    # Delete vectors from Qdrant
    try:
        await qdrant_wrapper.delete_by_filter(
            collection=collection,
            filter_conditions={"document_id": doc_id},
        )
    except Exception as exc:
        errors.append(f"Qdrant: {exc}")

    # Delete file from MinIO. An empty filename makes _storage_path return
    # the document's prefix, so this uses the same scheme as upload.
    try:
        prefix = _storage_path(tenant_id, doc_id, "")
        await minio_wrapper.delete_by_prefix(prefix)
    except Exception as exc:
        errors.append(f"MinIO: {exc}")

    if errors:
        logger.warning("Partial delete for %s/%s: %s", tenant_id[:8], doc_id[:8], errors)
        # Two errors mean neither backend removed anything.
        return {"deleted": len(errors) < 2, "warnings": errors}

    logger.info("Tenant %s: deleted document %s", tenant_id[:8], doc_id[:8])
    return {"deleted": True, "document_id": doc_id}
@router.get("/{doc_id}/status")
async def document_status(
    doc_id: str,
    request: Request,
    x_tenant_id: Optional[str] = Header(None),
):
    """Get processing status for a document.

    Reports ``indexed`` when at least one vector carries this document ID in
    the tenant's collection, otherwise ``not_found``. A vector-store error
    is logged and also reported as ``not_found``.
    """
    optional_jwt_auth(request)
    tenant_id = _extract_tenant_id(request, x_tenant_id)

    collection = _collection_name(tenant_id)
    try:
        count = await qdrant_wrapper.count_by_filter(
            collection=collection,
            filter_conditions={"document_id": doc_id},
        )
        status = "indexed" if count > 0 else "not_found"
    except Exception as exc:
        logger.warning("Status lookup in %s failed: %s", collection, exc)
        count = 0
        status = "not_found"

    return {"document_id": doc_id, "status": status, "chunk_count": count}


# ── Helpers ────────────────────────────────────────────────────────

async def _extract_text(file_bytes: bytes, filename: str, content_type: str) -> str:
    """Extract text from PDF, HTML, or plain text.

    PDFs (by content type or ``.pdf`` suffix) go to the embedding service.
    HTML (by ``.html``/``.htm`` suffix or a ``text/html`` content type) is
    decoded with ``decode_html_bytes`` and stripped of markup. Anything else
    is decoded as UTF-8 with replacement and stripped only if it looks like
    HTML.
    """
    lowered_name = filename.lower()
    # Ignore parameters such as "; charset=..." when comparing media types.
    media_type = content_type.split(";", 1)[0].strip().lower()

    if media_type == "application/pdf" or lowered_name.endswith(".pdf"):
        return await embedding_client.extract_pdf(file_bytes)
    if lowered_name.endswith((".html", ".htm")) or media_type == "text/html":
        # HTML identified only by content type also takes the HTML decoding
        # path instead of a forced UTF-8 decode.
        return strip_html(decode_html_bytes(file_bytes))

    text = file_bytes.decode("utf-8", errors="replace")
    if looks_like_html(text):
        return strip_html(text)
    return text
# ------------------------------------------------------------------
# Tenant document helpers
# ------------------------------------------------------------------
# NOTE(review): both functions are QdrantClientWrapper methods; they are shown
# dedented because the class header is outside this excerpt.

async def get_unique_documents(self, collection: str) -> list[dict]:
    """Return one summary dict per distinct ``document_id`` in a collection.

    The collection is scrolled page by page and points are grouped by their
    ``document_id`` payload field; points without one are ignored.

    Args:
        collection: Name of the collection to scan.

    Returns:
        A list of dicts with keys ``id``, ``filename``, ``file_size``,
        ``status`` (always ``"indexed"``), ``chunk_count`` (number of points
        seen for the document) and ``collection``. ``filename``/``file_size``
        come from the first point seen for each document. Returns ``[]`` when
        ``get_collection`` raises for the collection.
    """
    try:
        self.client.get_collection(collection)
    except Exception:
        return []

    # Only these payload keys are read below; requesting just them avoids
    # transferring every other payload field of every point.
    payload_keys = ["document_id", "filename", "file_size"]

    docs: dict[str, dict] = {}
    offset = None
    while True:
        points, offset = self.client.scroll(
            collection_name=collection,
            scroll_filter=None,
            limit=100,
            offset=offset,
            with_payload=payload_keys,
            with_vectors=False,
        )
        for pt in points:
            payload = pt.payload or {}
            doc_id = payload.get("document_id", "")
            if not doc_id:
                continue
            entry = docs.get(doc_id)
            if entry is None:
                entry = docs[doc_id] = {
                    "id": doc_id,
                    "filename": payload.get("filename", ""),
                    "file_size": payload.get("file_size", 0),
                    "status": "indexed",
                    "chunk_count": 0,
                    "collection": collection,
                }
            entry["chunk_count"] += 1

        # scroll() signals the last page with a None offset.
        if offset is None:
            break

    return list(docs.values())


async def count_by_filter(
    self, collection: str, filter_conditions: dict[str, Any]
) -> int:
    """Count points whose payload matches every key/value in the filter.

    Args:
        collection: Name of the collection to query.
        filter_conditions: Payload key → value pairs, combined as ``must``
            (all conditions have to match).

    Returns:
        The exact number of matching points, or 0 when ``get_collection``
        raises for the collection.
    """
    try:
        self.client.get_collection(collection)
    except Exception:
        return 0

    must_conditions = [
        qmodels.FieldCondition(key=key, match=qmodels.MatchValue(value=value))
        for key, value in filter_conditions.items()
    ]
    result = self.client.count(
        collection_name=collection,
        count_filter=qmodels.Filter(must=must_conditions),
        exact=True,
    )
    return result.count