diff --git a/.gitea/workflows/ci.yaml b/.gitea/workflows/ci.yaml index a92bb2c..9cc229d 100644 --- a/.gitea/workflows/ci.yaml +++ b/.gitea/workflows/ci.yaml @@ -169,6 +169,22 @@ jobs: pip install --quiet --no-cache-dir pytest pytest-asyncio python -m pytest test_main.py -v --tb=short + # ======================================== + # Validate Canonical Controls + # ======================================== + + validate-canonical-controls: + runs-on: docker + container: python:3.12-slim + steps: + - name: Checkout + run: | + apt-get update -qq && apt-get install -y -qq git > /dev/null 2>&1 + git clone --depth 1 --branch ${GITHUB_REF_NAME} ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git . + - name: Validate controls + run: | + python scripts/validate-controls.py + # ======================================== # Build & Deploy auf Hetzner (nur main, kein PR) # ======================================== @@ -181,6 +197,7 @@ jobs: - test-python-backend-compliance - test-python-document-crawler - test-python-dsms-gateway + - validate-canonical-controls container: docker:27-cli steps: - name: Deploy diff --git a/admin-compliance/app/api/sdk/v1/canonical/route.ts b/admin-compliance/app/api/sdk/v1/canonical/route.ts new file mode 100644 index 0000000..9ef50c2 --- /dev/null +++ b/admin-compliance/app/api/sdk/v1/canonical/route.ts @@ -0,0 +1,123 @@ +import { NextRequest, NextResponse } from 'next/server' + +const BACKEND_URL = process.env.BACKEND_URL || 'http://backend-compliance:8002' + +/** + * Proxy: GET /api/sdk/v1/canonical?endpoint=... + * + * Routes to backend canonical control endpoints: + * endpoint=frameworks → GET /api/v1/canonical/frameworks + * endpoint=controls → GET /api/v1/canonical/controls(?severity=...&domain=...) 
+ * endpoint=control&id= → GET /api/v1/canonical/controls/{id} + * endpoint=sources → GET /api/v1/canonical/sources + * endpoint=licenses → GET /api/v1/canonical/licenses + */ +export async function GET(request: NextRequest) { + try { + const { searchParams } = new URL(request.url) + const endpoint = searchParams.get('endpoint') || 'frameworks' + + let backendPath: string + + switch (endpoint) { + case 'frameworks': + backendPath = '/api/v1/canonical/frameworks' + break + + case 'controls': { + const severity = searchParams.get('severity') + const domain = searchParams.get('domain') + const params = new URLSearchParams() + if (severity) params.set('severity', severity) + if (domain) params.set('domain', domain) + const qs = params.toString() + backendPath = `/api/v1/canonical/controls${qs ? `?${qs}` : ''}` + break + } + + case 'control': { + const controlId = searchParams.get('id') + if (!controlId) { + return NextResponse.json({ error: 'Missing control id' }, { status: 400 }) + } + backendPath = `/api/v1/canonical/controls/${encodeURIComponent(controlId)}` + break + } + + case 'sources': + backendPath = '/api/v1/canonical/sources' + break + + case 'licenses': + backendPath = '/api/v1/canonical/licenses' + break + + default: + return NextResponse.json({ error: `Unknown endpoint: ${endpoint}` }, { status: 400 }) + } + + const response = await fetch(`${BACKEND_URL}${backendPath}`) + + if (!response.ok) { + if (response.status === 404) { + return NextResponse.json(null, { status: 404 }) + } + const errorText = await response.text() + return NextResponse.json( + { error: 'Backend error', details: errorText }, + { status: response.status } + ) + } + + return NextResponse.json(await response.json()) + } catch (error) { + console.error('Canonical control proxy error:', error) + return NextResponse.json( + { error: 'Failed to connect to backend' }, + { status: 503 } + ) + } +} + +/** + * Proxy: POST /api/sdk/v1/canonical?endpoint=similarity-check&id=... 
+ * + * Routes to: POST /api/v1/canonical/controls/{id}/similarity-check + */ +export async function POST(request: NextRequest) { + try { + const { searchParams } = new URL(request.url) + const endpoint = searchParams.get('endpoint') + const controlId = searchParams.get('id') + + if (endpoint !== 'similarity-check' || !controlId) { + return NextResponse.json({ error: 'Invalid endpoint or missing id' }, { status: 400 }) + } + + const body = await request.json() + const response = await fetch( + `${BACKEND_URL}/api/v1/canonical/controls/${encodeURIComponent(controlId)}/similarity-check`, + { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + } + ) + + if (!response.ok) { + const errorText = await response.text() + return NextResponse.json( + { error: 'Backend error', details: errorText }, + { status: response.status } + ) + } + + return NextResponse.json(await response.json()) + } catch (error) { + console.error('Similarity check proxy error:', error) + return NextResponse.json( + { error: 'Failed to connect to backend' }, + { status: 503 } + ) + } +} diff --git a/admin-compliance/app/sdk/control-library/page.tsx b/admin-compliance/app/sdk/control-library/page.tsx new file mode 100644 index 0000000..656744f --- /dev/null +++ b/admin-compliance/app/sdk/control-library/page.tsx @@ -0,0 +1,484 @@ +'use client' + +import { useState, useEffect, useMemo, useCallback } from 'react' +import { + Shield, Search, ChevronRight, ArrowLeft, ExternalLink, + Filter, AlertTriangle, CheckCircle2, Info, Lock, + FileText, BookOpen, Scale, +} from 'lucide-react' + +// ============================================================================= +// TYPES +// ============================================================================= + +interface OpenAnchor { + framework: string + ref: string + url: string +} + +interface EvidenceItem { + type: string + description: string +} + +interface CanonicalControl { + id: string + framework_id: 
string + control_id: string + title: string + objective: string + rationale: string + scope: { + platforms?: string[] + components?: string[] + data_classes?: string[] + } + requirements: string[] + test_procedure: string[] + evidence: EvidenceItem[] + severity: string + risk_score: number | null + implementation_effort: string | null + evidence_confidence: number | null + open_anchors: OpenAnchor[] + release_state: string + tags: string[] + created_at: string + updated_at: string +} + +interface Framework { + id: string + framework_id: string + name: string + version: string + description: string + release_state: string +} + +// ============================================================================= +// CONSTANTS +// ============================================================================= + +const SEVERITY_CONFIG: Record }> = { + critical: { bg: 'bg-red-100 text-red-800', label: 'Kritisch', icon: AlertTriangle }, + high: { bg: 'bg-orange-100 text-orange-800', label: 'Hoch', icon: AlertTriangle }, + medium: { bg: 'bg-yellow-100 text-yellow-800', label: 'Mittel', icon: Info }, + low: { bg: 'bg-green-100 text-green-800', label: 'Niedrig', icon: CheckCircle2 }, +} + +const EFFORT_LABELS: Record = { + s: 'Klein (S)', + m: 'Mittel (M)', + l: 'Gross (L)', + xl: 'Sehr gross (XL)', +} + +const BACKEND_URL = '/api/sdk/v1/canonical' + +// ============================================================================= +// HELPERS +// ============================================================================= + +function SeverityBadge({ severity }: { severity: string }) { + const config = SEVERITY_CONFIG[severity] || SEVERITY_CONFIG.medium + const Icon = config.icon + return ( + + + {config.label} + + ) +} + +function StateBadge({ state }: { state: string }) { + const config: Record = { + draft: 'bg-gray-100 text-gray-600', + review: 'bg-blue-100 text-blue-700', + approved: 'bg-green-100 text-green-700', + deprecated: 'bg-red-100 text-red-600', + } + return ( + + 
{state} + + ) +} + +function getDomain(controlId: string): string { + return controlId.split('-')[0] || '' +} + +// ============================================================================= +// CONTROL LIBRARY PAGE +// ============================================================================= + +export default function ControlLibraryPage() { + const [frameworks, setFrameworks] = useState([]) + const [controls, setControls] = useState([]) + const [selectedControl, setSelectedControl] = useState(null) + const [loading, setLoading] = useState(true) + const [error, setError] = useState(null) + + // Filters + const [searchQuery, setSearchQuery] = useState('') + const [severityFilter, setSeverityFilter] = useState('') + const [domainFilter, setDomainFilter] = useState('') + + // Load data + useEffect(() => { + async function load() { + try { + const [fwRes, ctrlRes] = await Promise.all([ + fetch(`${BACKEND_URL}?endpoint=frameworks`), + fetch(`${BACKEND_URL}?endpoint=controls`), + ]) + + if (fwRes.ok) { + setFrameworks(await fwRes.json()) + } + if (ctrlRes.ok) { + setControls(await ctrlRes.json()) + } + } catch (err) { + setError(err instanceof Error ? 
err.message : 'Fehler beim Laden') + } finally { + setLoading(false) + } + } + load() + }, []) + + // Derived: unique domains + const domains = useMemo(() => { + const set = new Set(controls.map(c => getDomain(c.control_id))) + return Array.from(set).sort() + }, [controls]) + + // Filtered controls + const filteredControls = useMemo(() => { + return controls.filter(c => { + if (severityFilter && c.severity !== severityFilter) return false + if (domainFilter && getDomain(c.control_id) !== domainFilter) return false + if (searchQuery) { + const q = searchQuery.toLowerCase() + return ( + c.control_id.toLowerCase().includes(q) || + c.title.toLowerCase().includes(q) || + c.objective.toLowerCase().includes(q) || + c.tags.some(t => t.toLowerCase().includes(q)) + ) + } + return true + }) + }, [controls, severityFilter, domainFilter, searchQuery]) + + const handleBack = useCallback(() => setSelectedControl(null), []) + + if (loading) { + return ( +
+
+
+ ) + } + + if (error) { + return ( +
+
{error}
+
+ ) + } + + // ========================================================================= + // DETAIL VIEW + // ========================================================================= + + if (selectedControl) { + const ctrl = selectedControl + return ( +
+ + + {/* Header */} +
+
+ +
+
+
+ {ctrl.control_id} + + +
+

{ctrl.title}

+
+ {ctrl.risk_score !== null && Risiko-Score: {ctrl.risk_score}/10} + {ctrl.implementation_effort && Aufwand: {EFFORT_LABELS[ctrl.implementation_effort] || ctrl.implementation_effort}} +
+
+
+ + {/* Objective & Rationale */} +
+
+

Ziel

+

{ctrl.objective}

+
+ +
+

Begruendung

+

{ctrl.rationale}

+
+ + {/* Scope */} +
+

Geltungsbereich

+
+ {ctrl.scope.platforms && ctrl.scope.platforms.length > 0 && ( +
+

Plattformen

+
+ {ctrl.scope.platforms.map(p => ( + {p} + ))} +
+
+ )} + {ctrl.scope.components && ctrl.scope.components.length > 0 && ( +
+

Komponenten

+
+ {ctrl.scope.components.map(c => ( + {c} + ))} +
+
+ )} + {ctrl.scope.data_classes && ctrl.scope.data_classes.length > 0 && ( +
+

Datenklassen

+
+ {ctrl.scope.data_classes.map(d => ( + {d} + ))} +
+
+ )} +
+
+ + {/* Requirements */} +
+

Anforderungen

+
    + {ctrl.requirements.map((req, i) => ( +
  1. + {i + 1} + {req} +
  2. + ))} +
+
+ + {/* Test Procedure */} +
+

Pruefverfahren

+
    + {ctrl.test_procedure.map((step, i) => ( +
  1. + + {step} +
  2. + ))} +
+
+ + {/* Evidence */} +
+

Nachweisanforderungen

+
+ {ctrl.evidence.map((ev, i) => ( +
+ +
+ {ev.type} +

{ev.description}

+
+
+ ))} +
+
+ + {/* Open Anchors — THE KEY SECTION */} +
+
+ +

Open-Source-Referenzen

+ ({ctrl.open_anchors.length} Quellen) +
+

+ Dieses Control basiert auf frei verfuegbarem Wissen. Alle Referenzen sind offen und oeffentlich zugaenglich. +

+
+ {ctrl.open_anchors.map((anchor, i) => ( +
+ +
+ {anchor.framework} +

{anchor.ref}

+
+ + + Quelle + +
+ ))} +
+
+ + {/* Tags */} + {ctrl.tags.length > 0 && ( +
+

Tags

+
+ {ctrl.tags.map(tag => ( + {tag} + ))} +
+
+ )} +
+
+ ) + } + + // ========================================================================= + // LIST VIEW + // ========================================================================= + + return ( +
+ {/* Header */} +
+
+
+ +
+

Canonical Control Library

+

+ {controls.length} unabhaengig formulierte Security Controls —{' '} + {controls.reduce((sum, c) => sum + c.open_anchors.length, 0)} Open-Source-Referenzen +

