Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 25s
CI / test-go-edu-search (push) Successful in 26s
CI / test-python-klausur (push) Failing after 1m55s
CI / test-python-agent-core (push) Successful in 16s
CI / test-nodejs-website (push) Successful in 18s
- Voice-Service von Core nach Lehrer verschoben (bp-lehrer-voice-service) - 4 Jitsi-Services + 2 Synapse-Services in docker-compose.yml aufgenommen - Camunda komplett gelöscht: workflow pages, workflow-config.ts, bpmn-js deps - CAMUNDA_URL aus backend-lehrer environment entfernt - Sidebar: Kategorie "Compliance SDK" + "Katalogverwaltung" entfernt - Sidebar: Neue Kategorie "Kommunikation" mit Video & Chat, Voice Service, Alerts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
232 lines
7.6 KiB
Python
232 lines
7.6 KiB
Python
"""
|
|
Encryption Service - Namespace Key Management
|
|
Client-side encryption for DSGVO compliance
|
|
|
|
The encryption key NEVER leaves the teacher's device.
|
|
Server only sees:
|
|
- Key hash (for verification)
|
|
- Encrypted blobs
|
|
- Namespace ID (pseudonym)
|
|
"""
|
|
import structlog
|
|
import hashlib
|
|
import base64
|
|
import secrets
|
|
from typing import Optional
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
|
|
from config import settings
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
|
|
class EncryptionService:
|
|
"""
|
|
Handles namespace key verification and server-side encryption.
|
|
|
|
Important: This service does NOT have access to the actual encryption key.
|
|
The key is stored only on the teacher's device.
|
|
This service only verifies key hashes and manages encrypted blobs.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._key_hashes: dict[str, str] = {} # namespace_id -> key_hash
|
|
self._server_key = secrets.token_bytes(32) # Server-side encryption for transit
|
|
|
|
def verify_key_hash(self, key_hash: str) -> bool:
|
|
"""
|
|
Verify that a key hash is valid format.
|
|
Does NOT verify the actual key - that's client-side only.
|
|
|
|
Accepts "disabled" for development over HTTP (where crypto.subtle is unavailable).
|
|
In production, always use HTTPS to enable proper encryption.
|
|
"""
|
|
if not key_hash:
|
|
return False
|
|
|
|
# Allow "disabled" for development (HTTP context where crypto.subtle is unavailable)
|
|
if key_hash == "disabled":
|
|
logger.warning(
|
|
"Encryption disabled - client running in non-secure context (HTTP). "
|
|
"Use HTTPS in production!"
|
|
)
|
|
return True
|
|
|
|
# Expected format: "sha256:base64encodedHash"
|
|
if not key_hash.startswith("sha256:"):
|
|
return False
|
|
|
|
try:
|
|
hash_part = key_hash[7:] # Remove "sha256:" prefix
|
|
decoded = base64.b64decode(hash_part)
|
|
return len(decoded) == 32 # SHA-256 produces 32 bytes
|
|
except Exception:
|
|
return False
|
|
|
|
def register_namespace_key(self, namespace_id: str, key_hash: str) -> bool:
|
|
"""
|
|
Register a namespace's key hash for future verification.
|
|
"""
|
|
if not self.verify_key_hash(key_hash):
|
|
logger.warning("Invalid key hash format", namespace_id=namespace_id[:8])
|
|
return False
|
|
|
|
self._key_hashes[namespace_id] = key_hash
|
|
if key_hash == "disabled":
|
|
logger.info("Namespace registered (encryption disabled)", namespace_id=namespace_id[:8])
|
|
else:
|
|
logger.info("Namespace key registered", namespace_id=namespace_id[:8])
|
|
return True
|
|
|
|
def encrypt_content(self, plaintext: str, namespace_id: str) -> str:
|
|
"""
|
|
Encrypt content for server-side storage.
|
|
|
|
Note: This is transit encryption only.
|
|
The actual client-side encryption happens in the browser/app.
|
|
This adds an additional layer for data at rest on the server.
|
|
"""
|
|
if not settings.encryption_enabled:
|
|
return plaintext
|
|
|
|
try:
|
|
# Derive key from server key + namespace
|
|
derived_key = self._derive_key(namespace_id)
|
|
|
|
# Generate nonce
|
|
nonce = secrets.token_bytes(12)
|
|
|
|
# Encrypt
|
|
aesgcm = AESGCM(derived_key)
|
|
ciphertext = aesgcm.encrypt(nonce, plaintext.encode('utf-8'), None)
|
|
|
|
# Combine nonce + ciphertext and encode
|
|
encrypted = base64.b64encode(nonce + ciphertext).decode('utf-8')
|
|
return f"encrypted:{encrypted}"
|
|
|
|
except Exception as e:
|
|
logger.error("Encryption failed", error=str(e))
|
|
raise
|
|
|
|
def decrypt_content(self, encrypted: str, namespace_id: str) -> str:
|
|
"""
|
|
Decrypt server-side encrypted content.
|
|
"""
|
|
if not settings.encryption_enabled:
|
|
return encrypted
|
|
|
|
if not encrypted.startswith("encrypted:"):
|
|
return encrypted # Not encrypted
|
|
|
|
try:
|
|
# Decode
|
|
encoded = encrypted[10:] # Remove "encrypted:" prefix
|
|
data = base64.b64decode(encoded)
|
|
|
|
# Split nonce and ciphertext
|
|
nonce = data[:12]
|
|
ciphertext = data[12:]
|
|
|
|
# Derive key from server key + namespace
|
|
derived_key = self._derive_key(namespace_id)
|
|
|
|
# Decrypt
|
|
aesgcm = AESGCM(derived_key)
|
|
plaintext = aesgcm.decrypt(nonce, ciphertext, None)
|
|
|
|
return plaintext.decode('utf-8')
|
|
|
|
except Exception as e:
|
|
logger.error("Decryption failed", error=str(e))
|
|
raise
|
|
|
|
def _derive_key(self, namespace_id: str) -> bytes:
|
|
"""
|
|
Derive a key from server key + namespace ID.
|
|
This ensures each namespace has a unique encryption key.
|
|
"""
|
|
kdf = PBKDF2HMAC(
|
|
algorithm=hashes.SHA256(),
|
|
length=32,
|
|
salt=namespace_id.encode('utf-8'),
|
|
iterations=100000,
|
|
)
|
|
return kdf.derive(self._server_key)
|
|
|
|
@staticmethod
|
|
def generate_key_hash(key: bytes) -> str:
|
|
"""
|
|
Generate a key hash for client-side use.
|
|
This is a utility method - actual implementation is in the client.
|
|
"""
|
|
hash_bytes = hashlib.sha256(key).digest()
|
|
encoded = base64.b64encode(hash_bytes).decode('utf-8')
|
|
return f"sha256:{encoded}"
|
|
|
|
@staticmethod
|
|
def generate_namespace_id() -> str:
|
|
"""
|
|
Generate a new namespace ID for a teacher.
|
|
"""
|
|
return f"ns-{secrets.token_hex(16)}"
|
|
|
|
|
|
class ClientSideEncryption:
|
|
"""
|
|
Helper class documenting client-side encryption.
|
|
This code runs in the browser/app, not on the server.
|
|
|
|
Client-side encryption flow:
|
|
1. Teacher generates a master key on first use
|
|
2. Master key is stored in browser/app secure storage
|
|
3. Key hash is sent to server for session verification
|
|
4. All PII is encrypted with master key before sending to server
|
|
5. Server only sees encrypted blobs
|
|
|
|
JavaScript implementation:
|
|
```javascript
|
|
// Generate master key (one-time)
|
|
const masterKey = await crypto.subtle.generateKey(
|
|
{ name: "AES-GCM", length: 256 },
|
|
true,
|
|
["encrypt", "decrypt"]
|
|
);
|
|
|
|
// Store in IndexedDB (encrypted with device key)
|
|
await storeSecurely("masterKey", masterKey);
|
|
|
|
// Generate key hash for server
|
|
const keyData = await crypto.subtle.exportKey("raw", masterKey);
|
|
const hashBuffer = await crypto.subtle.digest("SHA-256", keyData);
|
|
const keyHash = "sha256:" + btoa(String.fromCharCode(...new Uint8Array(hashBuffer)));
|
|
|
|
// Encrypt content before sending
|
|
async function encryptContent(content) {
|
|
const iv = crypto.getRandomValues(new Uint8Array(12));
|
|
const encoded = new TextEncoder().encode(content);
|
|
const ciphertext = await crypto.subtle.encrypt(
|
|
{ name: "AES-GCM", iv },
|
|
masterKey,
|
|
encoded
|
|
);
|
|
return btoa(String.fromCharCode(...iv, ...new Uint8Array(ciphertext)));
|
|
}
|
|
|
|
// Decrypt content after receiving
|
|
async function decryptContent(encrypted) {
|
|
const data = Uint8Array.from(atob(encrypted), c => c.charCodeAt(0));
|
|
const iv = data.slice(0, 12);
|
|
const ciphertext = data.slice(12);
|
|
const decrypted = await crypto.subtle.decrypt(
|
|
{ name: "AES-GCM", iv },
|
|
masterKey,
|
|
ciphertext
|
|
);
|
|
return new TextDecoder().decode(decrypted);
|
|
}
|
|
```
|
|
"""
|
|
pass
|