+
+
+
+ + {/* Frameworks */} + {frameworks.length > 0 && ( +
+
+ + {frameworks[0]?.name} v{frameworks[0]?.version} + + {frameworks[0]?.description} +
+
+ )} + + {/* Filters */} +
+
+ + setSearchQuery(e.target.value)} + className="w-full pl-9 pr-4 py-2 text-sm border border-gray-300 rounded-lg focus:outline-none focus:ring-2 focus:ring-purple-500" + /> +
+
+ +
+ + +
+
+ + {/* Control List */} +
+
+ {filteredControls.map(ctrl => ( + + ))} + + {filteredControls.length === 0 && ( +
+ Keine Controls gefunden. +
+ )} +
+
+
+ ) +} diff --git a/admin-compliance/app/sdk/control-provenance/page.tsx b/admin-compliance/app/sdk/control-provenance/page.tsx new file mode 100644 index 0000000..df049c9 --- /dev/null +++ b/admin-compliance/app/sdk/control-provenance/page.tsx @@ -0,0 +1,496 @@ +'use client' + +import { useState, useEffect } from 'react' +import { + Shield, BookOpen, ExternalLink, CheckCircle2, AlertTriangle, + Lock, Scale, FileText, Eye, ArrowLeft, +} from 'lucide-react' +import Link from 'next/link' + +// ============================================================================= +// TYPES +// ============================================================================= + +interface LicenseInfo { + license_id: string + name: string + terms_url: string | null + commercial_use: string + ai_training_restriction: string | null + tdm_allowed_under_44b: string | null + deletion_required: boolean + notes: string | null +} + +interface SourceInfo { + source_id: string + title: string + publisher: string + url: string | null + version_label: string | null + language: string + license_id: string + license_name: string + commercial_use: string + allowed_analysis: boolean + allowed_store_excerpt: boolean + allowed_ship_embeddings: boolean + allowed_ship_in_product: boolean + vault_retention_days: number + vault_access_tier: string +} + +// ============================================================================= +// STATIC PROVENANCE DOCUMENTATION +// ============================================================================= + +const PROVENANCE_SECTIONS = [ + { + id: 'methodology', + title: 'Methodik der Control-Erstellung', + content: `## Unabhaengige Formulierung + +Alle Controls in der Canonical Control Library wurden **eigenstaendig formuliert** und folgen einer +**unabhaengigen Taxonomie**. Es werden keine proprietaeren Bezeichner, Nummern oder Strukturen +aus geschuetzten Quellen uebernommen. + +### Dreistufiger Prozess + +1. 
**Offene Recherche** — Identifikation von Security-Anforderungen aus oeffentlichen, frei zugaenglichen + Frameworks (OWASP, NIST, ENISA). Jede Anforderung wird aus mindestens 2 unabhaengigen offenen Quellen belegt. + +2. **Eigenstaendige Formulierung** — Jedes Control wird mit eigener Sprache, eigener Struktur und eigener + Taxonomie (z.B. AUTH-001, NET-001) verfasst. Kein Copy-Paste, keine Paraphrase geschuetzter Texte. + +3. **Too-Close-Pruefung** — Automatisierte Aehnlichkeitspruefung gegen Quelltexte mit 5 Metriken + (Token Overlap, N-Gram Jaccard, Embedding Cosine, LCS Ratio, Exact-Phrase). Nur Controls mit + Status PASS oder WARN (+ Human Review) werden freigegeben. + +### Rechtliche Grundlage + +- **UrhG §44b** — Text & Data Mining erlaubt fuer Analyse; Kopien werden danach geloescht +- **UrhG §23** — Hinreichender Abstand zum Originalwerk durch eigene Formulierung +- **BSI Nutzungsbedingungen** — Kommerzielle Nutzung nur mit Zustimmung; wir nutzen BSI-Dokumente + ausschliesslich als Analysegrundlage, nicht im Produkt`, + }, + { + id: 'taxonomy', + title: 'Unabhaengige Taxonomie', + content: `## Eigenes Klassifikationssystem + +Die Canonical Control Library verwendet ein **eigenes Domain-Schema**, das sich bewusst von +proprietaeren Frameworks unterscheidet: + +| Domain | Name | Abgrenzung | +|--------|------|------------| +| AUTH | Identity & Access Management | Eigene Struktur, nicht BSI O.Auth_* | +| NET | Network & Transport Security | Eigene Struktur, nicht BSI O.Netz_* | +| SUP | Software Supply Chain | NIST SSDF / SLSA-basiert | +| LOG | Security Operations & Logging | OWASP Logging Best Practices | +| WEB | Web Application Security | OWASP ASVS-basiert | +| DATA | Data Governance & Classification | NIST SP 800-60 basiert | +| CRYP | Cryptographic Operations | NIST SP 800-57 basiert | +| REL | Release & Change Governance | OWASP SAMM basiert | + +### ID-Format + +Control-IDs folgen dem Muster \`DOMAIN-NNN\` (z.B. AUTH-001, NET-002). 
Dieses Format ist +**nicht von BSI oder anderen proprietaeren Standards abgeleitet**, sondern folgt einem +allgemein ueblichen Nummerierungsschema.`, + }, + { + id: 'open-sources', + title: 'Offene Referenzquellen', + content: `## Primaere offene Quellen + +Alle Controls sind in mindestens einer der folgenden **frei zugaenglichen** Quellen verankert: + +### OWASP (CC BY-SA 4.0 — kommerziell erlaubt) +- **ASVS** — Application Security Verification Standard v4.0.3 +- **MASVS** — Mobile Application Security Verification Standard v2.1 +- **Top 10** — OWASP Top 10 (2021) +- **Cheat Sheets** — OWASP Cheat Sheet Series +- **SAMM** — Software Assurance Maturity Model + +### NIST (Public Domain — keine Einschraenkungen) +- **SP 800-53 Rev.5** — Security and Privacy Controls +- **SP 800-63B** — Digital Identity Guidelines (Authentication) +- **SP 800-57** — Key Management Recommendations +- **SP 800-52 Rev.2** — TLS Implementation Guidelines +- **SP 800-92** — Log Management Guide +- **SP 800-218 (SSDF)** — Secure Software Development Framework +- **SP 800-60** — Information Types to Security Categories + +### ENISA (CC BY 4.0 — kommerziell erlaubt) +- Good Practices for IoT/Mobile Security +- Data Protection Engineering +- Algorithms, Key Sizes and Parameters Report + +### Weitere offene Quellen +- **SLSA** (Supply-chain Levels for Software Artifacts) — Google Open Source +- **CIS Controls v8** (CC BY-NC-ND — nur fuer interne Analyse)`, + }, + { + id: 'restricted-sources', + title: 'Geschuetzte Quellen — Nur interne Analyse', + content: `## Quellen mit eingeschraenkter Nutzung + +Die folgenden Quellen werden **ausschliesslich intern zur Analyse** verwendet. +Kein Text, keine Struktur, keine Bezeichner aus diesen Quellen erscheinen im Produkt. 
+ +### BSI (Nutzungsbedingungen — kommerziell eingeschraenkt) +- TR-03161 Teil 1-3 (Mobile, Web, Hintergrunddienste) +- Nutzung: TDM unter UrhG §44b, Kopien werden geloescht +- Kein Shipping von Zitaten, Embeddings oder Strukturen + +### ISO/IEC (Kostenpflichtig — kein Shipping) +- ISO 27001, ISO 27002 +- Nutzung: Nur als Referenz fuer Mapping, kein Text im Produkt + +### ETSI (Restriktiv — kein kommerzieller Gebrauch) +- Nutzung: Nur als Hintergrundwissen, kein direkter Einfluss + +### Trennungsprinzip + +| Ebene | Geschuetzte Quelle | Offene Quelle | +|-------|--------------------|---------------| +| Analyse | ✅ Darf gelesen werden | ✅ Darf gelesen werden | +| Inspiration | ✅ Darf Ideen liefern | ✅ Darf Ideen liefern | +| Formulierung | ❌ Keine Uebernahme | ✅ Darf zitiert werden | +| Struktur | ❌ Keine Uebernahme | ✅ Darf verwendet werden | +| Produkttext | ❌ Nicht erlaubt | ✅ Erlaubt |`, + }, + { + id: 'validation', + title: 'Automatisierte Validierung', + content: `## CI/CD-Pruefungen + +Jedes Control wird bei jedem Commit automatisch geprueft: + +### 1. Schema-Validierung +- Alle Pflichtfelder vorhanden +- Control-ID Format: \`^[A-Z]{2,6}-[0-9]{3}$\` +- Severity: low, medium, high, critical +- Risk Score: 0-10 + +### 2. No-Leak Scanner +Regex-Pruefung gegen verbotene Muster in produktfaehigen Feldern: +- \`O.[A-Za-z]+_[0-9]+\` — BSI Objective-IDs +- \`TR-03161\` — Direkte BSI-TR-Referenzen +- \`BSI-TR-\` — BSI-spezifische Locators +- \`Anforderung [A-Z].[0-9]+\` — BSI-Anforderungsformat + +### 3. Open Anchor Check +Jedes freigegebene Control muss mindestens 1 Open-Source-Referenz haben. + +### 4. 
Too-Close Detektor (5 Metriken) + +| Metrik | Warn | Fail | Beschreibung | +|--------|------|------|-------------| +| Exact Phrase | ≥8 Tokens | ≥12 Tokens | Laengste identische Token-Sequenz | +| Token Overlap | ≥0.20 | ≥0.30 | Jaccard-Aehnlichkeit der Token-Mengen | +| 3-Gram Jaccard | ≥0.10 | ≥0.18 | Zeichenketten-Aehnlichkeit | +| Embedding Cosine | ≥0.86 | ≥0.92 | Semantische Aehnlichkeit (bge-m3) | +| LCS Ratio | ≥0.35 | ≥0.50 | Longest Common Subsequence | + +**Entscheidungslogik:** +- **PASS** — Kein Fail + max 1 Warn +- **WARN** — Max 2 Warn, kein Fail → Human Review erforderlich +- **FAIL** — Irgendein Fail → Blockiert, Umformulierung noetig`, + }, +] + +// ============================================================================= +// PAGE +// ============================================================================= + +export default function ControlProvenancePage() { + const [licenses, setLicenses] = useState([]) + const [sources, setSources] = useState([]) + const [activeSection, setActiveSection] = useState('methodology') + const [loading, setLoading] = useState(true) + + useEffect(() => { + async function load() { + try { + const [licRes, srcRes] = await Promise.all([ + fetch('/api/sdk/v1/canonical?endpoint=licenses'), + fetch('/api/sdk/v1/canonical?endpoint=sources'), + ]) + if (licRes.ok) setLicenses(await licRes.json()) + if (srcRes.ok) setSources(await srcRes.json()) + } catch { + // silently continue — static content still shown + } finally { + setLoading(false) + } + } + load() + }, []) + + const currentSection = PROVENANCE_SECTIONS.find(s => s.id === activeSection) + + return ( +
+ {/* Header */} +
+
+ +
+

Control Provenance Wiki

+

+ Dokumentation der unabhaengigen Herkunft aller Security Controls — rechtssicherer Nachweis +

+
+ + + Zur Control Library + +
+
+ +
+ {/* Left: Navigation */} +
+
+

Dokumentation

+ {PROVENANCE_SECTIONS.map(section => ( + + ))} + +
+

Live-Daten

+ + +
+
+
+ + {/* Right: Content */} +
+
+ {/* Static documentation sections */} + {currentSection && ( +
+

{currentSection.title}

+
+ +
+
+ )} + + {/* License Matrix (live data) */} + {activeSection === 'license-matrix' && ( +
+

Lizenz-Matrix

+

+ Uebersicht aller Lizenzen mit ihren erlaubten Nutzungsarten. +

+ {loading ? ( +
+ ) : ( +
+ + + + + + + + + + + + {licenses.map(lic => ( + + + + + + + + ))} + +
LizenzKommerziellAI-TrainingTDM (§44b)Loeschpflicht
+
{lic.license_id}
+
{lic.name}
+
+ + + + + + + {lic.deletion_required ? ( + Ja + ) : ( + Nein + )} +
+
+ )} +
+ )} + + {/* Source Registry (live data) */} + {activeSection === 'source-registry' && ( +
+

Quellenregister

+

+ Alle registrierten Quellen mit ihren Berechtigungen. +

+ {loading ? ( +
+ ) : ( +
+ {sources.map(src => ( +
+
+
+

{src.title}

+

{src.publisher} — {src.license_name}

+
+ {src.url && ( + + + Quelle + + )} +
+
+ + + + +
+
+ ))} +
+ )} +
+ )} +
+
+
+
+ ) +} + +// ============================================================================= +// HELPER COMPONENTS +// ============================================================================= + +function UsageBadge({ value }: { value: string }) { + const config: Record = { + allowed: { bg: 'bg-green-100 text-green-800', label: 'Erlaubt' }, + restricted: { bg: 'bg-yellow-100 text-yellow-800', label: 'Eingeschraenkt' }, + prohibited: { bg: 'bg-red-100 text-red-800', label: 'Verboten' }, + unclear: { bg: 'bg-gray-100 text-gray-600', label: 'Unklar' }, + yes: { bg: 'bg-green-100 text-green-800', label: 'Ja' }, + no: { bg: 'bg-red-100 text-red-800', label: 'Nein' }, + 'n/a': { bg: 'bg-gray-100 text-gray-400', label: 'k.A.' }, + } + const c = config[value] || config.unclear + return {c.label} +} + +function PermBadge({ label, allowed }: { label: string; allowed: boolean }) { + return ( +
+ {allowed ? ( + + ) : ( + + )} + {label} +
+ ) +} + +function MarkdownRenderer({ content }: { content: string }) { + let html = content + .replace(/&/g, '&') + .replace(//g, '>') + + // Code blocks + html = html.replace( + /^```[\w]*\n([\s\S]*?)^```$/gm, + (_m, code: string) => `
${code.trimEnd()}
` + ) + + // Tables + html = html.replace( + /^(\|.+\|)\n(\|[\s:|-]+\|)\n((?:\|.+\|\n?)*)/gm, + (_m, header: string, _sep: string, body: string) => { + const ths = header.split('|').filter((c: string) => c.trim()).map((c: string) => + `${c.trim()}` + ).join('') + const rows = body.trim().split('\n').map((row: string) => { + const tds = row.split('|').filter((c: string) => c.trim()).map((c: string) => + `${c.trim()}` + ).join('') + return `${tds}` + }).join('') + return `${ths}${rows}
` + } + ) + + // Headers + html = html.replace(/^### (.+)$/gm, '

$1

') + html = html.replace(/^## (.+)$/gm, '

$1

') + + // Bold + html = html.replace(/\*\*(.+?)\*\*/g, '$1') + + // Inline code + html = html.replace(/`([^`]+)`/g, '$1') + + // Lists + html = html.replace(/^- (.+)$/gm, '
  • $1
  • ') + html = html.replace(/((?:]*>.*<\/li>\n?)+)/g, '
      $1
    ') + + // Numbered lists + html = html.replace(/^(\d+)\. (.+)$/gm, '
  • $2
  • ') + + // Paragraphs + html = html.replace(/^(?!<[hultdp]|$)(.+)$/gm, '

    $1

    ') + + return
    +} diff --git a/admin-compliance/lib/sdk/types.ts b/admin-compliance/lib/sdk/types.ts index d0894bc..a7f789a 100644 --- a/admin-compliance/lib/sdk/types.ts +++ b/admin-compliance/lib/sdk/types.ts @@ -906,6 +906,34 @@ export const SDK_STEPS: SDKStep[] = [ prerequisiteSteps: ['academy'], isOptional: false, }, + { + id: 'control-library', + seq: 4900, + phase: 2, + package: 'betrieb', + order: 11, + name: 'Control Library', + nameShort: 'Controls', + description: 'Canonical Security Controls mit Open-Source-Referenzen', + url: '/sdk/control-library', + checkpointId: 'CP-CLIB', + prerequisiteSteps: [], + isOptional: true, + }, + { + id: 'control-provenance', + seq: 4950, + phase: 2, + package: 'betrieb', + order: 12, + name: 'Control Provenance', + nameShort: 'Provenance', + description: 'Herkunftsnachweis: Offene Quellen, Lizenzen, Too-Close-Pruefung', + url: '/sdk/control-provenance', + checkpointId: 'CP-CPROV', + prerequisiteSteps: [], + isOptional: true, + }, ] // ============================================================================= diff --git a/ai-compliance-sdk/internal/ucca/canonical_control_loader.go b/ai-compliance-sdk/internal/ucca/canonical_control_loader.go new file mode 100644 index 0000000..d959717 --- /dev/null +++ b/ai-compliance-sdk/internal/ucca/canonical_control_loader.go @@ -0,0 +1,182 @@ +package ucca + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "runtime" + "strings" +) + +// CanonicalControl represents a single independently authored security control. 
type CanonicalControl struct {
	ControlID            string              `json:"control_id"`
	Title                string              `json:"title"`
	Domain               string              `json:"domain"`
	Severity             string              `json:"severity"` // low, medium, high, critical
	RiskScore            float64             `json:"risk_score"`
	ImplementationEffort string              `json:"implementation_effort"` // s, m, l, xl
	Objective            string              `json:"objective"`
	Rationale            string              `json:"rationale"`
	Scope                CanonicalScope      `json:"scope"`
	Requirements         []string            `json:"requirements"`
	TestProcedure        []string            `json:"test_procedure"`
	Evidence             []CanonicalEvidence `json:"evidence"`
	OpenAnchors          []OpenAnchor        `json:"open_anchors"`
	Tags                 []string            `json:"tags"`
}

// CanonicalScope defines where a control applies.
type CanonicalScope struct {
	Platforms   []string `json:"platforms"`
	Components  []string `json:"components"`
	DataClasses []string `json:"data_classes"`
}

// CanonicalEvidence describes a required evidence item.
type CanonicalEvidence struct {
	Type        string `json:"type"`
	Description string `json:"description"`
}

// OpenAnchor links a control to an open-source framework reference.
type OpenAnchor struct {
	Framework string `json:"framework"`
	Ref       string `json:"ref"`
	URL       string `json:"url"`
}

// CanonicalDomain groups controls by security domain.
type CanonicalDomain struct {
	ID        string `json:"id"`
	Name      string `json:"name"`
	Objective string `json:"objective"`
}

// CanonicalFramework is the framework metadata.
type CanonicalFramework struct {
	ID          string `json:"id"`
	Name        string `json:"name"`
	Version     string `json:"version"`
	Description string `json:"description"`
}

// CanonicalControlLibrary is the top-level JSON structure.
type CanonicalControlLibrary struct {
	Version       string             `json:"version"`
	Schema        string             `json:"schema"`
	Generated     string             `json:"generated"`
	Framework     CanonicalFramework `json:"framework"`
	TotalControls int                `json:"total_controls"`
	Domains       []CanonicalDomain  `json:"domains"`
	Controls      []CanonicalControl `json:"controls"`
}

// CanonicalControlIndex provides fast lookup of canonical controls.
//
// Map keys are stored in normalised form so that they agree with the
// accessor methods: ByID and ByDomain keys are upper-cased, BySeverity keys
// are lower-cased, and ByFramework keys are kept verbatim.
type CanonicalControlIndex struct {
	ByID        map[string]*CanonicalControl
	ByDomain    map[string][]*CanonicalControl
	BySeverity  map[string][]*CanonicalControl
	ByFramework map[string][]*CanonicalControl // framework ref -> controls
	Domains     []CanonicalDomain
	Framework   CanonicalFramework
	AllControls []*CanonicalControl
}

// LoadCanonicalControls loads the canonical control library from JSON and
// returns an index over it. It fails when the file cannot be located or when
// its content is not valid JSON for CanonicalControlLibrary.
func LoadCanonicalControls() (*CanonicalControlIndex, error) {
	data, err := readCanonicalControlsFile()
	if err != nil {
		return nil, err
	}

	var library CanonicalControlLibrary
	if err := json.Unmarshal(data, &library); err != nil {
		return nil, fmt.Errorf("failed to parse canonical controls: %w", err)
	}

	return buildCanonicalIndex(&library), nil
}

// readCanonicalControlsFile returns the content of the first readable
// canonical_controls_v1.json among a fixed list of candidate locations:
// three paths relative to the working directory, plus one relative to this
// source file when the caller information is available.
//
// When no candidate can be read, the returned error wraps the last failure
// so the cause (missing file, permissions, ...) is not lost.
func readCanonicalControlsFile() ([]byte, error) {
	candidates := []string{
		"policies/canonical_controls_v1.json",
		"../policies/canonical_controls_v1.json",
		"../../policies/canonical_controls_v1.json",
	}

	_, filename, _, ok := runtime.Caller(0)
	if ok {
		srcDir := filepath.Dir(filename)
		candidates = append(candidates,
			filepath.Join(srcDir, "../../policies/canonical_controls_v1.json"),
		)
	}

	// candidates is never empty, so lastErr is always set when the loop
	// finishes without returning.
	var lastErr error
	for _, p := range candidates {
		abs, err := filepath.Abs(p)
		if err != nil {
			lastErr = err
			continue
		}
		data, err := os.ReadFile(abs)
		if err == nil {
			return data, nil
		}
		lastErr = err
	}

	return nil, fmt.Errorf("canonical_controls_v1.json not found in any candidate path (last error: %w)", lastErr)
}

// buildCanonicalIndex builds the lookup maps for a parsed library.
//
// Keys are normalised the same way the accessor methods normalise their
// argument (upper-case IDs and domains, lower-case severities); without this
// a control whose fields are not already in canonical case would be indexed
// under a key that GetControl, GetControlsByDomain and GetControlsBySeverity
// can never produce. The control values themselves are left untouched.
//
// The index holds pointers into library.Controls, so the library's slice
// stays referenced for as long as the index is in use.
func buildCanonicalIndex(library *CanonicalControlLibrary) *CanonicalControlIndex {
	idx := &CanonicalControlIndex{
		ByID:        make(map[string]*CanonicalControl, len(library.Controls)),
		ByDomain:    make(map[string][]*CanonicalControl),
		BySeverity:  make(map[string][]*CanonicalControl),
		ByFramework: make(map[string][]*CanonicalControl),
		Domains:     library.Domains,
		Framework:   library.Framework,
	}

	for i := range library.Controls {
		// Take the address of the slice element, not of a loop copy.
		ctrl := &library.Controls[i]

		id := strings.ToUpper(ctrl.ControlID)
		domain := strings.ToUpper(ctrl.Domain)
		severity := strings.ToLower(ctrl.Severity)

		idx.ByID[id] = ctrl
		idx.ByDomain[domain] = append(idx.ByDomain[domain], ctrl)
		idx.BySeverity[severity] = append(idx.BySeverity[severity], ctrl)
		idx.AllControls = append(idx.AllControls, ctrl)

		// Framework names are matched verbatim by GetControlsByFramework.
		for _, anchor := range ctrl.OpenAnchors {
			idx.ByFramework[anchor.Framework] = append(idx.ByFramework[anchor.Framework], ctrl)
		}
	}

	return idx
}

// GetControl returns a control by its ID (e.g. "AUTH-001").
// The lookup is case-insensitive.
func (idx *CanonicalControlIndex) GetControl(id string) (*CanonicalControl, bool) {
	ctrl, ok := idx.ByID[strings.ToUpper(id)]
	return ctrl, ok
}

// GetControlsByDomain returns all controls for a domain (e.g. "AUTH").
// The lookup is case-insensitive; an unknown domain yields a nil slice.
func (idx *CanonicalControlIndex) GetControlsByDomain(domain string) []*CanonicalControl {
	return idx.ByDomain[strings.ToUpper(domain)]
}

// GetControlsBySeverity returns all controls with a given severity.
// The lookup is case-insensitive; an unknown severity yields a nil slice.
func (idx *CanonicalControlIndex) GetControlsBySeverity(severity string) []*CanonicalControl {
	return idx.BySeverity[strings.ToLower(severity)]
}

// GetControlsByFramework returns all controls anchored to a framework (e.g. "OWASP ASVS").
// The framework name must match exactly, including case.
func (idx *CanonicalControlIndex) GetControlsByFramework(framework string) []*CanonicalControl {
	return idx.ByFramework[framework]
}

// ValidateControlID checks if a control ID exists.
+func (idx *CanonicalControlIndex) ValidateControlID(id string) bool { + _, ok := idx.ByID[strings.ToUpper(id)] + return ok +} diff --git a/ai-compliance-sdk/internal/ucca/canonical_control_loader_test.go b/ai-compliance-sdk/internal/ucca/canonical_control_loader_test.go new file mode 100644 index 0000000..0439e9b --- /dev/null +++ b/ai-compliance-sdk/internal/ucca/canonical_control_loader_test.go @@ -0,0 +1,154 @@ +package ucca + +import ( + "testing" +) + +func TestLoadCanonicalControls_ValidFile(t *testing.T) { + idx, err := LoadCanonicalControls() + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + if idx == nil { + t.Fatal("Expected non-nil index") + } + if len(idx.AllControls) != 10 { + t.Errorf("Expected 10 controls, got %d", len(idx.AllControls)) + } +} + +func TestCanonicalControlIndex_GetControl(t *testing.T) { + idx, err := LoadCanonicalControls() + if err != nil { + t.Fatalf("Failed to load controls: %v", err) + } + + tests := []struct { + name string + id string + expected bool + }{ + {"existing control AUTH-001", "AUTH-001", true}, + {"existing control NET-001", "NET-001", true}, + {"lowercase lookup", "auth-001", true}, + {"non-existing control", "FAKE-999", false}, + {"empty id", "", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl, ok := idx.GetControl(tt.id) + if ok != tt.expected { + t.Errorf("GetControl(%q): expected found=%v, got found=%v", tt.id, tt.expected, ok) + } + if ok && ctrl.ControlID == "" { + t.Error("Control found but has empty ControlID") + } + }) + } +} + +func TestCanonicalControlIndex_GetControlsByDomain(t *testing.T) { + idx, err := LoadCanonicalControls() + if err != nil { + t.Fatalf("Failed to load controls: %v", err) + } + + authControls := idx.GetControlsByDomain("AUTH") + if len(authControls) != 2 { + t.Errorf("Expected 2 AUTH controls, got %d", len(authControls)) + } + + netControls := idx.GetControlsByDomain("NET") + if len(netControls) != 2 { + t.Errorf("Expected 2 
NET controls, got %d", len(netControls)) + } + + emptyControls := idx.GetControlsByDomain("NOPE") + if len(emptyControls) != 0 { + t.Errorf("Expected 0 controls for unknown domain, got %d", len(emptyControls)) + } +} + +func TestCanonicalControlIndex_GetControlsBySeverity(t *testing.T) { + idx, err := LoadCanonicalControls() + if err != nil { + t.Fatalf("Failed to load controls: %v", err) + } + + highControls := idx.GetControlsBySeverity("high") + if len(highControls) < 5 { + t.Errorf("Expected at least 5 high-severity controls, got %d", len(highControls)) + } + + criticalControls := idx.GetControlsBySeverity("critical") + if len(criticalControls) != 1 { + t.Errorf("Expected 1 critical control, got %d", len(criticalControls)) + } +} + +func TestCanonicalControlIndex_GetControlsByFramework(t *testing.T) { + idx, err := LoadCanonicalControls() + if err != nil { + t.Fatalf("Failed to load controls: %v", err) + } + + owaspControls := idx.GetControlsByFramework("OWASP ASVS") + if len(owaspControls) == 0 { + t.Error("Expected at least 1 control anchored to OWASP ASVS") + } + + nistControls := idx.GetControlsByFramework("NIST SP 800-53") + if len(nistControls) == 0 { + t.Error("Expected at least 1 control anchored to NIST SP 800-53") + } +} + +func TestCanonicalControl_OpenAnchors(t *testing.T) { + idx, err := LoadCanonicalControls() + if err != nil { + t.Fatalf("Failed to load controls: %v", err) + } + + for _, ctrl := range idx.AllControls { + if len(ctrl.OpenAnchors) == 0 { + t.Errorf("Control %s has no open anchors — every control must have at least 1", ctrl.ControlID) + } + for i, anchor := range ctrl.OpenAnchors { + if anchor.Framework == "" { + t.Errorf("Control %s: open_anchor[%d] has empty framework", ctrl.ControlID, i) + } + if anchor.URL == "" { + t.Errorf("Control %s: open_anchor[%d] has empty URL", ctrl.ControlID, i) + } + } + } +} + +func TestCanonicalControlIndex_ValidateControlID(t *testing.T) { + idx, err := LoadCanonicalControls() + if err != nil { + 
t.Fatalf("Failed to load controls: %v", err) + } + + if !idx.ValidateControlID("AUTH-001") { + t.Error("Expected AUTH-001 to be valid") + } + if idx.ValidateControlID("FAKE-999") { + t.Error("Expected FAKE-999 to be invalid") + } +} + +func TestCanonicalControlIndex_FrameworkMetadata(t *testing.T) { + idx, err := LoadCanonicalControls() + if err != nil { + t.Fatalf("Failed to load controls: %v", err) + } + + if idx.Framework.ID != "bp_security_v1" { + t.Errorf("Expected framework ID 'bp_security_v1', got '%s'", idx.Framework.ID) + } + if len(idx.Domains) != 8 { + t.Errorf("Expected 8 domains, got %d", len(idx.Domains)) + } +} diff --git a/ai-compliance-sdk/policies/canonical_controls_v1.json b/ai-compliance-sdk/policies/canonical_controls_v1.json new file mode 100644 index 0000000..e57dd7c --- /dev/null +++ b/ai-compliance-sdk/policies/canonical_controls_v1.json @@ -0,0 +1,453 @@ +{ + "version": "1.0", + "schema": "canonical_control_library", + "generated": "2026-03-12", + "framework": { + "id": "bp_security_v1", + "name": "BreakPilot Security Controls", + "version": "1.0", + "description": "Eigenstaendig formulierte Security Controls basierend auf offenem Wissen (OWASP, NIST, ENISA). Unabhaengige Taxonomie und Nomenklatur — kein Bezug zu proprietaeren Frameworks." + }, + "total_controls": 10, + "domains": [ + { + "id": "AUTH", + "name": "Identity & Access Management", + "objective": "Sicherstellen, dass nur autorisierte Nutzer Zugriff auf geschuetzte Ressourcen erhalten." + }, + { + "id": "NET", + "name": "Network & Transport Security", + "objective": "Netzwerkkommunikation gegen Abhoeren, Manipulation und Downgrade-Angriffe schuetzen." + }, + { + "id": "SUP", + "name": "Software Supply Chain", + "objective": "Integritaet und Authentizitaet der Software-Lieferkette sicherstellen." + }, + { + "id": "LOG", + "name": "Security Operations & Logging", + "objective": "Sicherheitsrelevante Ereignisse nachvollziehbar erfassen ohne sensible Daten preiszugeben." 
+ }, + { + "id": "WEB", + "name": "Web Application Security", + "objective": "Webanwendungen gegen gaengige Angriffsvektoren haerten." + }, + { + "id": "DATA", + "name": "Data Governance & Classification", + "objective": "Schutzmassnahmen an die Sensitivitaet der verarbeiteten Daten koppeln." + }, + { + "id": "CRYP", + "name": "Cryptographic Operations", + "objective": "Kryptographische Schluessel sicher erzeugen, speichern, rotieren und vernichten." + }, + { + "id": "REL", + "name": "Release & Change Governance", + "objective": "Aenderungen an sicherheitsrelevanten Komponenten kontrolliert einfuehren." + } + ], + "controls": [ + { + "control_id": "AUTH-001", + "title": "Multi-Factor Authentication for Privileged Access", + "domain": "AUTH", + "severity": "high", + "risk_score": 8.5, + "implementation_effort": "m", + "objective": "Privilegierte Konten und administrative Zugaenge muessen durch mindestens zwei unabhaengige Authentisierungsfaktoren geschuetzt werden, um Credential-Diebstahl zu mitigieren.", + "rationale": "Passwort-basierte Authentisierung allein bietet ungenuegenden Schutz gegen Phishing, Credential Stuffing und Brute-Force-Angriffe. NIST und OWASP empfehlen uebereinstimmend MFA fuer jeden Zugang mit erhoehten Rechten. 
ENISA listet fehlende MFA als Top-Risiko fuer Cloud- und Mobile-Anwendungen.", + "scope": { + "platforms": ["web", "mobile", "api"], + "components": ["authentication-service", "identity-provider", "admin-panel"], + "data_classes": ["credentials", "session-tokens"] + }, + "requirements": [ + "Mindestens zwei Faktoren aus unterschiedlichen Kategorien (Wissen, Besitz, Biometrie) fuer privilegierte Konten", + "Time-based One-Time Passwords (TOTP) oder FIDO2/WebAuthn als zweiter Faktor unterstuetzt", + "Fallback-Mechanismen (Recovery Codes) sicher generiert und verschluesselt gespeichert", + "MFA-Bypass nur mit dokumentierter Ausnahme und zeitlicher Begrenzung" + ], + "test_procedure": [ + "Pruefe, ob Admin-Login ohne zweiten Faktor abgelehnt wird", + "Pruefe, ob TOTP-Codes mit falschem Shared Secret abgelehnt werden", + "Pruefe, ob Recovery Codes nach einmaliger Nutzung invalidiert werden", + "Pruefe, ob MFA-Enrollment bei Erstanmeldung erzwungen wird" + ], + "evidence": [ + {"type": "config", "description": "MFA-Policy-Konfiguration des Identity Providers"}, + {"type": "test_result", "description": "Automatisierte Login-Tests mit/ohne zweiten Faktor"}, + {"type": "audit_log", "description": "MFA-Enrollment- und Challenge-Logs"} + ], + "open_anchors": [ + {"framework": "OWASP ASVS", "ref": "V2.8 — One-Time Verifier", "url": "https://owasp.org/www-project-application-security-verification-standard/"}, + {"framework": "NIST SP 800-63B", "ref": "Section 4 — Authenticator Assurance Levels", "url": "https://pages.nist.gov/800-63-3/sp800-63b.html"}, + {"framework": "ENISA", "ref": "Good Practices for IoT/Cloud — Authentication", "url": "https://www.enisa.europa.eu/publications/good-practices-for-security-of-iot-1"}, + {"framework": "OWASP Top 10", "ref": "A07:2021 — Identification and Authentication Failures", "url": "https://owasp.org/Top10/A07_2021-Identification_and_Authentication_Failures/"} + ], + "tags": ["authentication", "mfa", "iam", "privileged-access"] + }, + { + 
"control_id": "AUTH-002", + "title": "Secure Token Lifecycle Management", + "domain": "AUTH", + "severity": "high", + "risk_score": 8.0, + "implementation_effort": "m", + "objective": "Authentisierungs- und Autorisierungs-Tokens muessen sicher generiert, gespeichert, uebertragen und invalidiert werden, um Session-Hijacking und Token-Leakage zu verhindern.", + "rationale": "Unsicheres Token-Handling ist ein wiederkehrender Angriffsvektor. OWASP ASVS und NIST definieren klare Anforderungen an Token-Entropie, Speicherung und Invalidierung. Tokens in Local Storage oder unverschluesselt auf dem Geraet sind besonders anfaellig.", + "scope": { + "platforms": ["web", "mobile", "api"], + "components": ["session-manager", "oauth-server", "api-gateway"], + "data_classes": ["session-tokens", "refresh-tokens", "api-keys"] + }, + "requirements": [ + "Tokens mit mindestens 128 Bit Entropie aus kryptographisch sicherem PRNG generieren", + "Access Tokens kurzlebig (max. 15 Minuten), Refresh Tokens mit Rotation bei Nutzung", + "Tokens auf Clientseite in sicherem Speicher (Keychain/Keystore, HttpOnly Cookies) — nicht in Local Storage", + "Serverseitige Token-Invalidierung bei Logout, Passwortaenderung und Verdacht auf Kompromittierung", + "Token-Binding an Client-Kontext (IP-Range, Device Fingerprint) wo moeglich" + ], + "test_procedure": [ + "Pruefe Token-Entropie (min. 
128 Bit) durch Analyse generierter Tokens", + "Pruefe, ob abgelaufene Tokens serverseitig abgelehnt werden", + "Pruefe, ob Refresh Token Rotation nach Nutzung den alten Token invalidiert", + "Pruefe, ob Tokens nach Logout serverseitig nicht mehr akzeptiert werden" + ], + "evidence": [ + {"type": "code_review", "description": "Token-Generierung und -Speicherung im Quellcode"}, + {"type": "test_result", "description": "Automatisierte Token-Lifecycle-Tests"}, + {"type": "config", "description": "Token-TTL- und Rotations-Konfiguration"} + ], + "open_anchors": [ + {"framework": "OWASP ASVS", "ref": "V3.5 — Token-based Session Management", "url": "https://owasp.org/www-project-application-security-verification-standard/"}, + {"framework": "NIST SP 800-63B", "ref": "Section 7 — Session Management", "url": "https://pages.nist.gov/800-63-3/sp800-63b.html"}, + {"framework": "OWASP Top 10", "ref": "A07:2021 — Identification and Authentication Failures", "url": "https://owasp.org/Top10/A07_2021-Identification_and_Authentication_Failures/"}, + {"framework": "OWASP MASVS", "ref": "MASVS-AUTH — Authentication and Session Management", "url": "https://mas.owasp.org/MASVS/05-MASVS-AUTH/"} + ], + "tags": ["tokens", "session", "oauth", "iam"] + }, + { + "control_id": "NET-001", + "title": "Mandatory Transport Encryption", + "domain": "NET", + "severity": "high", + "risk_score": 9.0, + "implementation_effort": "s", + "objective": "Jede Netzwerkkommunikation zwischen Client und Server sowie zwischen Services muss mit TLS 1.2+ verschluesselt sein. Unsichere Protokolle und Cipher Suites muessen deaktiviert werden.", + "rationale": "Unverschluesselte Kommunikation erlaubt Abhoeren und Man-in-the-Middle-Angriffe. Alle relevanten Sicherheitsstandards (NIST, OWASP, ENISA) fordern TLS als Baseline. 
TLS 1.0 und 1.1 gelten als unsicher und muessen deaktiviert werden.", + "scope": { + "platforms": ["web", "mobile", "api", "backend"], + "components": ["reverse-proxy", "api-gateway", "service-mesh", "database-connections"], + "data_classes": ["all-in-transit"] + }, + "requirements": [ + "TLS 1.2 als Minimum, TLS 1.3 bevorzugt fuer alle externen und internen Verbindungen", + "SSLv3, TLS 1.0, TLS 1.1 vollstaendig deaktiviert", + "Nur starke Cipher Suites: AEAD-basiert (AES-GCM, ChaCha20-Poly1305), kein CBC, kein RC4", + "HSTS-Header mit includeSubDomains und preload fuer Webanwendungen", + "Keine Mixed-Content-Ausnahmen in Produktionsumgebungen" + ], + "test_procedure": [ + "TLS-Scan aller oeffentlichen Endpunkte (z.B. mit testssl.sh oder ssllabs)", + "Pruefe, ob Verbindungen mit TLS < 1.2 abgelehnt werden", + "Pruefe HSTS-Header auf korrekte Konfiguration (max-age >= 31536000)", + "Pruefe interne Service-zu-Service-Kommunikation auf TLS-Nutzung" + ], + "evidence": [ + {"type": "scan_result", "description": "TLS-Scan-Report (testssl.sh oder SSLLabs)"}, + {"type": "config", "description": "Nginx/Reverse-Proxy TLS-Konfiguration"}, + {"type": "test_result", "description": "Automatisierte Cipher-Suite-Pruefung"} + ], + "open_anchors": [ + {"framework": "OWASP ASVS", "ref": "V9.1 — Communication Security", "url": "https://owasp.org/www-project-application-security-verification-standard/"}, + {"framework": "NIST SP 800-52", "ref": "Guidelines for TLS Implementations", "url": "https://csrc.nist.gov/publications/detail/sp/800-52/rev-2/final"}, + {"framework": "OWASP Top 10", "ref": "A02:2021 — Cryptographic Failures", "url": "https://owasp.org/Top10/A02_2021-Cryptographic_Failures/"}, + {"framework": "ENISA", "ref": "Algorithms, Key Sizes and Parameters Report", "url": "https://www.enisa.europa.eu/publications/algorithms-key-size-and-parameters-report-2014"} + ], + "tags": ["tls", "encryption", "transport", "network"] + }, + { + "control_id": "NET-002", + "title": 
"Certificate Trust Store Hardening", + "domain": "NET", + "severity": "medium", + "risk_score": 6.5, + "implementation_effort": "m", + "objective": "Anwendungen muessen den Zertifikats-Trust-Store einschraenken und Certificate Pinning oder Certificate Transparency nutzen, um Man-in-the-Middle-Angriffe durch kompromittierte CAs zu erschweren.", + "rationale": "Das Standard-CA-System vertraut hunderten CAs weltweit. Eine kompromittierte CA kann gueltige Zertifikate fuer beliebige Domains ausstellen. OWASP MASVS empfiehlt Certificate Pinning fuer mobile Apps, NIST und Certificate Transparency Logs bieten ergaenzende Massnahmen.", + "scope": { + "platforms": ["mobile", "api"], + "components": ["http-client", "tls-config", "trust-store"], + "data_classes": ["certificates", "tls-metadata"] + }, + "requirements": [ + "Mobile Apps: Certificate Pinning gegen den Server-Public-Key oder Intermediate-CA", + "Pin-Rotation: Backup-Pins fuer geplanten Zertifikatswechsel vorhalten", + "Certificate Transparency: Expect-CT Header oder CT-Log-Monitoring fuer Webdienste", + "Regelmaessige Pruefung der Trust-Store-Eintraege auf abgelaufene oder zurueckgerufene CAs" + ], + "test_procedure": [ + "Pruefe, ob Mobile App Verbindungen mit selbstsigniertem Zertifikat ablehnt", + "Pruefe, ob Backup-Pins konfiguriert sind fuer nahtlose Rotation", + "Pruefe CT-Log-Monitoring auf unerwartete Zertifikatsausstellungen", + "Pruefe, ob CRL/OCSP-Stapling aktiviert ist" + ], + "evidence": [ + {"type": "config", "description": "Certificate Pinning Konfiguration im Mobile-Client"}, + {"type": "test_result", "description": "MITM-Proxy-Test mit falschem Zertifikat"}, + {"type": "monitoring", "description": "CT-Log-Monitoring-Dashboard"} + ], + "open_anchors": [ + {"framework": "OWASP MASVS", "ref": "MASVS-NETWORK — Network Communication", "url": "https://mas.owasp.org/MASVS/06-MASVS-NETWORK/"}, + {"framework": "NIST SP 800-52", "ref": "Section 3.5 — Server Certificate Validation", "url": 
"https://csrc.nist.gov/publications/detail/sp/800-52/rev-2/final"}, + {"framework": "OWASP", "ref": "Certificate and Public Key Pinning Cheat Sheet", "url": "https://cheatsheetseries.owasp.org/cheatsheets/Pinning_Cheat_Sheet.html"} + ], + "tags": ["certificates", "pinning", "trust-store", "network"] + }, + { + "control_id": "SUP-001", + "title": "Software Distribution Integrity & Update Verification", + "domain": "SUP", + "severity": "high", + "risk_score": 8.0, + "implementation_effort": "l", + "objective": "Software-Updates und -Pakete muessen kryptographisch signiert und vor der Installation verifiziert werden, um Manipulation in der Lieferkette zu erkennen.", + "rationale": "Supply-Chain-Angriffe (z.B. SolarWinds, Codecov) zeigen, dass unsignierte oder ungepruefte Updates ein kritisches Einfallstor sind. NIST SSDF, OWASP und SLSA definieren Mindestanforderungen an Build-Provenance und Signaturpruefung.", + "scope": { + "platforms": ["mobile", "web", "backend"], + "components": ["ci-cd-pipeline", "package-registry", "auto-updater", "app-store"], + "data_classes": ["binaries", "packages", "container-images"] + }, + "requirements": [ + "Alle Release-Artefakte kryptographisch signiert (z.B. 
GPG, Sigstore/Cosign fuer Container)", + "Signaturpruefung vor jeder Installation oder Deployment — unsignierte Artefakte ablehnen", + "SBOM (Software Bill of Materials) fuer jedes Release generieren und archivieren", + "Dependency-Scanning (SCA) in CI/CD-Pipeline integriert, bekannte CVEs blockieren", + "Reproduzierbare Builds wo technisch moeglich, Build-Provenance dokumentieren" + ], + "test_procedure": [ + "Pruefe, ob unsignierte Artefakte vom Deployment abgelehnt werden", + "Pruefe, ob SBOM fuer das letzte Release vorhanden und vollstaendig ist", + "Pruefe, ob Dependency-Scanner in CI/CD aktiv ist und bei Critical CVEs blockiert", + "Pruefe, ob Container-Images mit Cosign/Notary signiert sind" + ], + "evidence": [ + {"type": "config", "description": "CI/CD-Pipeline mit Signatur- und Scan-Steps"}, + {"type": "artifact", "description": "Signiertes SBOM des letzten Release"}, + {"type": "scan_result", "description": "SCA-Report der letzten Pipeline-Ausfuehrung"} + ], + "open_anchors": [ + {"framework": "NIST SSDF", "ref": "PW.4 — Reusable Software Integrity Verification", "url": "https://csrc.nist.gov/publications/detail/sp/800-218/final"}, + {"framework": "OWASP Top 10", "ref": "A08:2021 — Software and Data Integrity Failures", "url": "https://owasp.org/Top10/A08_2021-Software_and_Data_Integrity_Failures/"}, + {"framework": "SLSA", "ref": "Supply-chain Levels for Software Artifacts", "url": "https://slsa.dev/spec/v1.0/levels"}, + {"framework": "NIST SP 800-53", "ref": "SA-12 — Supply Chain Protection", "url": "https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final"} + ], + "tags": ["supply-chain", "signing", "sbom", "sdlc"] + }, + { + "control_id": "LOG-001", + "title": "Privacy-Aware Security Logging", + "domain": "LOG", + "severity": "medium", + "risk_score": 6.0, + "implementation_effort": "m", + "objective": "Sicherheitsrelevante Ereignisse vollstaendig protokollieren, dabei personenbezogene Daten konsequent reduzieren (Redaction-First-Prinzip).", + 
"rationale": "Effektive Incident Response erfordert vollstaendige Security Logs. Gleichzeitig verlangt DSGVO Art. 5(1)(c) Datenminimierung — Logs duerfen keine ueberfluessigen personenbezogenen Daten enthalten. OWASP Logging Cheat Sheet und NIST SP 800-92 definieren Best Practices fuer sicheres Logging.", + "scope": { + "platforms": ["web", "mobile", "api", "backend"], + "components": ["logging-framework", "siem", "log-aggregator"], + "data_classes": ["security-events", "access-logs", "error-logs"] + }, + "requirements": [ + "Alle sicherheitsrelevanten Events loggen: Login/Logout, Rechteaenderungen, Fehlgeschlagene Zugriffe, Konfigurationsaenderungen", + "PII-Redaction: Passwoerter, Tokens, Kreditkarten, IP-Adressen wo moeglich pseudonymisieren", + "Strukturiertes Logging (JSON) mit einheitlichem Schema: timestamp, event_type, actor_id, resource, outcome", + "Log-Integritaet: Tamper-Protection durch Signaturen oder Write-Once-Storage", + "Retention Policy: Security Logs mindestens 90 Tage, maximal nach Compliance-Anforderung" + ], + "test_procedure": [ + "Pruefe, ob fehlgeschlagene Login-Versuche geloggt werden (mit pseudonymisierter IP)", + "Pruefe, ob Passwoerter und Tokens in keinem Log-Eintrag im Klartext erscheinen", + "Pruefe, ob Log-Eintraege das definierte JSON-Schema einhalten", + "Pruefe, ob Logs aelter als Retention-Periode automatisch geloescht werden" + ], + "evidence": [ + {"type": "config", "description": "Logging-Framework-Konfiguration mit Redaction-Regeln"}, + {"type": "test_result", "description": "Log-Analyse auf PII-Leaks"}, + {"type": "policy", "description": "Log-Retention-Policy-Dokument"} + ], + "open_anchors": [ + {"framework": "OWASP", "ref": "Logging Cheat Sheet", "url": "https://cheatsheetseries.owasp.org/cheatsheets/Logging_Cheat_Sheet.html"}, + {"framework": "NIST SP 800-92", "ref": "Guide to Computer Security Log Management", "url": "https://csrc.nist.gov/publications/detail/sp/800-92/final"}, + {"framework": "OWASP Top 10", "ref":
"A09:2021 — Security Logging and Monitoring Failures", "url": "https://owasp.org/Top10/A09_2021-Security_Logging_and_Monitoring_Failures/"}, + {"framework": "OWASP ASVS", "ref": "V7 — Error Handling and Logging", "url": "https://owasp.org/www-project-application-security-verification-standard/"} + ], + "tags": ["logging", "monitoring", "privacy", "redaction"] + }, + { + "control_id": "WEB-001", + "title": "Hardened Administrative and Account Recovery Flows", + "domain": "WEB", + "severity": "high", + "risk_score": 7.5, + "implementation_effort": "m", + "objective": "Administrative Zugaenge und Account-Recovery-Prozesse (Passwort-Reset, E-Mail-Aenderung) muessen gegen Enumeration, Brute-Force und Social Engineering gehaertet werden.", + "rationale": "Account-Recovery-Flows sind haeufig schwaecher geschuetzt als der regulaere Login und werden gezielt angegriffen. OWASP identifiziert unsichere Recovery-Mechanismen als verbreitetes Problem. Rate Limiting, sichere Token-Generierung und Vermeidung von User-Enumeration sind essentiell.", + "scope": { + "platforms": ["web", "mobile"], + "components": ["password-reset", "email-change", "admin-panel", "account-recovery"], + "data_classes": ["credentials", "recovery-tokens", "email-addresses"] + }, + "requirements": [ + "Passwort-Reset-Tokens kryptographisch sicher (min. 128 Bit), zeitlich begrenzt (max. 1 Stunde), einmalig nutzbar", + "Keine User-Enumeration: Identische Antwort unabhaengig ob Account existiert", + "Rate Limiting auf Reset- und Recovery-Endpunkte (max. 5 Versuche / 15 Minuten pro IP)", + "Admin-Panels nicht oeffentlich erreichbar oder zusaetzlich durch IP-Whitelist/VPN geschuetzt", + "E-Mail-Aenderung erfordert Bestaetigung an alte UND neue Adresse" + ], + "test_procedure": [ + "Pruefe, ob Reset-Endpunkt bei existierendem und nicht-existierendem Account identisch antwortet", + "Pruefe, ob Reset-Token nach einmaliger Nutzung invalidiert wird", + "Pruefe Rate Limiting: 6. 
Versuch innerhalb von 15 Minuten wird blockiert", + "Pruefe, ob Admin-Panel von externen IPs nicht erreichbar ist" + ], + "evidence": [ + {"type": "test_result", "description": "Automatisierte Enumeration- und Rate-Limit-Tests"}, + {"type": "config", "description": "Rate-Limiting-Konfiguration"}, + {"type": "network_config", "description": "Admin-Panel-Zugriffsbeschraenkung (IP/VPN)"} + ], + "open_anchors": [ + {"framework": "OWASP ASVS", "ref": "V2.5 — Credential Recovery", "url": "https://owasp.org/www-project-application-security-verification-standard/"}, + {"framework": "OWASP", "ref": "Forgot Password Cheat Sheet", "url": "https://cheatsheetseries.owasp.org/cheatsheets/Forgot_Password_Cheat_Sheet.html"}, + {"framework": "NIST SP 800-63B", "ref": "Section 6.1.2 — Memorized Secret Recovery", "url": "https://pages.nist.gov/800-63-3/sp800-63b.html"}, + {"framework": "OWASP Top 10", "ref": "A07:2021 — Identification and Authentication Failures", "url": "https://owasp.org/Top10/A07_2021-Identification_and_Authentication_Failures/"} + ], + "tags": ["account-recovery", "admin", "rate-limiting", "web"] + }, + { + "control_id": "DATA-001", + "title": "Data-Classification-Driven Security Measures", + "domain": "DATA", + "severity": "critical", + "risk_score": 9.5, + "implementation_effort": "l", + "objective": "Schutzmassnahmen muessen automatisch an die Klassifikation der verarbeiteten Daten gekoppelt sein. Hoehere Datenklassen erfordern staerkere Controls.", + "rationale": "Ein einheitliches Schutzniveau fuer alle Daten ist entweder uebermaessig teuer oder unzureichend fuer sensible Daten. NIST SP 800-53, ISO 27001 und DSGVO Art. 32 fordern risikoadaequate Massnahmen. 
Die Datenklassifikation bildet die Grundlage fuer die Auswahl geeigneter Controls.", + "scope": { + "platforms": ["web", "mobile", "api", "backend"], + "components": ["data-catalog", "access-control", "encryption-service", "backup-system"], + "data_classes": ["public", "internal", "confidential", "restricted"] + }, + "requirements": [ + "Datenklassifikationsschema definiert: Public, Internal, Confidential, Restricted (mit Beispielen je Klasse)", + "Jede Datenverarbeitung mit Klassifikation versehen — Default ist 'Internal' (nicht Public)", + "Confidential/Restricted: Verschluesselung at Rest und in Transit obligatorisch", + "Restricted: Zusaetzlich Zugriffsprotokollierung, Need-to-Know-Prinzip, Vier-Augen fuer Export", + "Automatische Policy-Enforcement: Datenklasse bestimmt verfuegbare Operationen (Export, Sharing, Retention)" + ], + "test_procedure": [ + "Pruefe, ob jede Tabelle/Collection eine Datenklassifikation hat", + "Pruefe, ob Confidential-Daten at Rest verschluesselt sind", + "Pruefe, ob Restricted-Daten nur mit Zugriffsprotokollierung abrufbar sind", + "Pruefe, ob Export von Restricted-Daten Vier-Augen-Freigabe erfordert" + ], + "evidence": [ + {"type": "policy", "description": "Datenklassifikationsschema mit Beispielen"}, + {"type": "config", "description": "Encryption-at-Rest-Konfiguration pro Datenklasse"}, + {"type": "audit_log", "description": "Zugriffsprotokolle fuer Restricted-Daten"} + ], + "open_anchors": [ + {"framework": "NIST SP 800-53", "ref": "RA-2 — Security Categorization", "url": "https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final"}, + {"framework": "NIST SP 800-60", "ref": "Guide for Mapping Types of Information to Security Categories", "url": "https://csrc.nist.gov/publications/detail/sp/800-60/vol-1-rev-1/final"}, + {"framework": "OWASP ASVS", "ref": "V6.1 — Data Classification", "url": "https://owasp.org/www-project-application-security-verification-standard/"}, + {"framework": "ENISA", "ref": "Data Protection 
Engineering — Data Classification", "url": "https://www.enisa.europa.eu/publications/data-protection-engineering"} + ], + "tags": ["data-classification", "governance", "encryption", "access-control"] + }, + { + "control_id": "CRYP-001", + "title": "Cryptographic Key Lifecycle Management", + "domain": "CRYP", + "severity": "high", + "risk_score": 8.5, + "implementation_effort": "l", + "objective": "Kryptographische Schluessel muessen sicher erzeugt, gespeichert, rotiert und vernichtet werden. Der gesamte Lebenszyklus muss dokumentiert und auditierbar sein.", + "rationale": "Selbst starke Algorithmen bieten keinen Schutz, wenn Schluessel unsicher gespeichert oder nie rotiert werden. NIST SP 800-57 definiert den Key-Lifecycle. OWASP warnt explizit vor Hard-coded Keys und fehlender Rotation.", + "scope": { + "platforms": ["backend", "api"], + "components": ["key-management-service", "vault", "encryption-service", "certificate-manager"], + "data_classes": ["encryption-keys", "signing-keys", "api-keys"] + }, + "requirements": [ + "Schluessel in Hardware Security Module (HSM) oder Software-Vault (z.B. HashiCorp Vault) — nie im Quellcode oder Konfigurationsdateien", + "Schluesselgenerierung mit kryptographisch sicherem PRNG (CSPRNG), Mindestlaenge nach Algorithmus (AES-256, RSA-3072+, Ed25519)", + "Rotation: Symmetrische Schluessel mindestens jaehrlich, asymmetrische nach Algorithmus-Empfehlung", + "Sichere Vernichtung: Alte Schluessel nach Ablauf der Aufbewahrungsfrist kryptographisch ueberschrieben", + "Trennung: Unterschiedliche Schluessel fuer unterschiedliche Zwecke (Signing vs. 
Encryption)" + ], + "test_procedure": [ + "Pruefe, ob keine Schluessel im Quellcode oder .env-Dateien hartcodiert sind (Secret Scanner)", + "Pruefe, ob Vault/HSM fuer Schluesseloperationen genutzt wird", + "Pruefe, ob Schluessel-Rotation-Logs vorhanden sind", + "Pruefe, ob unterschiedliche Schluessel fuer Signing und Encryption verwendet werden" + ], + "evidence": [ + {"type": "config", "description": "Vault/HSM-Konfiguration und Zugriffsrichtlinien"}, + {"type": "scan_result", "description": "Secret-Scanner-Report (kein Leak im Repo)"}, + {"type": "audit_log", "description": "Key-Rotation-Historie aus Vault"} + ], + "open_anchors": [ + {"framework": "NIST SP 800-57", "ref": "Key Management — Part 1: General", "url": "https://csrc.nist.gov/publications/detail/sp/800-57-part-1/rev-5/final"}, + {"framework": "OWASP", "ref": "Key Management Cheat Sheet", "url": "https://cheatsheetseries.owasp.org/cheatsheets/Key_Management_Cheat_Sheet.html"}, + {"framework": "OWASP Top 10", "ref": "A02:2021 — Cryptographic Failures", "url": "https://owasp.org/Top10/A02_2021-Cryptographic_Failures/"}, + {"framework": "ENISA", "ref": "Algorithms, Key Sizes and Parameters Report", "url": "https://www.enisa.europa.eu/publications/algorithms-key-size-and-parameters-report-2014"} + ], + "tags": ["key-management", "crypto", "vault", "rotation"] + }, + { + "control_id": "REL-001", + "title": "Security Change Impact Assessment", + "domain": "REL", + "severity": "medium", + "risk_score": 5.5, + "implementation_effort": "m", + "objective": "Jede Aenderung an sicherheitsrelevanten Komponenten muss vor dem Deployment eine strukturierte Impact-Bewertung durchlaufen.", + "rationale": "Ungepruefte Aenderungen an Security-Controls koennen den gesamten Schutzlevel untergraben. NIST SP 800-53 (CM-4) und OWASP fordern Change Impact Assessments. 
Ein formalisierter Prozess verhindert, dass Security-Regressionen unbemerkt in Produktion gelangen.", + "scope": { + "platforms": ["all"], + "components": ["ci-cd-pipeline", "change-management", "code-review"], + "data_classes": ["source-code", "infrastructure-config", "security-policies"] + }, + "requirements": [ + "Aenderungen an auth, crypto, access-control, logging als 'security-relevant' getaggt", + "Security-relevante Changes erfordern Review durch Security-qualifizierten Reviewer", + "Automatisierte Security-Regression-Tests in CI/CD fuer alle security-relevanten Pfade", + "Rollback-Plan dokumentiert fuer jedes security-relevante Deployment", + "Post-Deployment-Monitoring: Erhoehte Alerting-Schwelle fuer 24h nach Security-Change" + ], + "test_procedure": [ + "Pruefe, ob MRs/PRs mit Security-relevanten Dateien automatisch getaggt werden", + "Pruefe, ob Security-getaggte MRs einen zweiten Reviewer erfordern", + "Pruefe, ob Security-Regression-Tests in CI/CD vorhanden und aktiv sind", + "Pruefe, ob Rollback-Dokumentation fuer letzte 3 Security-Changes existiert" + ], + "evidence": [ + {"type": "process", "description": "Change-Management-Prozessbeschreibung"}, + {"type": "config", "description": "CI/CD-Konfiguration mit Security-Review-Gate"}, + {"type": "audit_log", "description": "Merge-Request-Historie mit Security-Tags"} + ], + "open_anchors": [ + {"framework": "NIST SP 800-53", "ref": "CM-4 — Impact Analyses", "url": "https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final"}, + {"framework": "OWASP SAMM", "ref": "Implementation — Secure Deployment", "url": "https://owaspsamm.org/model/implementation/secure-deployment/"}, + {"framework": "NIST SSDF", "ref": "PO.1 — Security Requirements for Software Development", "url": "https://csrc.nist.gov/publications/detail/sp/800-218/final"}, + {"framework": "ENISA", "ref": "Secure Development Lifecycle", "url": "https://www.enisa.europa.eu/publications/standards-and-tools-for-secure-software-development"} + 
], + "tags": ["change-management", "ci-cd", "review", "governance"] + } + ] +} diff --git a/backend-compliance/compliance/api/__init__.py b/backend-compliance/compliance/api/__init__.py index cf12ad7..7f6713a 100644 --- a/backend-compliance/compliance/api/__init__.py +++ b/backend-compliance/compliance/api/__init__.py @@ -33,6 +33,7 @@ from .change_request_routes import router as change_request_router from .generation_routes import router as generation_router from .project_routes import router as project_router from .wiki_routes import router as wiki_router +from .canonical_control_routes import router as canonical_control_router # Include sub-routers router.include_router(audit_router) @@ -67,6 +68,7 @@ router.include_router(change_request_router) router.include_router(generation_router) router.include_router(project_router) router.include_router(wiki_router) +router.include_router(canonical_control_router) __all__ = [ "router", @@ -101,4 +103,5 @@ __all__ = [ "generation_router", "project_router", "wiki_router", + "canonical_control_router", ] diff --git a/backend-compliance/compliance/api/canonical_control_routes.py b/backend-compliance/compliance/api/canonical_control_routes.py new file mode 100644 index 0000000..685ac99 --- /dev/null +++ b/backend-compliance/compliance/api/canonical_control_routes.py @@ -0,0 +1,332 @@ +""" +FastAPI routes for the Canonical Control Library. + +Provides read-only access to independently authored security controls. +All controls are formulated without proprietary nomenclature and anchored +in open-source frameworks (OWASP, NIST, ENISA). 
+ +Endpoints: + GET /v1/canonical/frameworks — All frameworks + GET /v1/canonical/frameworks/{framework_id} — Framework details + GET /v1/canonical/frameworks/{framework_id}/controls — Controls of a framework + GET /v1/canonical/controls — All controls (filterable) + GET /v1/canonical/controls/{control_id} — Single control by control_id + GET /v1/canonical/sources — Source registry + GET /v1/canonical/licenses — License matrix + POST /v1/canonical/controls/{control_id}/similarity-check — Too-close check +""" + +from __future__ import annotations + +import logging +from typing import Any, Optional + +from fastapi import APIRouter, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy import text + +from database import SessionLocal +from compliance.services.license_gate import get_license_matrix, get_source_permissions +from compliance.services.similarity_detector import check_similarity + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/v1/canonical", tags=["canonical-controls"]) + + +# ============================================================================= +# RESPONSE MODELS +# ============================================================================= + +class FrameworkResponse(BaseModel): + id: str + framework_id: str + name: str + version: str + description: Optional[str] = None + owner: Optional[str] = None + policy_version: Optional[str] = None + release_state: str + created_at: str + updated_at: str + + +class ControlResponse(BaseModel): + id: str + framework_id: str + control_id: str + title: str + objective: str + rationale: str + scope: dict + requirements: list + test_procedure: list + evidence: list + severity: str + risk_score: Optional[float] = None + implementation_effort: Optional[str] = None + evidence_confidence: Optional[float] = None + open_anchors: list + release_state: str + tags: list + created_at: str + updated_at: str + + +class SimilarityCheckRequest(BaseModel): + source_text: str + candidate_text: 
str + + +class SimilarityCheckResponse(BaseModel): + max_exact_run: int + token_overlap: float + ngram_jaccard: float + embedding_cosine: float + lcs_ratio: float + status: str + details: dict + + +# ============================================================================= +# HELPERS +# ============================================================================= + +def _row_to_dict(row, columns: list[str]) -> dict[str, Any]: + """Generic row → dict converter.""" + return {col: (getattr(row, col).isoformat() if hasattr(getattr(row, col, None), 'isoformat') else getattr(row, col)) for col in columns} + + +# ============================================================================= +# FRAMEWORKS +# ============================================================================= + +@router.get("/frameworks") +async def list_frameworks(): + """List all registered control frameworks.""" + with SessionLocal() as db: + rows = db.execute( + text(""" + SELECT id, framework_id, name, version, description, + owner, policy_version, release_state, + created_at, updated_at + FROM canonical_control_frameworks + ORDER BY name + """) + ).fetchall() + + return [ + { + "id": str(r.id), + "framework_id": r.framework_id, + "name": r.name, + "version": r.version, + "description": r.description, + "owner": r.owner, + "policy_version": r.policy_version, + "release_state": r.release_state, + "created_at": r.created_at.isoformat() if r.created_at else None, + "updated_at": r.updated_at.isoformat() if r.updated_at else None, + } + for r in rows + ] + + +@router.get("/frameworks/{framework_id}") +async def get_framework(framework_id: str): + """Get a single framework by its framework_id.""" + with SessionLocal() as db: + row = db.execute( + text(""" + SELECT id, framework_id, name, version, description, + owner, policy_version, release_state, + created_at, updated_at + FROM canonical_control_frameworks + WHERE framework_id = :fid + """), + {"fid": framework_id}, + ).fetchone() + + if not 
row: + raise HTTPException(status_code=404, detail="Framework not found") + + return { + "id": str(row.id), + "framework_id": row.framework_id, + "name": row.name, + "version": row.version, + "description": row.description, + "owner": row.owner, + "policy_version": row.policy_version, + "release_state": row.release_state, + "created_at": row.created_at.isoformat() if row.created_at else None, + "updated_at": row.updated_at.isoformat() if row.updated_at else None, + } + + +@router.get("/frameworks/{framework_id}/controls") +async def list_framework_controls( + framework_id: str, + severity: Optional[str] = Query(None), + release_state: Optional[str] = Query(None), +): + """List controls belonging to a framework.""" + with SessionLocal() as db: + # Resolve framework UUID + fw = db.execute( + text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"), + {"fid": framework_id}, + ).fetchone() + if not fw: + raise HTTPException(status_code=404, detail="Framework not found") + + query = """ + SELECT id, framework_id, control_id, title, objective, rationale, + scope, requirements, test_procedure, evidence, + severity, risk_score, implementation_effort, + evidence_confidence, open_anchors, release_state, tags, + created_at, updated_at + FROM canonical_controls + WHERE framework_id = :fw_id + """ + params: dict[str, Any] = {"fw_id": str(fw.id)} + + if severity: + query += " AND severity = :sev" + params["sev"] = severity + if release_state: + query += " AND release_state = :rs" + params["rs"] = release_state + + query += " ORDER BY control_id" + rows = db.execute(text(query), params).fetchall() + + return [_control_row(r) for r in rows] + + +# ============================================================================= +# CONTROLS +# ============================================================================= + +@router.get("/controls") +async def list_controls( + severity: Optional[str] = Query(None), + domain: Optional[str] = Query(None), + 
release_state: Optional[str] = Query(None), +): + """List all canonical controls, with optional filters.""" + query = """ + SELECT id, framework_id, control_id, title, objective, rationale, + scope, requirements, test_procedure, evidence, + severity, risk_score, implementation_effort, + evidence_confidence, open_anchors, release_state, tags, + created_at, updated_at + FROM canonical_controls + WHERE 1=1 + """ + params: dict[str, Any] = {} + + if severity: + query += " AND severity = :sev" + params["sev"] = severity + if domain: + query += " AND LEFT(control_id, LENGTH(:dom)) = :dom" + params["dom"] = domain.upper() + if release_state: + query += " AND release_state = :rs" + params["rs"] = release_state + + query += " ORDER BY control_id" + + with SessionLocal() as db: + rows = db.execute(text(query), params).fetchall() + + return [_control_row(r) for r in rows] + + +@router.get("/controls/{control_id}") +async def get_control(control_id: str): + """Get a single canonical control by its control_id (e.g. 
AUTH-001).""" + with SessionLocal() as db: + row = db.execute( + text(""" + SELECT id, framework_id, control_id, title, objective, rationale, + scope, requirements, test_procedure, evidence, + severity, risk_score, implementation_effort, + evidence_confidence, open_anchors, release_state, tags, + created_at, updated_at + FROM canonical_controls + WHERE control_id = :cid + """), + {"cid": control_id.upper()}, + ).fetchone() + + if not row: + raise HTTPException(status_code=404, detail="Control not found") + + return _control_row(row) + + +# ============================================================================= +# SIMILARITY CHECK +# ============================================================================= + +@router.post("/controls/{control_id}/similarity-check") +async def similarity_check(control_id: str, body: SimilarityCheckRequest): + """Run the too-close detector against a source/candidate text pair.""" + report = await check_similarity(body.source_text, body.candidate_text) + return { + "control_id": control_id.upper(), + "max_exact_run": report.max_exact_run, + "token_overlap": report.token_overlap, + "ngram_jaccard": report.ngram_jaccard, + "embedding_cosine": report.embedding_cosine, + "lcs_ratio": report.lcs_ratio, + "status": report.status, + "details": report.details, + } + + +# ============================================================================= +# SOURCES & LICENSES +# ============================================================================= + +@router.get("/sources") +async def list_sources(): + """List all registered sources with permission flags.""" + with SessionLocal() as db: + return get_source_permissions(db) + + +@router.get("/licenses") +async def list_licenses(): + """Return the license matrix.""" + with SessionLocal() as db: + return get_license_matrix(db) + + +# ============================================================================= +# INTERNAL HELPERS +# 
============================================================================= + +def _control_row(r) -> dict: + return { + "id": str(r.id), + "framework_id": str(r.framework_id), + "control_id": r.control_id, + "title": r.title, + "objective": r.objective, + "rationale": r.rationale, + "scope": r.scope, + "requirements": r.requirements, + "test_procedure": r.test_procedure, + "evidence": r.evidence, + "severity": r.severity, + "risk_score": float(r.risk_score) if r.risk_score is not None else None, + "implementation_effort": r.implementation_effort, + "evidence_confidence": float(r.evidence_confidence) if r.evidence_confidence is not None else None, + "open_anchors": r.open_anchors, + "release_state": r.release_state, + "tags": r.tags or [], + "created_at": r.created_at.isoformat() if r.created_at else None, + "updated_at": r.updated_at.isoformat() if r.updated_at else None, + } diff --git a/backend-compliance/compliance/services/license_gate.py b/backend-compliance/compliance/services/license_gate.py new file mode 100644 index 0000000..b4f73bc --- /dev/null +++ b/backend-compliance/compliance/services/license_gate.py @@ -0,0 +1,116 @@ +""" +License Gate — checks whether a given source may be used for a specific purpose. 
+ +Usage types: + - analysis: Read + analyse internally (TDM under UrhG 44b) + - store_excerpt: Store verbatim excerpt in vault + - ship_embeddings: Ship embeddings in product + - ship_in_product: Ship text/content in product + +Policy is driven by the canonical_control_sources table columns: + allowed_analysis, allowed_store_excerpt, allowed_ship_embeddings, allowed_ship_in_product +""" + +from __future__ import annotations + +import logging +from typing import Any + +from sqlalchemy import text +from sqlalchemy.orm import Session + +logger = logging.getLogger(__name__) + +USAGE_COLUMN_MAP = { + "analysis": "allowed_analysis", + "store_excerpt": "allowed_store_excerpt", + "ship_embeddings": "allowed_ship_embeddings", + "ship_in_product": "allowed_ship_in_product", +} + + +def check_source_allowed(db: Session, source_id: str, usage_type: str) -> bool: + """Check whether *source_id* may be used for *usage_type*. + + Returns False if the source is unknown or the usage is not allowed. + """ + col = USAGE_COLUMN_MAP.get(usage_type) + if col is None: + logger.warning("Unknown usage_type=%s", usage_type) + return False + + row = db.execute( + text(f"SELECT {col} FROM canonical_control_sources WHERE source_id = :sid"), + {"sid": source_id}, + ).fetchone() + + if row is None: + logger.warning("Source %s not found in registry", source_id) + return False + + return bool(row[0]) + + +def get_license_matrix(db: Session) -> list[dict[str, Any]]: + """Return the full license matrix with allowed usages per license.""" + rows = db.execute( + text(""" + SELECT license_id, name, terms_url, commercial_use, + ai_training_restriction, tdm_allowed_under_44b, + deletion_required, notes + FROM canonical_control_licenses + ORDER BY license_id + """) + ).fetchall() + + return [ + { + "license_id": r.license_id, + "name": r.name, + "terms_url": r.terms_url, + "commercial_use": r.commercial_use, + "ai_training_restriction": r.ai_training_restriction, + "tdm_allowed_under_44b": 
r.tdm_allowed_under_44b, + "deletion_required": r.deletion_required, + "notes": r.notes, + } + for r in rows + ] + + +def get_source_permissions(db: Session) -> list[dict[str, Any]]: + """Return all sources with their permission flags.""" + rows = db.execute( + text(""" + SELECT s.source_id, s.title, s.publisher, s.url, s.version_label, + s.language, s.license_id, + s.allowed_analysis, s.allowed_store_excerpt, + s.allowed_ship_embeddings, s.allowed_ship_in_product, + s.vault_retention_days, s.vault_access_tier, + l.name AS license_name, l.commercial_use + FROM canonical_control_sources s + JOIN canonical_control_licenses l ON l.license_id = s.license_id + ORDER BY s.source_id + """) + ).fetchall() + + return [ + { + "source_id": r.source_id, + "title": r.title, + "publisher": r.publisher, + "url": r.url, + "version_label": r.version_label, + "language": r.language, + "license_id": r.license_id, + "license_name": r.license_name, + "commercial_use": r.commercial_use, + "allowed_analysis": r.allowed_analysis, + "allowed_store_excerpt": r.allowed_store_excerpt, + "allowed_ship_embeddings": r.allowed_ship_embeddings, + "allowed_ship_in_product": r.allowed_ship_in_product, + "vault_retention_days": r.vault_retention_days, + "vault_access_tier": r.vault_access_tier, + } + for r in rows + ] diff --git a/backend-compliance/compliance/services/similarity_detector.py b/backend-compliance/compliance/services/similarity_detector.py new file mode 100644 index 0000000..b283114 --- /dev/null +++ b/backend-compliance/compliance/services/similarity_detector.py @@ -0,0 +1,223 @@ +""" +Too-Close Similarity Detector — checks whether a candidate text is too similar +to a protected source text (copyright / license compliance). + +Five metrics: + 1. Exact-phrase — longest identical token sequence + 2. Token overlap — Jaccard similarity of token sets + 3. 3-gram Jaccard — Jaccard similarity of character 3-grams + 4. Embedding cosine — via bge-m3 (Ollama or embedding-service) + 5. 
LCS ratio — Longest Common Subsequence / max(len_a, len_b) + +Decision: + PASS — no fail + max 1 warn + WARN — max 2 warn, no fail → human review + FAIL — any fail threshold → block, rewrite required +""" + +from __future__ import annotations + +import logging +import re +from dataclasses import dataclass +from typing import Optional + +import httpx + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Thresholds +# --------------------------------------------------------------------------- + +THRESHOLDS = { + "max_exact_run": {"warn": 8, "fail": 12}, + "token_overlap": {"warn": 0.20, "fail": 0.30}, + "ngram_jaccard": {"warn": 0.10, "fail": 0.18}, + "embedding_cosine": {"warn": 0.86, "fail": 0.92}, + "lcs_ratio": {"warn": 0.35, "fail": 0.50}, +} + +# --------------------------------------------------------------------------- +# Tokenisation helpers +# --------------------------------------------------------------------------- + +_WORD_RE = re.compile(r"\w+", re.UNICODE) + + +def _tokenize(text: str) -> list[str]: + return [t.lower() for t in _WORD_RE.findall(text)] + + +def _char_ngrams(text: str, n: int = 3) -> set[str]: + text = text.lower() + return {text[i : i + n] for i in range(len(text) - n + 1)} if len(text) >= n else set() + + +# --------------------------------------------------------------------------- +# Metric implementations +# --------------------------------------------------------------------------- + +def max_exact_run(tokens_a: list[str], tokens_b: list[str]) -> int: + """Longest contiguous identical token sequence between a and b.""" + if not tokens_a or not tokens_b: + return 0 + + best = 0 + set_b = set(tokens_b) + + for i in range(len(tokens_a)): + if tokens_a[i] not in set_b: + continue + for j in range(len(tokens_b)): + if tokens_a[i] != tokens_b[j]: + continue + run = 0 + ii, jj = i, j + while ii < len(tokens_a) and jj < len(tokens_b) and tokens_a[ii] == tokens_b[jj]: + 
run += 1 + ii += 1 + jj += 1 + if run > best: + best = run + return best + + +def token_overlap_jaccard(tokens_a: list[str], tokens_b: list[str]) -> float: + """Jaccard similarity of token sets.""" + set_a, set_b = set(tokens_a), set(tokens_b) + if not set_a and not set_b: + return 0.0 + return len(set_a & set_b) / len(set_a | set_b) + + +def ngram_jaccard(text_a: str, text_b: str, n: int = 3) -> float: + """Jaccard similarity of character n-grams.""" + grams_a = _char_ngrams(text_a, n) + grams_b = _char_ngrams(text_b, n) + if not grams_a and not grams_b: + return 0.0 + return len(grams_a & grams_b) / len(grams_a | grams_b) + + +def lcs_ratio(tokens_a: list[str], tokens_b: list[str]) -> float: + """LCS length / max(len_a, len_b).""" + m, n = len(tokens_a), len(tokens_b) + if m == 0 or n == 0: + return 0.0 + + # Space-optimised LCS (two rows) + prev = [0] * (n + 1) + curr = [0] * (n + 1) + for i in range(1, m + 1): + for j in range(1, n + 1): + if tokens_a[i - 1] == tokens_b[j - 1]: + curr[j] = prev[j - 1] + 1 + else: + curr[j] = max(prev[j], curr[j - 1]) + prev, curr = curr, [0] * (n + 1) + + return prev[n] / max(m, n) + + +async def embedding_cosine(text_a: str, text_b: str, embedding_url: str | None = None) -> float: + """Cosine similarity via embedding service (bge-m3). + + Falls back to 0.0 if the service is unreachable. 
+ """ + url = embedding_url or "http://embedding-service:8087" + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post( + f"{url}/embed", + json={"texts": [text_a, text_b]}, + ) + resp.raise_for_status() + embeddings = resp.json().get("embeddings", []) + if len(embeddings) < 2: + return 0.0 + return _cosine(embeddings[0], embeddings[1]) + except Exception: + logger.warning("Embedding service unreachable, skipping cosine check") + return 0.0 + + +def _cosine(a: list[float], b: list[float]) -> float: + dot = sum(x * y for x, y in zip(a, b)) + norm_a = sum(x * x for x in a) ** 0.5 + norm_b = sum(x * x for x in b) ** 0.5 + if norm_a == 0 or norm_b == 0: + return 0.0 + return dot / (norm_a * norm_b) + + +# --------------------------------------------------------------------------- +# Decision engine +# --------------------------------------------------------------------------- + +@dataclass +class SimilarityReport: + max_exact_run: int + token_overlap: float + ngram_jaccard: float + embedding_cosine: float + lcs_ratio: float + status: str # PASS, WARN, FAIL + details: dict # per-metric status + + +def _classify(value: float | int, metric: str) -> str: + t = THRESHOLDS[metric] + if value >= t["fail"]: + return "FAIL" + if value >= t["warn"]: + return "WARN" + return "PASS" + + +async def check_similarity( + source_text: str, + candidate_text: str, + embedding_url: str | None = None, +) -> SimilarityReport: + """Run all 5 metrics and return an aggregate report.""" + tok_src = _tokenize(source_text) + tok_cand = _tokenize(candidate_text) + + m_exact = max_exact_run(tok_src, tok_cand) + m_token = token_overlap_jaccard(tok_src, tok_cand) + m_ngram = ngram_jaccard(source_text, candidate_text) + m_embed = await embedding_cosine(source_text, candidate_text, embedding_url) + m_lcs = lcs_ratio(tok_src, tok_cand) + + details = { + "max_exact_run": _classify(m_exact, "max_exact_run"), + "token_overlap": _classify(m_token, "token_overlap"), + 
"ngram_jaccard": _classify(m_ngram, "ngram_jaccard"), + "embedding_cosine": _classify(m_embed, "embedding_cosine"), + "lcs_ratio": _classify(m_lcs, "lcs_ratio"), + } + + fail_count = sum(1 for v in details.values() if v == "FAIL") + warn_count = sum(1 for v in details.values() if v == "WARN") + + if fail_count > 0: + status = "FAIL" + elif warn_count > 2: + status = "FAIL" + elif warn_count > 1: + status = "WARN" + elif warn_count == 1: + status = "PASS" + else: + status = "PASS" + + return SimilarityReport( + max_exact_run=m_exact, + token_overlap=round(m_token, 4), + ngram_jaccard=round(m_ngram, 4), + embedding_cosine=round(m_embed, 4), + lcs_ratio=round(m_lcs, 4), + status=status, + details=details, + ) diff --git a/backend-compliance/compliance/tests/test_similarity_detector.py b/backend-compliance/compliance/tests/test_similarity_detector.py new file mode 100644 index 0000000..7b0fbbc --- /dev/null +++ b/backend-compliance/compliance/tests/test_similarity_detector.py @@ -0,0 +1,118 @@ +"""Tests for the Too-Close Similarity Detector.""" + +import pytest +from compliance.services.similarity_detector import ( + max_exact_run, + token_overlap_jaccard, + ngram_jaccard, + lcs_ratio, + check_similarity, + _tokenize, +) + + +class TestTokenize: + def test_basic(self): + tokens = _tokenize("Hello World 123") + assert tokens == ["hello", "world", "123"] + + def test_german_umlauts(self): + tokens = _tokenize("Schutzmaßnahmen für Daten") + assert len(tokens) == 3 + + def test_empty(self): + assert _tokenize("") == [] + + +class TestMaxExactRun: + def test_identical(self): + tokens = _tokenize("the quick brown fox jumps over the lazy dog") + assert max_exact_run(tokens, tokens) == len(tokens) + + def test_partial_match(self): + a = _tokenize("the quick brown fox") + b = _tokenize("a quick brown cat") + assert max_exact_run(a, b) == 2 # "quick brown" + + def test_no_match(self): + a = _tokenize("hello world") + b = _tokenize("foo bar") + assert max_exact_run(a, b) == 0 + + 
def test_empty(self): + assert max_exact_run([], []) == 0 + assert max_exact_run(["a"], []) == 0 + + +class TestTokenOverlapJaccard: + def test_identical(self): + tokens = _tokenize("hello world") + assert token_overlap_jaccard(tokens, tokens) == 1.0 + + def test_no_overlap(self): + a = _tokenize("hello world") + b = _tokenize("foo bar") + assert token_overlap_jaccard(a, b) == 0.0 + + def test_partial(self): + a = _tokenize("hello world foo") + b = _tokenize("hello bar baz") + # intersection: {hello}, union: {hello, world, foo, bar, baz} + assert abs(token_overlap_jaccard(a, b) - 0.2) < 0.01 + + +class TestNgramJaccard: + def test_identical(self): + assert ngram_jaccard("hello", "hello") == 1.0 + + def test_different(self): + assert ngram_jaccard("abc", "xyz") == 0.0 + + def test_short(self): + assert ngram_jaccard("ab", "cd") == 0.0 # too short for 3-grams + + +class TestLcsRatio: + def test_identical(self): + tokens = _tokenize("multi factor authentication required") + assert lcs_ratio(tokens, tokens) == 1.0 + + def test_partial(self): + a = _tokenize("multi factor authentication") + b = _tokenize("single factor verification") + # LCS: "factor" (length 1), max(3,3) = 3, ratio = 1/3 + result = lcs_ratio(a, b) + assert 0.3 < result < 0.4 + + def test_empty(self): + assert lcs_ratio([], []) == 0.0 + + +class TestCheckSimilarity: + @pytest.mark.asyncio + async def test_identical_texts_fail(self): + text = "Multi-factor authentication must be enforced for all administrative accounts." + report = await check_similarity(text, text, embedding_url="http://localhost:99999") + # Identical texts should have max overlap + assert report.token_overlap == 1.0 + assert report.status == "FAIL" + + @pytest.mark.asyncio + async def test_different_texts_pass(self): + source = "Die Anwendung muss eine Zwei-Faktor-Authentisierung implementieren." + candidate = "Network traffic should be encrypted using TLS 1.3 at minimum." 
+ report = await check_similarity(source, candidate, embedding_url="http://localhost:99999") + assert report.token_overlap < 0.1 + assert report.status == "PASS" + + @pytest.mark.asyncio + async def test_report_fields(self): + report = await check_similarity("hello world", "foo bar", embedding_url="http://localhost:99999") + assert hasattr(report, "max_exact_run") + assert hasattr(report, "token_overlap") + assert hasattr(report, "ngram_jaccard") + assert hasattr(report, "embedding_cosine") + assert hasattr(report, "lcs_ratio") + assert hasattr(report, "status") + assert hasattr(report, "details") + assert report.status in ("PASS", "WARN", "FAIL") diff --git a/backend-compliance/migrations/044_canonical_control_library.sql b/backend-compliance/migrations/044_canonical_control_library.sql new file mode 100644 index 0000000..09c1a31 --- /dev/null +++ b/backend-compliance/migrations/044_canonical_control_library.sql @@ -0,0 +1,204 @@ +-- Migration 044: Canonical Control Library +-- Provides a legally defensible, independently authored security control library. +-- Controls are formulated independently (no BSI/proprietary nomenclature). +-- Every control MUST have open-source anchors (OWASP, NIST, ENISA). +-- Source provenance is tracked internally for audit, never shipped in product. +-- +-- Tables: +-- 1. canonical_control_licenses — License metadata for source materials +-- 2. canonical_control_sources — Source registry (internal, not product-facing) +-- 3. canonical_control_frameworks — Registered control frameworks +-- 4. canonical_controls — The actual controls (product-facing) +-- 5. canonical_control_mappings — Provenance trail (internal audit) + +BEGIN; + +-- ============================================================================= +-- 1. 
License Metadata +-- ============================================================================= + +CREATE TABLE IF NOT EXISTS canonical_control_licenses ( + license_id VARCHAR(50) PRIMARY KEY, + name VARCHAR(255) NOT NULL, + terms_url TEXT, + commercial_use VARCHAR(20) NOT NULL + CHECK (commercial_use IN ('allowed', 'restricted', 'prohibited', 'unclear')), + ai_training_restriction VARCHAR(20), + tdm_allowed_under_44b VARCHAR(10) + CHECK (tdm_allowed_under_44b IN ('yes', 'no', 'unclear')), + deletion_required BOOLEAN DEFAULT false, + notes TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- ============================================================================= +-- 2. Source Registry (internal — never shipped in product) +-- ============================================================================= + +CREATE TABLE IF NOT EXISTS canonical_control_sources ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + source_id VARCHAR(50) UNIQUE NOT NULL, + title VARCHAR(500) NOT NULL, + publisher VARCHAR(100) NOT NULL, + url TEXT, + version_label VARCHAR(50), + language VARCHAR(5) DEFAULT 'de', + license_id VARCHAR(50) NOT NULL + REFERENCES canonical_control_licenses(license_id), + allowed_analysis BOOLEAN DEFAULT false, + allowed_store_excerpt BOOLEAN DEFAULT false, + allowed_ship_embeddings BOOLEAN DEFAULT false, + allowed_ship_in_product BOOLEAN DEFAULT false, + vault_retention_days INTEGER DEFAULT 30, + vault_access_tier VARCHAR(20) DEFAULT 'restricted' + CHECK (vault_access_tier IN ('restricted', 'internal', 'public')), + retrieved_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_ccs_license ON canonical_control_sources(license_id); + +-- ============================================================================= +-- 3. 
Control Frameworks +-- ============================================================================= + +CREATE TABLE IF NOT EXISTS canonical_control_frameworks ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + framework_id VARCHAR(50) UNIQUE NOT NULL, + name VARCHAR(255) NOT NULL, + version VARCHAR(20) NOT NULL, + description TEXT, + owner VARCHAR(100) DEFAULT 'security-platform', + policy_version VARCHAR(20), + release_state VARCHAR(20) DEFAULT 'draft' + CHECK (release_state IN ('draft', 'review', 'approved', 'deprecated')), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() +); + +-- ============================================================================= +-- 4. Canonical Controls (product-facing) +-- ============================================================================= + +CREATE TABLE IF NOT EXISTS canonical_controls ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + framework_id UUID NOT NULL + REFERENCES canonical_control_frameworks(id), + control_id VARCHAR(20) NOT NULL, + title VARCHAR(255) NOT NULL, + objective TEXT NOT NULL, + rationale TEXT NOT NULL, + scope JSONB NOT NULL DEFAULT '{}', + requirements JSONB NOT NULL DEFAULT '[]', + test_procedure JSONB NOT NULL DEFAULT '[]', + evidence JSONB NOT NULL DEFAULT '[]', + severity VARCHAR(20) NOT NULL + CHECK (severity IN ('low', 'medium', 'high', 'critical')), + risk_score NUMERIC(3,1) CHECK (risk_score >= 0 AND risk_score <= 10), + implementation_effort VARCHAR(2) + CHECK (implementation_effort IN ('s', 'm', 'l', 'xl')), + evidence_confidence NUMERIC(3,2) CHECK (evidence_confidence >= 0 AND evidence_confidence <= 1), + open_anchors JSONB NOT NULL DEFAULT '[]', + release_state VARCHAR(20) DEFAULT 'draft' + CHECK (release_state IN ('draft', 'review', 'approved', 'deprecated')), + tags JSONB DEFAULT '[]', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE (framework_id, control_id) +); + +CREATE INDEX IF NOT 
EXISTS idx_canonical_controls_domain + ON canonical_controls ((LEFT(control_id, 4))); +CREATE INDEX IF NOT EXISTS idx_canonical_controls_severity + ON canonical_controls (severity); +CREATE INDEX IF NOT EXISTS idx_canonical_controls_release + ON canonical_controls (release_state); +CREATE INDEX IF NOT EXISTS idx_canonical_controls_framework + ON canonical_controls (framework_id); + +-- ============================================================================= +-- 5. Control Mappings / Provenance (internal audit trail) +-- ============================================================================= + +CREATE TABLE IF NOT EXISTS canonical_control_mappings ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + control_id UUID NOT NULL + REFERENCES canonical_controls(id) ON DELETE CASCADE, + source_id UUID NOT NULL + REFERENCES canonical_control_sources(id), + mapping_type VARCHAR(30) NOT NULL + CHECK (mapping_type IN ('inspired_by_internal', 'corroborated_by_open', 'derived_only_open')), + attribution_class VARCHAR(20) NOT NULL + CHECK (attribution_class IN ('internal_only', 'product_ok')), + source_locator VARCHAR(100), + paraphrase_note TEXT, + excerpt_hashes JSONB DEFAULT '[]', + similarity_report JSONB, + reviewed_by VARCHAR(100), + reviewed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_ccm_control ON canonical_control_mappings(control_id); +CREATE INDEX IF NOT EXISTS idx_ccm_source ON canonical_control_mappings(source_id); + +-- ============================================================================= +-- SEED: Licenses +-- ============================================================================= + +INSERT INTO canonical_control_licenses (license_id, name, terms_url, commercial_use, ai_training_restriction, tdm_allowed_under_44b, deletion_required, notes) +VALUES + ('BSI_TOS_2025', 'BSI Nutzungsbedingungen', 'https://www.bsi.bund.de/impressum', 'restricted', 'unclear', 'yes', true, + 'Kommerziell 
nur mit Zustimmung. TDM unter UrhG 44b erlaubt, Kopien danach loeschen.'), + ('OWASP_CC_BY_SA', 'Creative Commons BY-SA 4.0', 'https://creativecommons.org/licenses/by-sa/4.0/', 'allowed', null, 'yes', false, + 'Offen, Attribution + ShareAlike. Kommerziell erlaubt.'), + ('NIST_PUBLIC_DOMAIN', 'US Government Public Domain', 'https://www.nist.gov/open/copyright-fair-use-and-licensing-statements-srd-data-software-and-technical-series-publications', 'allowed', null, 'yes', false, + 'US-Regierungswerke sind gemeinfrei. Keine Einschraenkungen.'), + ('ENISA_CC_BY', 'Creative Commons BY 4.0', 'https://creativecommons.org/licenses/by/4.0/', 'allowed', null, 'yes', false, + 'Offen, nur Attribution. Kommerziell erlaubt.'), + ('ETSI_RESTRICTIVE', 'ETSI Terms of Use', 'https://www.etsi.org/intellectual-property-rights', 'prohibited', 'prohibited', 'no', true, + 'Kommerzielle Nutzung und AI-Training ausdruecklich verboten.'), + ('ISO_PAYWALLED', 'ISO Copyright', 'https://www.iso.org/privacy-and-copyright.html', 'prohibited', 'prohibited', 'unclear', true, + 'Kostenpflichtig. Kein Recht auf Reproduktion, Paraphrase muss hinreichend abstrahiert sein.'), + ('IEC_AI_PROHIBITED', 'IEC Terms of Use', 'https://www.iec.ch/terms-conditions', 'prohibited', 'prohibited', 'no', true, + 'AI-Training explizit verboten.'), + ('CSA_NC', 'CSA Noncommercial', 'https://cloudsecurityalliance.org/license/', 'restricted', null, 'unclear', false, + 'Noncommercial license. 
Kommerziell nur mit separater Vereinbarung.'), + ('CIS_CC_BY_NC_ND', 'Creative Commons BY-NC-ND 4.0', 'https://creativecommons.org/licenses/by-nc-nd/4.0/', 'prohibited', null, 'yes', false, + 'Kein kommerzieller Gebrauch, keine Ableitungen.') +ON CONFLICT (license_id) DO NOTHING; + +-- ============================================================================= +-- SEED: Sources +-- ============================================================================= + +INSERT INTO canonical_control_sources (source_id, title, publisher, url, version_label, language, license_id, allowed_analysis, allowed_store_excerpt, allowed_ship_embeddings, allowed_ship_in_product) +VALUES + ('BSI_TR03161_1', 'BSI TR-03161 Teil 1 — Mobile Anwendungen', 'BSI', 'https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-1.html', '1.0', 'de', 'BSI_TOS_2025', true, false, false, false), + ('BSI_TR03161_2', 'BSI TR-03161 Teil 2 — Web-Anwendungen', 'BSI', 'https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-2.html', '1.0', 'de', 'BSI_TOS_2025', true, false, false, false), + ('BSI_TR03161_3', 'BSI TR-03161 Teil 3 — Hintergrunddienste', 'BSI', 'https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-3.html', '1.0', 'de', 'BSI_TOS_2025', true, false, false, false), + ('OWASP_ASVS', 'OWASP Application Security Verification Standard', 'OWASP Foundation', 'https://owasp.org/www-project-application-security-verification-standard/', '4.0.3', 'en', 'OWASP_CC_BY_SA', true, true, true, true), + ('OWASP_MASVS', 'OWASP Mobile Application Security Verification Standard', 'OWASP Foundation', 'https://mas.owasp.org/', '2.1.0', 'en', 'OWASP_CC_BY_SA', true, true, true, true), + ('OWASP_TOP10', 'OWASP Top 10', 'OWASP Foundation', 'https://owasp.org/www-project-top-ten/', '2021', 'en', 'OWASP_CC_BY_SA', true, true, true, true), + ('NIST_SP800_53', 'NIST 
SP 800-53 Rev. 5 — Security and Privacy Controls', 'NIST', 'https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final', 'Rev.5', 'en', 'NIST_PUBLIC_DOMAIN', true, true, true, true), + ('NIST_SP800_63B', 'NIST SP 800-63B — Digital Identity Guidelines (Authentication)', 'NIST', 'https://pages.nist.gov/800-63-3/sp800-63b.html', 'Rev.3', 'en', 'NIST_PUBLIC_DOMAIN', true, true, true, true), + ('ENISA_GOOD_PRACTICES', 'ENISA Good Practices for Security of IoT/Mobile', 'ENISA', 'https://www.enisa.europa.eu/publications', null, 'en', 'ENISA_CC_BY', true, true, true, true), + ('CIS_CONTROLS', 'CIS Critical Security Controls', 'Center for Internet Security', 'https://www.cisecurity.org/controls', 'v8', 'en', 'CIS_CC_BY_NC_ND', true, false, false, false) +ON CONFLICT (source_id) DO NOTHING; + +-- ============================================================================= +-- SEED: Default Framework +-- ============================================================================= + +INSERT INTO canonical_control_frameworks (framework_id, name, version, description, owner, release_state) +VALUES ( + 'bp_security_v1', + 'BreakPilot Security Controls', + '1.0', + 'Eigenstaendig formulierte Security Controls basierend auf offenem Wissen (OWASP, NIST, ENISA). 
Unabhaengige Taxonomie und Nomenklatur.', + 'security-platform', + 'draft' +) +ON CONFLICT (framework_id) DO NOTHING; + +COMMIT; diff --git a/backend-compliance/tests/test_canonical_control_routes.py b/backend-compliance/tests/test_canonical_control_routes.py new file mode 100644 index 0000000..1bd1fb7 --- /dev/null +++ b/backend-compliance/tests/test_canonical_control_routes.py @@ -0,0 +1,225 @@ +"""Tests for Canonical Control Library routes (canonical_control_routes.py).""" + +import pytest +from unittest.mock import MagicMock, patch +from datetime import datetime, timezone + +from compliance.api.canonical_control_routes import ( + FrameworkResponse, + ControlResponse, + SimilarityCheckRequest, + SimilarityCheckResponse, + _control_row, +) + + +class TestFrameworkResponse: + """Tests for FrameworkResponse model.""" + + def test_basic_creation(self): + resp = FrameworkResponse( + id="uuid-1", + framework_id="bp_security_v1", + name="BreakPilot Security Controls", + version="1.0", + release_state="draft", + created_at="2026-03-12T00:00:00+00:00", + updated_at="2026-03-12T00:00:00+00:00", + ) + assert resp.framework_id == "bp_security_v1" + assert resp.version == "1.0" + + def test_optional_fields(self): + resp = FrameworkResponse( + id="uuid-1", + framework_id="test", + name="Test", + version="1.0", + release_state="draft", + created_at="2026-03-12T00:00:00+00:00", + updated_at="2026-03-12T00:00:00+00:00", + ) + assert resp.description is None + assert resp.owner is None + assert resp.policy_version is None + + +class TestControlResponse: + """Tests for ControlResponse model.""" + + def test_full_control(self): + resp = ControlResponse( + id="uuid-1", + framework_id="uuid-fw", + control_id="AUTH-001", + title="Multi-Factor Authentication", + objective="Require MFA for privileged access.", + rationale="Passwords alone are insufficient.", + scope={"platforms": ["web"]}, + requirements=["MFA for admin accounts"], + test_procedure=["Test admin login without MFA"], + 
evidence=[{"type": "config", "description": "MFA config"}], + severity="high", + open_anchors=[{"framework": "OWASP ASVS", "ref": "V2.8", "url": "https://owasp.org"}], + release_state="draft", + tags=["mfa", "auth"], + created_at="2026-03-12T00:00:00+00:00", + updated_at="2026-03-12T00:00:00+00:00", + ) + assert resp.control_id == "AUTH-001" + assert resp.severity == "high" + assert len(resp.open_anchors) == 1 + + def test_optional_numeric_fields(self): + resp = ControlResponse( + id="uuid-1", + framework_id="uuid-fw", + control_id="NET-001", + title="TLS", + objective="Encrypt traffic.", + rationale="Prevent eavesdropping.", + scope={}, + requirements=[], + test_procedure=[], + evidence=[], + severity="high", + open_anchors=[], + release_state="draft", + tags=[], + created_at="2026-03-12T00:00:00+00:00", + updated_at="2026-03-12T00:00:00+00:00", + ) + assert resp.risk_score is None + assert resp.implementation_effort is None + assert resp.evidence_confidence is None + + +class TestSimilarityCheckRequest: + """Tests for SimilarityCheckRequest model.""" + + def test_valid_request(self): + req = SimilarityCheckRequest( + source_text="Die Anwendung muss MFA implementieren.", + candidate_text="Multi-factor authentication is required.", + ) + assert req.source_text == "Die Anwendung muss MFA implementieren." + assert req.candidate_text == "Multi-factor authentication is required." 
+ + def test_empty_strings(self): + req = SimilarityCheckRequest(source_text="", candidate_text="") + assert req.source_text == "" + + +class TestSimilarityCheckResponse: + """Tests for SimilarityCheckResponse model.""" + + def test_pass_status(self): + resp = SimilarityCheckResponse( + max_exact_run=2, + token_overlap=0.05, + ngram_jaccard=0.03, + embedding_cosine=0.45, + lcs_ratio=0.12, + status="PASS", + details={ + "max_exact_run": "PASS", + "token_overlap": "PASS", + "ngram_jaccard": "PASS", + "embedding_cosine": "PASS", + "lcs_ratio": "PASS", + }, + ) + assert resp.status == "PASS" + + def test_fail_status(self): + resp = SimilarityCheckResponse( + max_exact_run=15, + token_overlap=0.35, + ngram_jaccard=0.20, + embedding_cosine=0.95, + lcs_ratio=0.55, + status="FAIL", + details={ + "max_exact_run": "FAIL", + "token_overlap": "FAIL", + "ngram_jaccard": "FAIL", + "embedding_cosine": "FAIL", + "lcs_ratio": "FAIL", + }, + ) + assert resp.status == "FAIL" + + +class TestControlRowConversion: + """Tests for _control_row helper.""" + + def _make_row(self, **overrides): + now = datetime.now(timezone.utc) + defaults = { + "id": "uuid-ctrl-1", + "framework_id": "uuid-fw-1", + "control_id": "AUTH-001", + "title": "Multi-Factor Authentication", + "objective": "Require MFA.", + "rationale": "Passwords insufficient.", + "scope": {"platforms": ["web", "mobile"]}, + "requirements": ["Req 1", "Req 2"], + "test_procedure": ["Test 1"], + "evidence": [{"type": "config", "description": "MFA config"}], + "severity": "high", + "risk_score": 8.5, + "implementation_effort": "m", + "evidence_confidence": 0.85, + "open_anchors": [ + {"framework": "OWASP ASVS", "ref": "V2.8", "url": "https://owasp.org"}, + ], + "release_state": "draft", + "tags": ["mfa"], + "created_at": now, + "updated_at": now, + } + defaults.update(overrides) + mock = MagicMock() + for key, value in defaults.items(): + setattr(mock, key, value) + return mock + + def test_basic_conversion(self): + row = 
self._make_row() + result = _control_row(row) + assert result["control_id"] == "AUTH-001" + assert result["severity"] == "high" + assert result["risk_score"] == 8.5 + assert result["implementation_effort"] == "m" + assert result["evidence_confidence"] == 0.85 + assert len(result["open_anchors"]) == 1 + + def test_null_numeric_fields(self): + row = self._make_row(risk_score=None, evidence_confidence=None, implementation_effort=None) + result = _control_row(row) + assert result["risk_score"] is None + assert result["evidence_confidence"] is None + assert result["implementation_effort"] is None + + def test_empty_tags(self): + row = self._make_row(tags=None) + result = _control_row(row) + assert result["tags"] == [] + + def test_empty_tags_list(self): + row = self._make_row(tags=[]) + result = _control_row(row) + assert result["tags"] == [] + + def test_timestamp_format(self): + now = datetime(2026, 3, 12, 10, 30, 0, tzinfo=timezone.utc) + row = self._make_row(created_at=now, updated_at=now) + result = _control_row(row) + assert "2026-03-12" in result["created_at"] + assert "10:30" in result["created_at"] + + def test_none_timestamps(self): + row = self._make_row(created_at=None, updated_at=None) + result = _control_row(row) + assert result["created_at"] is None + assert result["updated_at"] is None diff --git a/backend-compliance/tests/test_license_gate.py b/backend-compliance/tests/test_license_gate.py new file mode 100644 index 0000000..a5275ed --- /dev/null +++ b/backend-compliance/tests/test_license_gate.py @@ -0,0 +1,161 @@ +"""Tests for License Gate service (license_gate.py).""" + +import pytest +from unittest.mock import MagicMock, patch +from collections import namedtuple + +from compliance.services.license_gate import ( + check_source_allowed, + get_license_matrix, + get_source_permissions, + USAGE_COLUMN_MAP, +) + + +class TestUsageColumnMap: + """Test the usage type to column mapping.""" + + def test_all_usage_types_mapped(self): + expected = {"analysis", 
"store_excerpt", "ship_embeddings", "ship_in_product"} + assert set(USAGE_COLUMN_MAP.keys()) == expected + + def test_column_names(self): + assert USAGE_COLUMN_MAP["analysis"] == "allowed_analysis" + assert USAGE_COLUMN_MAP["store_excerpt"] == "allowed_store_excerpt" + assert USAGE_COLUMN_MAP["ship_embeddings"] == "allowed_ship_embeddings" + assert USAGE_COLUMN_MAP["ship_in_product"] == "allowed_ship_in_product" + + +class TestCheckSourceAllowed: + """Tests for check_source_allowed().""" + + def _mock_db(self, return_value): + db = MagicMock() + mock_result = MagicMock() + if return_value is None: + mock_result.fetchone.return_value = None + else: + mock_result.fetchone.return_value = (return_value,) + db.execute.return_value = mock_result + return db + + def test_allowed_analysis(self): + db = self._mock_db(True) + assert check_source_allowed(db, "OWASP_ASVS", "analysis") is True + + def test_denied_ship_in_product(self): + db = self._mock_db(False) + assert check_source_allowed(db, "BSI_TR03161_1", "ship_in_product") is False + + def test_unknown_source(self): + db = self._mock_db(None) + assert check_source_allowed(db, "NONEXISTENT", "analysis") is False + + def test_unknown_usage_type(self): + db = MagicMock() + assert check_source_allowed(db, "OWASP_ASVS", "invalid_type") is False + # DB should not be called for invalid usage type + db.execute.assert_not_called() + + def test_allowed_store_excerpt(self): + db = self._mock_db(True) + assert check_source_allowed(db, "OWASP_ASVS", "store_excerpt") is True + + def test_denied_store_excerpt(self): + db = self._mock_db(False) + assert check_source_allowed(db, "BSI_TR03161_1", "store_excerpt") is False + + +class TestGetLicenseMatrix: + """Tests for get_license_matrix().""" + + def test_returns_list(self): + LicRow = namedtuple("LicRow", [ + "license_id", "name", "terms_url", "commercial_use", + "ai_training_restriction", "tdm_allowed_under_44b", + "deletion_required", "notes", + ]) + rows = [ + 
LicRow("OWASP_CC_BY_SA", "CC BY-SA 4.0", "https://example.com", + "allowed", None, "yes", False, "Open source"), + LicRow("BSI_TOS_2025", "BSI ToS", "https://bsi.bund.de", + "restricted", "unclear", "yes", True, "Commercial restricted"), + ] + + db = MagicMock() + db.execute.return_value.fetchall.return_value = rows + result = get_license_matrix(db) + + assert len(result) == 2 + assert result[0]["license_id"] == "OWASP_CC_BY_SA" + assert result[0]["commercial_use"] == "allowed" + assert result[0]["deletion_required"] is False + assert result[1]["license_id"] == "BSI_TOS_2025" + assert result[1]["commercial_use"] == "restricted" + assert result[1]["deletion_required"] is True + + def test_empty_result(self): + db = MagicMock() + db.execute.return_value.fetchall.return_value = [] + result = get_license_matrix(db) + assert result == [] + + +class TestGetSourcePermissions: + """Tests for get_source_permissions().""" + + def test_returns_list_with_join(self): + SrcRow = namedtuple("SrcRow", [ + "source_id", "title", "publisher", "url", "version_label", + "language", "license_id", "allowed_analysis", "allowed_store_excerpt", + "allowed_ship_embeddings", "allowed_ship_in_product", + "vault_retention_days", "vault_access_tier", + "license_name", "commercial_use", + ]) + rows = [ + SrcRow( + "OWASP_ASVS", "OWASP ASVS", "OWASP Foundation", + "https://owasp.org", "4.0.3", "en", "OWASP_CC_BY_SA", + True, True, True, True, 30, "public", + "CC BY-SA 4.0", "allowed", + ), + ] + + db = MagicMock() + db.execute.return_value.fetchall.return_value = rows + result = get_source_permissions(db) + + assert len(result) == 1 + src = result[0] + assert src["source_id"] == "OWASP_ASVS" + assert src["allowed_analysis"] is True + assert src["allowed_ship_in_product"] is True + assert src["license_name"] == "CC BY-SA 4.0" + assert src["commercial_use"] == "allowed" + + def test_restricted_source(self): + SrcRow = namedtuple("SrcRow", [ + "source_id", "title", "publisher", "url", 
"version_label", + "language", "license_id", "allowed_analysis", "allowed_store_excerpt", + "allowed_ship_embeddings", "allowed_ship_in_product", + "vault_retention_days", "vault_access_tier", + "license_name", "commercial_use", + ]) + rows = [ + SrcRow( + "BSI_TR03161_1", "BSI TR-03161 Teil 1", "BSI", + "https://bsi.bund.de", "1.0", "de", "BSI_TOS_2025", + True, False, False, False, 30, "restricted", + "BSI Nutzungsbedingungen", "restricted", + ), + ] + + db = MagicMock() + db.execute.return_value.fetchall.return_value = rows + result = get_source_permissions(db) + + src = result[0] + assert src["allowed_analysis"] is True + assert src["allowed_store_excerpt"] is False + assert src["allowed_ship_embeddings"] is False + assert src["allowed_ship_in_product"] is False diff --git a/backend-compliance/tests/test_validate_controls.py b/backend-compliance/tests/test_validate_controls.py new file mode 100644 index 0000000..2189b28 --- /dev/null +++ b/backend-compliance/tests/test_validate_controls.py @@ -0,0 +1,142 @@ +"""Tests for the CI/CD control validator script.""" + +import json +import subprocess +import sys +from pathlib import Path + +import pytest + +REPO_ROOT = Path(__file__).resolve().parent.parent.parent +VALIDATOR = REPO_ROOT / "scripts" / "validate-controls.py" +CONTROLS_FILE = REPO_ROOT / "ai-compliance-sdk" / "policies" / "canonical_controls_v1.json" + + +class TestValidatorScript: + """Integration tests for validate-controls.py.""" + + def test_validator_passes_on_valid_controls(self): + result = subprocess.run( + [sys.executable, str(VALIDATOR)], + capture_output=True, text=True, cwd=str(REPO_ROOT), + ) + assert result.returncode == 0, f"Validator failed:\n{result.stdout}\n{result.stderr}" + assert "ALL CHECKS PASSED" in result.stdout + + def test_validator_reports_control_count(self): + result = subprocess.run( + [sys.executable, str(VALIDATOR)], + capture_output=True, text=True, cwd=str(REPO_ROOT), + ) + assert "Controls: 10" in result.stdout + assert 
"Open Anchors:" in result.stdout + + +class TestControlsJsonStructure: + """Direct validation of the JSON file structure.""" + + @pytest.fixture + def controls_data(self): + with open(CONTROLS_FILE) as f: + return json.load(f) + + def test_top_level_keys(self, controls_data): + assert "version" in controls_data + assert "schema" in controls_data + assert "framework" in controls_data + assert "domains" in controls_data + assert "controls" in controls_data + + def test_framework_metadata(self, controls_data): + fw = controls_data["framework"] + assert fw["id"] == "bp_security_v1" + assert fw["version"] == "1.0" + + def test_all_controls_have_open_anchors(self, controls_data): + for ctrl in controls_data["controls"]: + anchors = ctrl.get("open_anchors", []) + assert len(anchors) >= 1, ( + f"Control {ctrl['control_id']} has no open anchors" + ) + + def test_no_bsi_nomenclature_in_controls(self, controls_data): + """Ensure no BSI-proprietary IDs leak into product-facing fields.""" + import re + bsi_pattern = re.compile(r"O\.[A-Za-z]+_[0-9]+") + for ctrl in controls_data["controls"]: + for field in ["objective", "rationale", "title"]: + text = ctrl.get(field, "") + match = bsi_pattern.search(text) + assert match is None, ( + f"Control {ctrl['control_id']}.{field} contains BSI pattern: {match.group()}" + ) + + def test_control_id_format(self, controls_data): + import re + pattern = re.compile(r"^[A-Z]{2,6}-[0-9]{3}$") + for ctrl in controls_data["controls"]: + assert pattern.match(ctrl["control_id"]), ( + f"Invalid control_id format: {ctrl['control_id']}" + ) + + def test_valid_severities(self, controls_data): + valid = {"low", "medium", "high", "critical"} + for ctrl in controls_data["controls"]: + assert ctrl["severity"] in valid, ( + f"Control {ctrl['control_id']} has invalid severity: {ctrl['severity']}" + ) + + def test_domains_referenced_by_controls(self, controls_data): + domain_ids = {d["id"] for d in controls_data["domains"]} + for ctrl in 
controls_data["controls"]: + assert ctrl["domain"] in domain_ids, ( + f"Control {ctrl['control_id']} references unknown domain: {ctrl['domain']}" + ) + + def test_open_anchor_structure(self, controls_data): + for ctrl in controls_data["controls"]: + for i, anchor in enumerate(ctrl.get("open_anchors", [])): + assert "framework" in anchor, ( + f"Control {ctrl['control_id']}: anchor[{i}] missing 'framework'" + ) + assert "ref" in anchor, ( + f"Control {ctrl['control_id']}: anchor[{i}] missing 'ref'" + ) + assert "url" in anchor, ( + f"Control {ctrl['control_id']}: anchor[{i}] missing 'url'" + ) + assert anchor["url"].startswith("https://"), ( + f"Control {ctrl['control_id']}: anchor[{i}] URL not HTTPS" + ) + + def test_evidence_structure(self, controls_data): + for ctrl in controls_data["controls"]: + for i, ev in enumerate(ctrl.get("evidence", [])): + assert "type" in ev, ( + f"Control {ctrl['control_id']}: evidence[{i}] missing 'type'" + ) + assert "description" in ev, ( + f"Control {ctrl['control_id']}: evidence[{i}] missing 'description'" + ) + + def test_risk_scores_in_range(self, controls_data): + for ctrl in controls_data["controls"]: + if ctrl.get("risk_score") is not None: + assert 0 <= ctrl["risk_score"] <= 10, ( + f"Control {ctrl['control_id']}: risk_score {ctrl['risk_score']} out of range" + ) + + def test_total_controls_matches(self, controls_data): + assert controls_data["total_controls"] == len(controls_data["controls"]) + + def test_independent_taxonomy_no_tr_reference(self, controls_data): + """Verify controls don't reference BSI TR documents in product text.""" + import re + tr_pattern = re.compile(r"TR-03161|BSI-TR-") + for ctrl in controls_data["controls"]: + for field in ["objective", "rationale", "title"]: + text = ctrl.get(field, "") + match = tr_pattern.search(text) + assert match is None, ( + f"Control {ctrl['control_id']}.{field} references BSI TR: {match.group()}" + ) diff --git a/docs-src/services/sdk-modules/canonical-control-library.md 
b/docs-src/services/sdk-modules/canonical-control-library.md new file mode 100644 index 0000000..5be2650 --- /dev/null +++ b/docs-src/services/sdk-modules/canonical-control-library.md @@ -0,0 +1,251 @@ +# Canonical Control Library (CP-CLIB) + +Eigenstaendig formulierte Security Controls basierend auf offenem Wissen (OWASP, NIST, ENISA). +Unabhaengige Taxonomie — kein Bezug zu proprietaeren Frameworks. + +**Prefix:** `CP-CLIB` · **Frontend:** `https://macmini:3007/sdk/control-library` +**Provenance Wiki:** `https://macmini:3007/sdk/control-provenance` +**Proxy:** `/api/sdk/v1/canonical` → `backend-compliance:8002/api/v1/canonical/...` + +--- + +## Motivation + +Wir benoetigen ein System, um aus verschiedenen Security-Guidelines **eigenstaendige, rechtlich defensible Controls** zu extrahieren, ohne proprietaere Texte im Produkt zu verwenden. + +### Kernprinzipien + +1. **Unabhaengige Taxonomie** — Eigene Domain-IDs (AUTH, NET, SUP, etc.), eigenes ID-Format (`DOMAIN-NNN`) +2. **Open-Source-Verankerung** — Jedes Control hat mindestens 1 Open Anchor (OWASP/NIST/ENISA) +3. **Strikte Quellentrennung** — Geschuetzte Quellen nur intern zur Analyse, nie im Produkt +4. 
**Automatisierte Pruefung** — Too-Close-Detektor + No-Leak-Scanner in CI/CD + +--- + +## Rechtliche Basis + +| Gesetz | Bezug | +|--------|-------| +| UrhG §44b | Text & Data Mining — Kopien loeschen | +| UrhG §23 | Hinreichender Abstand zum Originalwerk | +| BSI Nutzungsbedingungen | Kommerziell nur mit Zustimmung | + +--- + +## Domains (Unabhaengige Taxonomie) + +| Domain | Name | Beschreibung | +|--------|------|-------------| +| AUTH | Identity & Access Management | Authentisierung, MFA, Token-Management | +| NET | Network & Transport Security | TLS, Zertifikate, Netzwerk-Haertung | +| SUP | Software Supply Chain | Signierung, SBOM, Dependency-Scanning | +| LOG | Security Operations & Logging | Privacy-Aware Logging, SIEM | +| WEB | Web Application Security | Admin-Flows, Account Recovery | +| DATA | Data Governance & Classification | Datenklassifikation, Schutzmassnahmen | +| CRYP | Cryptographic Operations | Key Management, Rotation, HSM | +| REL | Release & Change Governance | Change Impact Assessment, Security Review | + +!!! warning "Keine BSI-Nomenklatur" + Die Domains verwenden bewusst KEINE BSI-Bezeichner (O.Auth_*, O.Netz_*). + Das ID-Format `DOMAIN-NNN` ist eine gaengige, nicht-proprietaere Konvention. 
+ +--- + +## Datenmodell (Migration 044) + +```mermaid +erDiagram + canonical_control_licenses ||--o{ canonical_control_sources : "hat" + canonical_control_frameworks ||--o{ canonical_controls : "enthaelt" + canonical_controls ||--o{ canonical_control_mappings : "hat" + canonical_control_sources ||--o{ canonical_control_mappings : "referenziert" + + canonical_control_licenses { + varchar license_id PK + varchar name + varchar commercial_use + boolean deletion_required + } + canonical_control_sources { + uuid id PK + varchar source_id UK + varchar title + boolean allowed_ship_in_product + } + canonical_control_frameworks { + uuid id PK + varchar framework_id UK + varchar name + varchar version + } + canonical_controls { + uuid id PK + uuid framework_id FK + varchar control_id + varchar severity + jsonb open_anchors + } + canonical_control_mappings { + uuid id PK + uuid control_id FK + uuid source_id FK + varchar mapping_type + varchar attribution_class + } +``` + +### Tabellen + +| Tabelle | Zweck | Produktfaehig? | +|---------|-------|----------------| +| `canonical_control_licenses` | Lizenz-Metadaten | Ja (read-only) | +| `canonical_control_sources` | Quellen-Register | **Nein** (nur intern) | +| `canonical_control_frameworks` | Framework-Registry | Ja | +| `canonical_controls` | Die eigentlichen Controls | Ja | +| `canonical_control_mappings` | Provenance-Trail | **Nein** (nur Audit) | + +--- + +## API Endpoints + +| Methode | Pfad | Beschreibung | +|---------|------|--------------| +| `GET` | `/v1/canonical/frameworks` | Alle Frameworks | +| `GET` | `/v1/canonical/frameworks/{id}` | Framework-Details | +| `GET` | `/v1/canonical/frameworks/{id}/controls` | Controls eines Frameworks | +| `GET` | `/v1/canonical/controls` | Alle Controls (Filter: `severity`, `domain`, `release_state`) | +| `GET` | `/v1/canonical/controls/{control_id}` | Einzelnes Control (z.B. 
AUTH-001) | +| `GET` | `/v1/canonical/sources` | Quellenregister mit Berechtigungen | +| `GET` | `/v1/canonical/licenses` | Lizenz-Matrix | +| `POST` | `/v1/canonical/controls/{id}/similarity-check` | Too-Close-Pruefung | + +### Beispiel: Control abrufen + +```bash +curl -s https://macmini:8002/api/v1/canonical/controls/AUTH-001 | jq +``` + +### Beispiel: Similarity Check + +```bash +curl -X POST https://macmini:8002/api/v1/canonical/controls/AUTH-001/similarity-check \ + -H 'Content-Type: application/json' \ + -d '{ + "source_text": "Die Anwendung muss MFA implementieren.", + "candidate_text": "Privileged accounts require multi-factor authentication." + }' | jq +``` + +**Response:** +```json +{ + "max_exact_run": 0, + "token_overlap": 0.0714, + "ngram_jaccard": 0.0323, + "embedding_cosine": 0.0, + "lcs_ratio": 0.0714, + "status": "PASS", + "details": { + "max_exact_run": "PASS", + "token_overlap": "PASS", + "ngram_jaccard": "PASS", + "embedding_cosine": "PASS", + "lcs_ratio": "PASS" + } +} +``` + +--- + +## Too-Close-Detektor + +5 Metriken mit Schwellwerten: + +| Metrik | Warn | Fail | Beschreibung | +|--------|------|------|-------------| +| Exact Phrase | ≥8 Tokens | ≥12 Tokens | Laengste identische Token-Sequenz | +| Token Overlap | ≥0.20 | ≥0.30 | Jaccard der Token-Mengen | +| 3-Gram Jaccard | ≥0.10 | ≥0.18 | Zeichenketten-Aehnlichkeit | +| Embedding Cosine | ≥0.86 | ≥0.92 | Semantische Aehnlichkeit (bge-m3) | +| LCS Ratio | ≥0.35 | ≥0.50 | Longest Common Subsequence | + +**Entscheidungslogik:** + +- **PASS** — Kein Fail + max 1 Warn +- **WARN** — Max 2 Warn, kein Fail → Human Review +- **FAIL** — Irgendein Fail → Block, Umformulierung noetig + +--- + +## License Gate + +Jede Quelle hat definierte Berechtigungen: + +| Nutzungsart | Spalte | Beispiel OWASP | Beispiel BSI | +|-------------|--------|---------------|-------------| +| Analyse | `allowed_analysis` | ✅ | ✅ | +| Excerpt speichern | `allowed_store_excerpt` | ✅ | ❌ | +| Embeddings shippen | 
`allowed_ship_embeddings` | ✅ | ❌ | +| Im Produkt shippen | `allowed_ship_in_product` | ✅ | ❌ | + +--- + +## CI/CD Validation + +Der Validator (`scripts/validate-controls.py`) prueft bei jedem Commit: + +1. **Schema Validation** — Alle Pflichtfelder, ID-Format, Severity +2. **No-Leak Scanner** — Regex gegen BSI-Muster (`O.Auth_*`, `TR-03161`, etc.) +3. **Open Anchor Check** — Jedes Control hat ≥1 Open Anchor +4. **Taxonomy Check** — Keine BSI-style ID-Prefixe +5. **Evidence Structure** — Alle Evidence-Items haben `type` + `description` + +--- + +## Frontend + +### Control Library Browser (`/sdk/control-library`) + +- Framework-Info mit Version und Beschreibung +- Filterable Control-Tabelle (Domain, Severity, Freitext) +- Detail-Ansicht mit: Ziel, Begruendung, Anforderungen, Pruefverfahren, Nachweise +- **Open-Source-Referenzen** prominent dargestellt (gruener Kasten) +- Tags und Scope-Informationen + +### Control Provenance Wiki (`/sdk/control-provenance`) + +- Dokumentation der Methodik +- Unabhaengige Taxonomie erklaert +- Offene Referenzquellen aufgelistet +- Geschuetzte Quellen und Trennungsprinzip +- **Live-Daten:** Lizenz-Matrix und Quellenregister aus der Datenbank + +--- + +## Dateien + +| Datei | Typ | Beschreibung | +|-------|-----|-------------| +| `backend-compliance/migrations/044_canonical_control_library.sql` | SQL | 5 Tabellen + Seed-Daten | +| `backend-compliance/compliance/api/canonical_control_routes.py` | Python | REST API (8 Endpoints) | +| `backend-compliance/compliance/services/license_gate.py` | Python | Lizenz-Gate-Logik | +| `backend-compliance/compliance/services/similarity_detector.py` | Python | Too-Close-Detektor (5 Metriken) | +| `ai-compliance-sdk/policies/canonical_controls_v1.json` | JSON | 10 Seed Controls, 39 Open Anchors | +| `ai-compliance-sdk/internal/ucca/canonical_control_loader.go` | Go | Control Loader mit Multi-Index | +| `admin-compliance/app/sdk/control-library/page.tsx` | TSX | Control Library Browser | +| 
`admin-compliance/app/sdk/control-provenance/page.tsx` | TSX | Provenance Wiki | +| `admin-compliance/app/api/sdk/v1/canonical/route.ts` | TS | Next.js API Proxy | +| `scripts/validate-controls.py` | Python | CI/CD Validator | + +--- + +## Tests + +| Datei | Sprache | Tests | +|-------|---------|-------| +| `ai-compliance-sdk/internal/ucca/canonical_control_loader_test.go` | Go | 8 Tests | +| `backend-compliance/compliance/tests/test_similarity_detector.py` | Python | 19 Tests | +| `backend-compliance/tests/test_canonical_control_routes.py` | Python | 14 Tests | +| `backend-compliance/tests/test_license_gate.py` | Python | 12 Tests | +| `backend-compliance/tests/test_validate_controls.py` | Python | 14 Tests | +| **Gesamt** | | **67 Tests** | diff --git a/mkdocs.yml b/mkdocs.yml index 819ecd3..447d6d2 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -100,6 +100,7 @@ nav: - Dokument-Versionierung: services/sdk-modules/versionierung.md - Change-Request System (CP-CR): services/sdk-modules/change-requests.md - Dokumentengenerierung: services/sdk-modules/dokumentengenerierung.md + - Canonical Control Library (CP-CLIB): services/sdk-modules/canonical-control-library.md - Strategie: - Wettbewerbsanalyse & Roadmap: strategy/wettbewerbsanalyse.md - Entwicklung: diff --git a/scripts/validate-controls.py b/scripts/validate-controls.py new file mode 100644 index 0000000..a2f17af --- /dev/null +++ b/scripts/validate-controls.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +""" +Canonical Control Library — CI/CD Validator + +Checks: + 1. Schema Validation — JSON against defined structure + 2. No-Leak Scanner — Regex against forbidden locator patterns (e.g. O.Auth_*, O.Netz_*) + 3. Independent Taxonomy — Control IDs must not use BSI-style prefixes + 4. Evidence Structure — Every evidence item has 'type' and 'description' + 5. 
Open Anchor Check — Every control has >= 1 open anchor + +Usage: + python scripts/validate-controls.py +""" + +import json +import re +import sys +from pathlib import Path + +# --------------------------------------------------------------------------- +# Paths +# --------------------------------------------------------------------------- + +REPO_ROOT = Path(__file__).resolve().parent.parent +CONTROLS_FILE = REPO_ROOT / "ai-compliance-sdk" / "policies" / "canonical_controls_v1.json" +MIGRATION_FILE = REPO_ROOT / "backend-compliance" / "migrations" / "044_canonical_control_library.sql" + +# --------------------------------------------------------------------------- +# Forbidden patterns (BSI proprietary nomenclature — must NOT appear in controls) +# --------------------------------------------------------------------------- + +FORBIDDEN_PATTERNS = [ + re.compile(r"O\.[A-Za-z]+_[0-9]+"), # BSI objective IDs: O.Auth_1, O.Netz_3 + re.compile(r"TR-03161"), # Direct TR reference in control text + re.compile(r"BSI-TR-"), # Direct BSI-TR reference + re.compile(r"Anforderung\s+[A-Z]\.\d+"), # BSI requirement format +] + +# Fields that are product-facing and must not contain forbidden patterns +PRODUCT_FIELDS = ["objective", "rationale", "title", "requirements", "test_procedure"] + +# --------------------------------------------------------------------------- +# Known open sources (from migration seed) +# --------------------------------------------------------------------------- + +KNOWN_OPEN_SOURCES = { + "OWASP_ASVS", "OWASP_MASVS", "OWASP_TOP10", + "NIST_SP800_53", "NIST_SP800_63B", + "ENISA_GOOD_PRACTICES", "CIS_CONTROLS", +} + +KNOWN_ALL_SOURCES = KNOWN_OPEN_SOURCES | { + "BSI_TR03161_1", "BSI_TR03161_2", "BSI_TR03161_3", +} + +# --------------------------------------------------------------------------- +# Validators +# --------------------------------------------------------------------------- + +errors: list[str] = [] +warnings: list[str] = [] + + +def error(msg: 
str) -> None: + errors.append(msg) + + +def warn(msg: str) -> None: + warnings.append(msg) + + +def check_schema(data: dict) -> None: + """Validate JSON structure.""" + required_top = ["version", "schema", "framework", "total_controls", "domains", "controls"] + for key in required_top: + if key not in data: + error(f"[SCHEMA] Missing top-level key: {key}") + + required_control = [ + "control_id", "title", "domain", "severity", "objective", + "rationale", "scope", "requirements", "test_procedure", + "evidence", "open_anchors", + ] + control_id_pattern = re.compile(r"^[A-Z]{2,6}-[0-9]{3}$") + valid_severities = {"low", "medium", "high", "critical"} + + for ctrl in data.get("controls", []): + cid = ctrl.get("control_id", "???") + for key in required_control: + if key not in ctrl: + error(f"[SCHEMA] Control {cid}: missing field '{key}'") + + if not control_id_pattern.match(cid): + error(f"[SCHEMA] Control {cid}: ID does not match ^[A-Z]{{2,6}}-[0-9]{{3}}$") + + sev = ctrl.get("severity", "") + if sev not in valid_severities: + error(f"[SCHEMA] Control {cid}: invalid severity '{sev}'") + + if ctrl.get("risk_score") is not None: + rs = ctrl["risk_score"] + if not (0 <= rs <= 10): + error(f"[SCHEMA] Control {cid}: risk_score {rs} out of range [0, 10]") + + domain_ids = {d["id"] for d in data.get("domains", [])} + for ctrl in data.get("controls", []): + cid = ctrl.get("control_id", "???") + if ctrl.get("domain") not in domain_ids: + error(f"[SCHEMA] Control {cid}: domain '{ctrl.get('domain')}' not in domains list") + + +def check_no_leak(data: dict) -> None: + """Ensure no BSI-proprietary nomenclature leaks into product-facing fields.""" + for ctrl in data.get("controls", []): + cid = ctrl.get("control_id", "???") + for field_name in PRODUCT_FIELDS: + values = ctrl.get(field_name, "") + if isinstance(values, list): + texts = values + else: + texts = [values] + + for text_val in texts: + if not isinstance(text_val, str): + continue + for pattern in FORBIDDEN_PATTERNS: + 
match = pattern.search(text_val) + if match: + error( + f"[NO-LEAK] Control {cid}.{field_name}: " + f"forbidden pattern '{match.group()}' found" + ) + + +def check_open_anchors(data: dict) -> None: + """Every control must have at least 1 open anchor.""" + for ctrl in data.get("controls", []): + cid = ctrl.get("control_id", "???") + anchors = ctrl.get("open_anchors", []) + if len(anchors) < 1: + error(f"[ANCHOR] Control {cid}: no open anchors — every control needs >= 1") + # Check anchor structure + for i, anchor in enumerate(anchors): + for key in ["framework", "ref", "url"]: + if key not in anchor or not anchor[key]: + error(f"[ANCHOR] Control {cid}: open_anchor[{i}] missing '{key}'") + + +def check_independent_taxonomy(data: dict) -> None: + """Verify controls use independent taxonomy, not BSI structure.""" + bsi_domain_patterns = [ + re.compile(r"^O\.", re.IGNORECASE), # BSI objective prefix + ] + for ctrl in data.get("controls", []): + cid = ctrl.get("control_id", "???") + for pattern in bsi_domain_patterns: + if pattern.match(cid): + error(f"[TAXONOMY] Control {cid}: uses BSI-style ID prefix") + + +def check_evidence_fields(data: dict) -> None: + """Validate evidence items have required fields.""" + for ctrl in data.get("controls", []): + cid = ctrl.get("control_id", "???") + for i, ev in enumerate(ctrl.get("evidence", [])): + if not isinstance(ev, dict): + error(f"[EVIDENCE] Control {cid}: evidence[{i}] is not an object") + continue + for key in ["type", "description"]: + if key not in ev or not ev[key]: + error(f"[EVIDENCE] Control {cid}: evidence[{i}] missing '{key}'") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> int: + print(f"Validating: {CONTROLS_FILE}") + print() + + if not CONTROLS_FILE.exists(): + print(f"ERROR: File not found: {CONTROLS_FILE}") + return 1 + + with open(CONTROLS_FILE) as f: + data = json.load(f) 
+ + check_schema(data) + check_no_leak(data) + check_open_anchors(data) + check_independent_taxonomy(data) + check_evidence_fields(data) + + total_controls = len(data.get("controls", [])) + total_anchors = sum(len(c.get("open_anchors", [])) for c in data.get("controls", [])) + + print(f"Controls: {total_controls}") + print(f"Open Anchors: {total_anchors}") + print() + + if warnings: + print(f"WARNINGS ({len(warnings)}):") + for w in warnings: + print(f" ⚠ {w}") + print() + + if errors: + print(f"ERRORS ({len(errors)}):") + for e in errors: + print(f" ✗ {e}") + print() + print("VALIDATION FAILED") + return 1 + + print("ALL CHECKS PASSED") + return 0 + + +if __name__ == "__main__": + sys.exit(main())