Restructure: Move 52 files into 7 domain packages
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 28s
CI / test-go-edu-search (push) Successful in 28s
CI / test-python-klausur (push) Failing after 2m22s
CI / test-python-agent-core (push) Successful in 21s
CI / test-nodejs-website (push) Successful in 23s

korrektur/ zeugnis/ admin/ compliance/ worksheet/ training/ metrics/
52 backward-compatibility shim files keep the old flat import paths working; the new packages use relative imports internally; the RAG subsystem is untouched.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-25 22:10:48 +02:00
parent 0504d22b8e
commit 165c493d1e
111 changed files with 11859 additions and 11609 deletions

View File

@@ -0,0 +1,6 @@
"""
compliance package — compliance pipeline, RBAC/ABAC policy engine.
Backward-compatible re-exports: consumers can still use
``from compliance_models import ...`` etc. via the shim files in backend/.
"""

View File

@@ -0,0 +1,200 @@
"""
Compliance Extraction & Generation.
Functions for extracting checkpoints from legal text chunks,
generating controls, and creating remediation measures.
"""
import re
import hashlib
import logging
from typing import Dict, List, Optional
from .models import Checkpoint, Control, Measure
logger = logging.getLogger(__name__)
def extract_checkpoints_from_chunk(chunk_text: str, payload: Dict) -> List[Checkpoint]:
    """
    Extract requirement-like checkpoints from one chunk of legal text.

    Runs a fixed list of regex heuristics (BSI-TR identifiers, article
    references, numbered clauses, German obligation phrases) over the
    chunk and builds one Checkpoint per sufficiently long match.

    Args:
        chunk_text: Raw text of the chunk to scan.
        payload: Chunk metadata; ``regulation_code``, ``regulation_name``
            and ``source_url`` are read with fallbacks.

    Returns:
        List of extracted Checkpoint objects (may be empty).
    """
    reg_code = payload.get("regulation_code", "UNKNOWN")
    reg_name = payload.get("regulation_name", "Unknown")
    source_url = payload.get("source_url", "")
    # Short, stable id derived from the chunk's leading text (not security-relevant).
    chunk_id = hashlib.md5(chunk_text[:100].encode()).hexdigest()[:8]

    # (regex, kind) pairs; DOTALL lets requirement bodies span lines.
    patterns = [
        # BSI-TR patterns
        (r'([OT]\.[A-Za-z_]+\d*)[:\s]+(.+?)(?=\n[OT]\.|$)', 'bsi_requirement'),
        # Article patterns (GDPR, AI Act, etc.)
        (r'(?:Artikel|Art\.?)\s+(\d+)(?:\s+Abs(?:atz)?\.?\s*(\d+))?\s*[-\u2013:]\s*(.+?)(?=\n|$)', 'article'),
        # Numbered requirements
        (r'\((\d+)\)\s+(.+?)(?=\n\(\d+\)|$)', 'numbered'),
        # "Der Verantwortliche muss" patterns
        (r'(?:Der Verantwortliche|Die Aufsichtsbeh\u00f6rde|Der Auftragsverarbeiter)\s+(muss|hat|soll)\s+(.+?)(?=\.\s|$)', 'obligation'),
        # "Es ist erforderlich" patterns
        (r'(?:Es ist erforderlich|Es muss gew\u00e4hrleistet|Es sind geeignete)\s+(.+?)(?=\.\s|$)', 'requirement'),
    ]

    found: List[Checkpoint] = []
    for pattern, kind in patterns:
        for match in re.finditer(pattern, chunk_text, re.MULTILINE | re.DOTALL):
            if kind == 'bsi_requirement':
                req_id = match.group(1)
                description = match.group(2).strip()
                title = req_id
            elif kind == 'article':
                article_num = match.group(1)
                paragraph = match.group(2) or ""
                description = match.group(3).strip()
                req_id = f"{reg_code}-Art{article_num}"
                title = f"Art. {article_num}"
                if paragraph:
                    req_id += f"-{paragraph}"
                    title += f" Abs. {paragraph}"
            elif kind == 'numbered':
                num = match.group(1)
                description = match.group(2).strip()
                req_id = f"{reg_code}-{num}"
                title = f"Anforderung {num}"
            else:
                # Obligation/requirement phrases: the whole match is the description.
                description = match.group(0).strip()
                req_id = f"{reg_code}-{chunk_id}-{len(found)}"
                if len(description) > 50:
                    title = description[:50] + "..."
                else:
                    title = description
            # Drop fragments too short to be a real requirement.
            if len(description) < 20:
                continue
            found.append(Checkpoint(
                id=req_id,
                regulation_code=reg_code,
                regulation_name=reg_name,
                article=title if 'Art' in title else None,
                title=title,
                description=description[:500],
                original_text=description,
                chunk_id=chunk_id,
                source_url=source_url
            ))
    return found
def generate_control_for_checkpoints(
    checkpoints: List[Checkpoint],
    domain_counts: Dict[str, int],
) -> Optional[Control]:
    """
    Derive a single control covering the given checkpoints.

    Keyword heuristics over the combined descriptions pick a security
    domain; the control id appends a running counter read from
    ``domain_counts`` (the caller owns and updates that dict).
    Simplified stand-in — in production this would use the AI assistant.

    Returns:
        The generated Control, or None for an empty batch.
    """
    if not checkpoints:
        return None
    regulation = checkpoints[0].regulation_code
    all_text = " ".join(cp.description for cp in checkpoints).lower()

    # First matching keyword group wins, so ordering matters.
    domain_rules = (
        ("crypto", ("verschl\u00fcssel", "krypto", "encrypt", "hash")),
        ("iam", ("zugang", "access", "authentif", "login", "benutzer")),
        ("priv", ("datenschutz", "personenbezogen", "privacy", "einwilligung")),
        ("sdlc", ("entwicklung", "test", "code", "software")),
        ("aud", ("\u00fcberwach", "monitor", "log", "audit")),
        ("ai", ("ki", "k\u00fcnstlich", "ai", "machine learning", "model")),
        ("ops", ("betrieb", "operation", "verf\u00fcgbar", "backup")),
        ("cra", ("cyber", "resilience", "sbom", "vulnerab")),
    )
    domain = "gov"  # Default when no keyword group matches.
    for candidate, keywords in domain_rules:
        if any(kw in all_text for kw in keywords):
            domain = candidate
            break

    # Control id: domain prefix plus zero-padded running number.
    control_id = f"{domain.upper()}-{domain_counts.get(domain, 0) + 1:03d}"

    # Title comes from the first checkpoint, clipped to 100 chars.
    title = checkpoints[0].title
    if len(title) > 100:
        title = title[:97] + "..."

    is_automated = any(kw in all_text for kw in ("automat", "tool", "scan", "test"))

    return Control(
        id=control_id,
        domain=domain,
        title=title,
        description=f"Control f\u00fcr {regulation}: " + checkpoints[0].description[:200],
        checkpoints=[cp.id for cp in checkpoints],
        pass_criteria=f"Alle {len(checkpoints)} zugeh\u00f6rigen Anforderungen sind erf\u00fcllt und dokumentiert.",
        implementation_guidance=(
            f"Implementiere Ma\u00dfnahmen zur Erf\u00fcllung der Anforderungen aus {regulation}. "
            f"Dokumentiere die Umsetzung und f\u00fchre regelm\u00e4\u00dfige Reviews durch."
        ),
        is_automated=is_automated,
        automation_tool="CI/CD Pipeline" if is_automated else None,
        priority="high" if "muss" in all_text or "erforderlich" in all_text else "medium"
    )
def generate_measure_for_control(control: Control) -> Measure:
    """
    Build the remediation measure for a control.

    The deadline is derived from the control's priority and the
    responsible team from its domain; unknown values fall back to
    90 days and the Compliance Team.  New measures start as "pending".
    """
    # Priority -> remediation deadline in days.
    deadlines = {
        "critical": 30,
        "high": 60,
        "medium": 90,
        "low": 180
    }
    # Domain -> accountable team/role.
    owners = {
        "priv": "Datenschutzbeauftragter",
        "iam": "IT-Security Team",
        "sdlc": "Entwicklungsteam",
        "crypto": "IT-Security Team",
        "ops": "Operations Team",
        "aud": "Compliance Team",
        "ai": "AI/ML Team",
        "cra": "IT-Security Team",
        "gov": "Management"
    }
    return Measure(
        id=f"M-{control.id}",
        control_id=control.id,
        title=f"Umsetzung: {control.title[:50]}",
        description=f"Implementierung und Dokumentation von {control.id}: {control.description[:100]}",
        responsible=owners.get(control.domain, "Compliance Team"),
        deadline_days=deadlines.get(control.priority, 90),
        status="pending"
    )

View File

@@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""
Full Compliance Pipeline for Legal Corpus — Barrel Re-export.
Split into submodules:
- compliance_models.py — Dataclasses (Checkpoint, Control, Measure)
- compliance_extraction.py — Pattern extraction & control/measure generation
- compliance_pipeline.py — Pipeline phases & orchestrator
Run on Mac Mini:
nohup python full_compliance_pipeline.py > /tmp/compliance_pipeline.log 2>&1 &
"""
import asyncio
import logging
import sys
# Configure logging: mirror pipeline output to stdout and to the same log
# file the nohup invocation in the module docstring redirects to.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('/tmp/compliance_pipeline.log')
    ]
)
# Re-export all public symbols
from .models import Checkpoint, Control, Measure
from .extraction import (
extract_checkpoints_from_chunk,
generate_control_for_checkpoints,
generate_measure_for_control,
)
from .pipeline import CompliancePipeline
__all__ = [
"Checkpoint",
"Control",
"Measure",
"extract_checkpoints_from_chunk",
"generate_control_for_checkpoints",
"generate_measure_for_control",
"CompliancePipeline",
]
async def main():
    """CLI entry point: parse the flags and run the full pipeline."""
    import argparse
    cli = argparse.ArgumentParser(description="Run the compliance pipeline")
    cli.add_argument("--force-reindex", action="store_true",
                     help="Force re-ingestion of all documents")
    cli.add_argument("--skip-ingestion", action="store_true",
                     help="Skip ingestion phase, use existing chunks")
    options = cli.parse_args()
    await CompliancePipeline().run_full_pipeline(
        force_reindex=options.force_reindex,
        skip_ingestion=options.skip_ingestion,
    )


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,49 @@
"""
Compliance Pipeline Data Models.
Dataclasses for checkpoints, controls, and measures.
"""
from typing import Optional, List
from dataclasses import dataclass
@dataclass
class Checkpoint:
    """
    A single requirement/checkpoint extracted from legal text.

    Produced by the regex extraction heuristics; ``id`` combines the
    regulation code with an article/number or a chunk-derived suffix.
    """
    id: str                 # Unique checkpoint id, e.g. "<REG>-Art5-1"
    regulation_code: str    # Short code of the source regulation
    regulation_name: str    # Human-readable regulation name
    article: Optional[str]  # Article label when the title references one, else None
    title: str              # Short label (article ref, requirement number, or excerpt)
    description: str        # Requirement text (truncated by the extractor)
    original_text: str      # Untruncated matched text
    chunk_id: str           # Hash-derived id of the source text chunk
    source_url: str         # Where the regulation text was fetched from
@dataclass
class Control:
    """
    A control derived from a batch of checkpoints.

    ``id`` is "<DOMAIN>-<counter>"; ``checkpoints`` lists the ids of the
    checkpoints this control covers.
    """
    id: str                          # e.g. "IAM-001"
    domain: str                      # Security domain key, e.g. "iam", "crypto", "gov"
    title: str                       # Short control title
    description: str                 # What the control addresses
    checkpoints: List[str]           # List of checkpoint IDs
    pass_criteria: str               # What must hold for the control to pass
    implementation_guidance: str     # How to implement the control
    is_automated: bool               # Whether the control can be checked automatically
    automation_tool: Optional[str]   # Tool used when automated, else None
    priority: str                    # Priority level, e.g. "high" or "medium"
@dataclass
class Measure:
    """A remediation measure attached to exactly one control."""
    id: str             # "M-<control id>"
    control_id: str     # Id of the control this measure implements
    title: str          # Short measure title
    description: str    # What has to be done
    responsible: str    # Accountable team or role
    deadline_days: int  # Days until the measure is due (priority-derived)
    status: str         # Lifecycle state; new measures start as "pending"

View File

@@ -0,0 +1,441 @@
"""
Compliance Pipeline Execution.
Pipeline phases (ingestion, extraction, control generation, measures)
and orchestration logic.
"""
import asyncio
import json
import logging
import os
import sys
import time
from datetime import datetime
from typing import Dict, List, Any
from dataclasses import asdict
from .models import Checkpoint, Control, Measure
from .extraction import (
extract_checkpoints_from_chunk,
generate_control_for_checkpoints,
generate_measure_for_control,
)
logger = logging.getLogger(__name__)
# Import checkpoint manager
try:
from pipeline_checkpoints import CheckpointManager, EXPECTED_VALUES, ValidationStatus
except ImportError:
logger.warning("Checkpoint manager not available, running without checkpoints")
CheckpointManager = None
EXPECTED_VALUES = {}
ValidationStatus = None
# Default to the Docker-internal service names when no explicit endpoints
# are configured; an existing QDRANT_URL/QDRANT_HOST takes precedence.
if not os.getenv("QDRANT_URL") and not os.getenv("QDRANT_HOST"):
    os.environ["QDRANT_HOST"] = "qdrant"
os.environ.setdefault("EMBEDDING_SERVICE_URL", "http://embedding-service:8087")
# Try to import from klausur-service
try:
from legal_corpus_ingestion import LegalCorpusIngestion, REGULATIONS, LEGAL_CORPUS_COLLECTION
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
except ImportError:
logger.error("Could not import required modules. Make sure you're in the klausur-service container.")
sys.exit(1)
class CompliancePipeline:
    """
    Orchestrates the full compliance pipeline.

    Phases: (1) incremental document ingestion into Qdrant,
    (2) checkpoint extraction from chunks, (3) control generation,
    (4) measure generation, followed by a JSON export of all results.
    Progress and validation metrics are reported through an optional
    CheckpointManager (None when that module is unavailable).
    """
    def __init__(self):
        # Support both QDRANT_URL and QDRANT_HOST/PORT
        qdrant_url = os.getenv("QDRANT_URL", "")
        if qdrant_url:
            from urllib.parse import urlparse
            parsed = urlparse(qdrant_url)
            qdrant_host = parsed.hostname or "qdrant"
            qdrant_port = parsed.port or 6333
        else:
            qdrant_host = os.getenv("QDRANT_HOST", "qdrant")
            qdrant_port = 6333
        self.qdrant = QdrantClient(host=qdrant_host, port=qdrant_port)
        # Accumulated pipeline outputs, filled phase by phase.
        self.checkpoints: List[Checkpoint] = []
        self.controls: List[Control] = []
        self.measures: List[Measure] = []
        self.stats = {
            "chunks_processed": 0,
            "checkpoints_extracted": 0,
            "controls_created": 0,
            "measures_defined": 0,
            "by_regulation": {},
            "by_domain": {},
        }
        # Initialize checkpoint manager (None when import failed at module load)
        self.checkpoint_mgr = CheckpointManager() if CheckpointManager else None
    async def run_ingestion_phase(self, force_reindex: bool = False) -> int:
        """Phase 1: Ingest documents (incremental - only missing ones).

        Counts existing chunks per regulation and only ingests regulations
        with zero chunks (or all of them when ``force_reindex`` is set).
        Returns the total number of chunks in the collection afterwards.
        """
        logger.info("\n" + "=" * 60)
        logger.info("PHASE 1: DOCUMENT INGESTION (INCREMENTAL)")
        logger.info("=" * 60)
        if self.checkpoint_mgr:
            self.checkpoint_mgr.start_checkpoint("ingestion", "Document Ingestion")
        ingestion = LegalCorpusIngestion()
        try:
            # Check existing chunks per regulation
            existing_chunks = {}
            try:
                for regulation in REGULATIONS:
                    count_result = self.qdrant.count(
                        collection_name=LEGAL_CORPUS_COLLECTION,
                        count_filter=Filter(
                            must=[FieldCondition(key="regulation_code", match=MatchValue(value=regulation.code))]
                        )
                    )
                    existing_chunks[regulation.code] = count_result.count
                    logger.info(f" {regulation.code}: {count_result.count} existing chunks")
            except Exception as e:
                # Best-effort: if counting fails, treat everything as missing.
                logger.warning(f"Could not check existing chunks: {e}")
            # Determine which regulations need ingestion
            regulations_to_ingest = []
            for regulation in REGULATIONS:
                existing = existing_chunks.get(regulation.code, 0)
                if force_reindex or existing == 0:
                    regulations_to_ingest.append(regulation)
                    logger.info(f" -> Will ingest: {regulation.code} (existing: {existing}, force: {force_reindex})")
                else:
                    logger.info(f" -> Skipping: {regulation.code} (already has {existing} chunks)")
                    self.stats["by_regulation"][regulation.code] = existing
            if not regulations_to_ingest:
                # Early exit: everything already indexed.
                logger.info("All regulations already indexed. Skipping ingestion phase.")
                total_chunks = sum(existing_chunks.values())
                self.stats["chunks_processed"] = total_chunks
                if self.checkpoint_mgr:
                    self.checkpoint_mgr.add_metric("total_chunks", total_chunks)
                    self.checkpoint_mgr.add_metric("skipped", True)
                    self.checkpoint_mgr.complete_checkpoint(success=True)
                return total_chunks
            # Ingest only missing regulations
            total_chunks = sum(existing_chunks.values())
            for i, regulation in enumerate(regulations_to_ingest, 1):
                logger.info(f"[{i}/{len(regulations_to_ingest)}] Ingesting {regulation.code}...")
                try:
                    count = await ingestion.ingest_regulation(regulation)
                    total_chunks += count
                    self.stats["by_regulation"][regulation.code] = count
                    logger.info(f" -> {count} chunks")
                    if self.checkpoint_mgr:
                        self.checkpoint_mgr.add_metric(f"chunks_{regulation.code}", count)
                except Exception as e:
                    # A single failed regulation does not abort the phase.
                    logger.error(f" -> FAILED: {e}")
                    self.stats["by_regulation"][regulation.code] = 0
            self.stats["chunks_processed"] = total_chunks
            logger.info(f"\nTotal chunks in collection: {total_chunks}")
            # Validate ingestion results against expected chunk counts
            if self.checkpoint_mgr:
                self.checkpoint_mgr.add_metric("total_chunks", total_chunks)
                self.checkpoint_mgr.add_metric("regulations_count", len(REGULATIONS))
                expected = EXPECTED_VALUES.get("ingestion", {})
                self.checkpoint_mgr.validate(
                    "total_chunks",
                    expected=expected.get("total_chunks", 8000),
                    actual=total_chunks,
                    min_value=expected.get("min_chunks", 7000)
                )
                reg_expected = expected.get("regulations", {})
                for reg_code, reg_exp in reg_expected.items():
                    actual = self.stats["by_regulation"].get(reg_code, 0)
                    self.checkpoint_mgr.validate(
                        f"chunks_{reg_code}",
                        expected=reg_exp.get("expected", 0),
                        actual=actual,
                        min_value=reg_exp.get("min", 0)
                    )
                self.checkpoint_mgr.complete_checkpoint(success=True)
            return total_chunks
        except Exception as e:
            if self.checkpoint_mgr:
                self.checkpoint_mgr.fail_checkpoint(str(e))
            raise
        finally:
            # Always release the ingestion client's resources.
            await ingestion.close()
    async def run_extraction_phase(self) -> int:
        """Phase 2: Extract checkpoints from chunks.

        Scrolls the whole collection in batches of 100 and runs the regex
        extractor over each chunk's text.  Returns the checkpoint count.
        """
        logger.info("\n" + "=" * 60)
        logger.info("PHASE 2: CHECKPOINT EXTRACTION")
        logger.info("=" * 60)
        if self.checkpoint_mgr:
            self.checkpoint_mgr.start_checkpoint("extraction", "Checkpoint Extraction")
        try:
            offset = None
            total_checkpoints = 0
            # Paginate via Qdrant scroll until the cursor is exhausted.
            while True:
                result = self.qdrant.scroll(
                    collection_name=LEGAL_CORPUS_COLLECTION,
                    limit=100,
                    offset=offset,
                    with_payload=True,
                    with_vectors=False
                )
                points, next_offset = result
                if not points:
                    break
                for point in points:
                    payload = point.payload
                    text = payload.get("text", "")
                    cps = extract_checkpoints_from_chunk(text, payload)
                    self.checkpoints.extend(cps)
                    total_checkpoints += len(cps)
                logger.info(f"Processed {len(points)} chunks, extracted {total_checkpoints} checkpoints so far...")
                if next_offset is None:
                    break
                offset = next_offset
            self.stats["checkpoints_extracted"] = len(self.checkpoints)
            logger.info(f"\nTotal checkpoints extracted: {len(self.checkpoints)}")
            # Per-regulation breakdown for the log and checkpoint metrics.
            by_reg = {}
            for cp in self.checkpoints:
                by_reg[cp.regulation_code] = by_reg.get(cp.regulation_code, 0) + 1
            for reg, count in sorted(by_reg.items()):
                logger.info(f" {reg}: {count} checkpoints")
            if self.checkpoint_mgr:
                self.checkpoint_mgr.add_metric("total_checkpoints", len(self.checkpoints))
                self.checkpoint_mgr.add_metric("checkpoints_by_regulation", by_reg)
                expected = EXPECTED_VALUES.get("extraction", {})
                self.checkpoint_mgr.validate(
                    "total_checkpoints",
                    expected=expected.get("total_checkpoints", 3500),
                    actual=len(self.checkpoints),
                    min_value=expected.get("min_checkpoints", 3000)
                )
                self.checkpoint_mgr.complete_checkpoint(success=True)
            return len(self.checkpoints)
        except Exception as e:
            if self.checkpoint_mgr:
                self.checkpoint_mgr.fail_checkpoint(str(e))
            raise
    async def run_control_generation_phase(self) -> int:
        """Phase 3: Generate controls from checkpoints.

        Groups checkpoints by regulation and emits one control per batch
        of up to 4 checkpoints.  Returns the number of controls created.
        """
        logger.info("\n" + "=" * 60)
        logger.info("PHASE 3: CONTROL GENERATION")
        logger.info("=" * 60)
        if self.checkpoint_mgr:
            self.checkpoint_mgr.start_checkpoint("controls", "Control Generation")
        try:
            # Group checkpoints by regulation
            by_regulation: Dict[str, List[Checkpoint]] = {}
            for cp in self.checkpoints:
                reg = cp.regulation_code
                if reg not in by_regulation:
                    by_regulation[reg] = []
                by_regulation[reg].append(cp)
            # Generate controls per regulation (group every 3-5 checkpoints)
            for regulation, checkpoints in by_regulation.items():
                logger.info(f"Generating controls for {regulation} ({len(checkpoints)} checkpoints)...")
                batch_size = 4
                for i in range(0, len(checkpoints), batch_size):
                    batch = checkpoints[i:i + batch_size]
                    # by_domain doubles as the per-domain id counter for control ids.
                    control = generate_control_for_checkpoints(batch, self.stats.get("by_domain", {}))
                    if control:
                        self.controls.append(control)
                        self.stats["by_domain"][control.domain] = self.stats["by_domain"].get(control.domain, 0) + 1
            self.stats["controls_created"] = len(self.controls)
            logger.info(f"\nTotal controls created: {len(self.controls)}")
            for domain, count in sorted(self.stats["by_domain"].items()):
                logger.info(f" {domain}: {count} controls")
            if self.checkpoint_mgr:
                self.checkpoint_mgr.add_metric("total_controls", len(self.controls))
                self.checkpoint_mgr.add_metric("controls_by_domain", dict(self.stats["by_domain"]))
                expected = EXPECTED_VALUES.get("controls", {})
                self.checkpoint_mgr.validate(
                    "total_controls",
                    expected=expected.get("total_controls", 900),
                    actual=len(self.controls),
                    min_value=expected.get("min_controls", 800)
                )
                self.checkpoint_mgr.complete_checkpoint(success=True)
            return len(self.controls)
        except Exception as e:
            if self.checkpoint_mgr:
                self.checkpoint_mgr.fail_checkpoint(str(e))
            raise
    async def run_measure_generation_phase(self) -> int:
        """Phase 4: Generate measures for controls.

        One measure per control.  Returns the number of measures defined.
        """
        logger.info("\n" + "=" * 60)
        logger.info("PHASE 4: MEASURE GENERATION")
        logger.info("=" * 60)
        if self.checkpoint_mgr:
            self.checkpoint_mgr.start_checkpoint("measures", "Measure Generation")
        try:
            for control in self.controls:
                measure = generate_measure_for_control(control)
                self.measures.append(measure)
            self.stats["measures_defined"] = len(self.measures)
            logger.info(f"\nTotal measures defined: {len(self.measures)}")
            if self.checkpoint_mgr:
                self.checkpoint_mgr.add_metric("total_measures", len(self.measures))
                expected = EXPECTED_VALUES.get("measures", {})
                self.checkpoint_mgr.validate(
                    "total_measures",
                    expected=expected.get("total_measures", 900),
                    actual=len(self.measures),
                    min_value=expected.get("min_measures", 800)
                )
                self.checkpoint_mgr.complete_checkpoint(success=True)
            return len(self.measures)
        except Exception as e:
            if self.checkpoint_mgr:
                self.checkpoint_mgr.fail_checkpoint(str(e))
            raise
    def save_results(self, output_dir: str = "/tmp/compliance_output") -> None:
        """Save checkpoints, controls, measures and statistics as JSON files.

        Writes four files (checkpoints.json, controls.json, measures.json,
        statistics.json) into ``output_dir``, creating it if necessary.
        """
        logger.info("\n" + "=" * 60)
        logger.info("SAVING RESULTS")
        logger.info("=" * 60)
        os.makedirs(output_dir, exist_ok=True)
        checkpoints_file = os.path.join(output_dir, "checkpoints.json")
        with open(checkpoints_file, "w") as f:
            json.dump([asdict(cp) for cp in self.checkpoints], f, indent=2, ensure_ascii=False)
        logger.info(f"Saved {len(self.checkpoints)} checkpoints to {checkpoints_file}")
        controls_file = os.path.join(output_dir, "controls.json")
        with open(controls_file, "w") as f:
            json.dump([asdict(c) for c in self.controls], f, indent=2, ensure_ascii=False)
        logger.info(f"Saved {len(self.controls)} controls to {controls_file}")
        measures_file = os.path.join(output_dir, "measures.json")
        with open(measures_file, "w") as f:
            json.dump([asdict(m) for m in self.measures], f, indent=2, ensure_ascii=False)
        logger.info(f"Saved {len(self.measures)} measures to {measures_file}")
        stats_file = os.path.join(output_dir, "statistics.json")
        self.stats["generated_at"] = datetime.now().isoformat()
        with open(stats_file, "w") as f:
            json.dump(self.stats, f, indent=2, ensure_ascii=False)
        logger.info(f"Saved statistics to {stats_file}")
    async def run_full_pipeline(self, force_reindex: bool = False, skip_ingestion: bool = False) -> None:
        """Run the complete pipeline.
        Args:
            force_reindex: If True, re-ingest all documents even if they exist
            skip_ingestion: If True, skip ingestion phase entirely (use existing chunks)
        """
        start_time = time.time()
        logger.info("=" * 60)
        logger.info("FULL COMPLIANCE PIPELINE (INCREMENTAL)")
        logger.info(f"Started at: {datetime.now().isoformat()}")
        logger.info(f"Force reindex: {force_reindex}")
        logger.info(f"Skip ingestion: {skip_ingestion}")
        if self.checkpoint_mgr:
            logger.info(f"Pipeline ID: {self.checkpoint_mgr.pipeline_id}")
        logger.info("=" * 60)
        try:
            if skip_ingestion:
                logger.info("Skipping ingestion phase as requested...")
                # Still record the collection size so the stats stay meaningful.
                try:
                    collection_info = self.qdrant.get_collection(LEGAL_CORPUS_COLLECTION)
                    self.stats["chunks_processed"] = collection_info.points_count
                except Exception:
                    self.stats["chunks_processed"] = 0
            else:
                await self.run_ingestion_phase(force_reindex=force_reindex)
            await self.run_extraction_phase()
            await self.run_control_generation_phase()
            await self.run_measure_generation_phase()
            self.save_results()
            elapsed = time.time() - start_time
            logger.info("\n" + "=" * 60)
            logger.info("PIPELINE COMPLETE")
            logger.info("=" * 60)
            logger.info(f"Duration: {elapsed:.1f} seconds")
            logger.info(f"Chunks processed: {self.stats['chunks_processed']}")
            logger.info(f"Checkpoints extracted: {self.stats['checkpoints_extracted']}")
            logger.info(f"Controls created: {self.stats['controls_created']}")
            logger.info(f"Measures defined: {self.stats['measures_defined']}")
            logger.info(f"\nResults saved to: /tmp/compliance_output/")
            logger.info("Checkpoint status: /tmp/pipeline_checkpoints.json")
            logger.info("=" * 60)
            if self.checkpoint_mgr:
                self.checkpoint_mgr.complete_pipeline({
                    "duration_seconds": elapsed,
                    "chunks_processed": self.stats['chunks_processed'],
                    "checkpoints_extracted": self.stats['checkpoints_extracted'],
                    "controls_created": self.stats['controls_created'],
                    "measures_defined": self.stats['measures_defined'],
                    "by_regulation": self.stats['by_regulation'],
                    "by_domain": self.stats['by_domain'],
                })
        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            if self.checkpoint_mgr:
                # NOTE(review): reaches into CheckpointManager internals
                # (state/_save) instead of a public fail method — confirm intended.
                self.checkpoint_mgr.state.status = "failed"
                self.checkpoint_mgr._save()
            raise

View File

@@ -0,0 +1,38 @@
"""
RBAC/ABAC Policy System for Klausur-Service (barrel re-export)
This module was split into:
- rbac_types.py (Enums, data structures)
- rbac_permissions.py (Permission matrix)
- rbac_engine.py (PolicyEngine, default policies, API guards)
All public symbols are re-exported here for backwards compatibility.
"""
# Types and enums
from .rbac_types import ( # noqa: F401
Role,
Action,
ResourceType,
ZKVisibilityMode,
EHVisibilityMode,
VerfahrenType,
PolicySet,
RoleAssignment,
KeyShare,
Tenant,
Namespace,
ExamPackage,
)
# Permission matrix
from .rbac_permissions import DEFAULT_PERMISSIONS # noqa: F401
# Engine, policies, guards
from .rbac_engine import ( # noqa: F401
PolicyEngine,
create_default_policy_sets,
get_policy_engine,
require_permission,
require_role,
)

View File

@@ -0,0 +1,498 @@
"""
RBAC Policy Engine
Core engine for RBAC/ABAC permission checks,
role assignments, key shares, and default policies.
Extracted from rbac.py for file-size compliance.
"""
from typing import Optional, List, Dict, Set
from datetime import datetime, timezone
import uuid
from functools import wraps
from fastapi import HTTPException, Request
from .rbac_types import (
Role,
Action,
ResourceType,
ZKVisibilityMode,
PolicySet,
RoleAssignment,
KeyShare,
)
from .rbac_permissions import DEFAULT_PERMISSIONS
# =============================================
# POLICY ENGINE
# =============================================
class PolicyEngine:
    """
    Engine for RBAC/ABAC authorization decisions.

    Checks, in order:
    1. Base role permissions (RBAC)
    2. Policy restrictions (ABAC)
    3. Key-share permissions

    All state (policy sets, role assignments, key shares) is held in
    in-memory dicts on the instance.
    """
    def __init__(self):
        self.policy_sets: Dict[str, PolicySet] = {}
        self.role_assignments: Dict[str, List[RoleAssignment]] = {}  # user_id -> assignments
        self.key_shares: Dict[str, List[KeyShare]] = {}  # user_id -> shares
    def register_policy_set(self, policy: PolicySet) -> None:
        """Register a policy set under its id."""
        self.policy_sets[policy.id] = policy
    def get_policy_for_context(
        self,
        bundesland: str,
        jahr: int,
        fach: Optional[str] = None,
        verfahren: str = "abitur"
    ) -> Optional[PolicySet]:
        """Find the policy set matching a (state, year, subject, procedure) context."""
        # Exact match first; a policy with fach=None matches any subject.
        for policy in self.policy_sets.values():
            if (policy.bundesland == bundesland and
                policy.jahr == jahr and
                policy.verfahren == verfahren):
                if policy.fach is None or policy.fach == fach:
                    return policy
        # Fallback: the default policy (bundesland == "DEFAULT"), if registered.
        for policy in self.policy_sets.values():
            if policy.bundesland == "DEFAULT":
                return policy
        return None
    def assign_role(
        self,
        user_id: str,
        role: Role,
        resource_type: ResourceType,
        resource_id: str,
        granted_by: str,
        tenant_id: Optional[str] = None,
        namespace_id: Optional[str] = None,
        valid_to: Optional[datetime] = None
    ) -> RoleAssignment:
        """Assign a role to a user on a resource and return the new assignment."""
        assignment = RoleAssignment(
            id=str(uuid.uuid4()),
            user_id=user_id,
            role=role,
            resource_type=resource_type,
            resource_id=resource_id,
            tenant_id=tenant_id,
            namespace_id=namespace_id,
            granted_by=granted_by,
            valid_to=valid_to
        )
        if user_id not in self.role_assignments:
            self.role_assignments[user_id] = []
        self.role_assignments[user_id].append(assignment)
        return assignment
    def revoke_role(self, assignment_id: str, revoked_by: str) -> bool:
        """Revoke a role assignment; returns True if the assignment was found.

        NOTE(review): ``revoked_by`` is currently not recorded anywhere —
        consider storing it on the assignment for auditability.
        """
        for user_assignments in self.role_assignments.values():
            for assignment in user_assignments:
                if assignment.id == assignment_id:
                    assignment.revoked_at = datetime.now(timezone.utc)
                    return True
        return False
    def get_user_roles(
        self,
        user_id: str,
        resource_type: Optional[ResourceType] = None,
        resource_id: Optional[str] = None
    ) -> List[Role]:
        """Return a user's active roles, optionally filtered by resource (deduplicated)."""
        assignments = self.role_assignments.get(user_id, [])
        roles = []
        for assignment in assignments:
            if not assignment.is_active():
                continue
            if resource_type and assignment.resource_type != resource_type:
                continue
            if resource_id and assignment.resource_id != resource_id:
                continue
            roles.append(assignment.role)
        return list(set(roles))
    def create_key_share(
        self,
        user_id: str,
        package_id: str,
        permissions: Set[str],
        granted_by: str,
        scope: str = "full",
        invite_token: Optional[str] = None
    ) -> KeyShare:
        """Create a key share for a user on an exam package and return it."""
        share = KeyShare(
            id=str(uuid.uuid4()),
            user_id=user_id,
            package_id=package_id,
            permissions=permissions,
            scope=scope,
            granted_by=granted_by,
            invite_token=invite_token
        )
        if user_id not in self.key_shares:
            self.key_shares[user_id] = []
        self.key_shares[user_id].append(share)
        return share
    def accept_key_share(self, share_id: str, token: str) -> bool:
        """Accept a key share via its invite token; True if id and token matched."""
        for user_shares in self.key_shares.values():
            for share in user_shares:
                if share.id == share_id and share.invite_token == token:
                    share.accepted_at = datetime.now(timezone.utc)
                    return True
        return False
    def revoke_key_share(self, share_id: str, revoked_by: str) -> bool:
        """Revoke a key share; True if the share was found."""
        for user_shares in self.key_shares.values():
            for share in user_shares:
                if share.id == share_id:
                    share.revoked_at = datetime.now(timezone.utc)
                    share.revoked_by = revoked_by
                    return True
        return False
    def check_permission(
        self,
        user_id: str,
        action: Action,
        resource_type: ResourceType,
        resource_id: str,
        policy: Optional[PolicySet] = None,
        package_id: Optional[str] = None
    ) -> bool:
        """
        Check whether a user may perform an action on a resource.

        Evaluates, in order:
        1. Base RBAC (role must grant the action on the resource type)
        2. Policy restrictions (e.g. ZK visibility mode)
        3. Key share (only when ``package_id`` is given)
        """
        # 1. Collect the user's active roles for this resource.
        roles = self.get_user_roles(user_id, resource_type, resource_id)
        if not roles:
            return False
        # 2. Base RBAC: any role granting the action suffices.
        has_permission = False
        for role in roles:
            role_permissions = DEFAULT_PERMISSIONS.get(role, {})
            resource_permissions = role_permissions.get(resource_type, set())
            if action in resource_permissions:
                has_permission = True
                break
        if not has_permission:
            return False
        # 3. Apply ABAC policy restrictions.
        if policy:
            # ZK visibility mode (second-corrector visibility rules)
            if Role.ZWEITKORREKTOR in roles:
                if policy.zk_visibility_mode == ZKVisibilityMode.BLIND:
                    # Blind: second corrector must not see first-corrector outputs
                    if resource_type in [ResourceType.EVALUATION, ResourceType.REPORT, ResourceType.GRADE_DECISION]:
                        if action == Action.READ:
                            # Would need metadata to tell whether these are
                            # first-corrector outputs.
                            pass  # Implementation depends on the data model
                elif policy.zk_visibility_mode == ZKVisibilityMode.SEMI:
                    # Semi: second corrector sees annotations but not the grade
                    if resource_type == ResourceType.GRADE_DECISION and action == Action.READ:
                        return False
        # 4. Key-share check (package-scoped resources only).
        if package_id:
            user_shares = self.key_shares.get(user_id, [])
            has_key_share = any(
                share.package_id == package_id and share.is_active()
                for share in user_shares
            )
            if not has_key_share:
                return False
        return True
    def get_allowed_actions(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_id: str,
        policy: Optional[PolicySet] = None
    ) -> Set[Action]:
        """Return the union of actions the user's roles allow on the resource."""
        roles = self.get_user_roles(user_id, resource_type, resource_id)
        allowed = set()
        for role in roles:
            role_permissions = DEFAULT_PERMISSIONS.get(role, {})
            resource_permissions = role_permissions.get(resource_type, set())
            allowed.update(resource_permissions)
        # Apply policy restrictions (currently a stub).
        if policy and Role.ZWEITKORREKTOR in roles:
            if policy.zk_visibility_mode == ZKVisibilityMode.BLIND:
                # Would remove READ for certain resource types here.
                pass  # Detailed implementation pending
        return allowed
# =============================================
# DEFAULT POLICY SETS (alle Bundeslaender)
# =============================================
def create_default_policy_sets() -> List[PolicySet]:
    """
    Create default policy sets for all German states (Bundeslaender).

    Returns a fallback policy ("DEFAULT-2025"), explicitly tuned policies
    for Niedersachsen, Bayern and Nordrhein-Westfalen, and a generic
    Abitur policy for every remaining state. These can later be refined
    per state.

    Returns:
        List of PolicySet instances with unique IDs.
    """
    # Official ISO 3166-2:DE style codes used as stable policy-ID prefixes.
    # Deriving the prefix via ``bl[:2].upper()`` was wrong: it produced
    # colliding IDs (saarland, sachsen and sachsen-anhalt all became "SA";
    # brandenburg and bremen both became "BR") and codes that disagreed
    # with the explicit NI/BY/NW policies below.
    bundesland_codes = {
        "baden-wuerttemberg": "BW",
        "bayern": "BY",
        "berlin": "BE",
        "brandenburg": "BB",
        "bremen": "HB",
        "hamburg": "HH",
        "hessen": "HE",
        "mecklenburg-vorpommern": "MV",
        "niedersachsen": "NI",
        "nordrhein-westfalen": "NW",
        "rheinland-pfalz": "RP",
        "saarland": "SL",
        "sachsen": "SN",
        "sachsen-anhalt": "ST",
        "schleswig-holstein": "SH",
        "thueringen": "TH",
    }
    policies = []
    # Fallback policy used when no state-specific policy matches.
    # eh_visibility_mode is intentionally left at its dataclass default
    # (SHARED) instead of the former ``__dataclass_fields__`` lookup,
    # which resolved to the very same default value.
    policies.append(PolicySet(
        id="DEFAULT-2025",
        bundesland="DEFAULT",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.FULL,
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz"
    ))
    # Niedersachsen (example with state-specific settings)
    policies.append(PolicySet(
        id="NI-2025-ABITUR",
        bundesland="niedersachsen",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.FULL,  # In NI the ZK sees everything
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz",
        export_template_id="niedersachsen-abitur"
    ))
    # Bayern (example with SEMI visibility)
    policies.append(PolicySet(
        id="BY-2025-ABITUR",
        bundesland="bayern",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.SEMI,  # ZK sees annotations, not the grade
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz",
        export_template_id="bayern-abitur"
    ))
    # NRW (example)
    policies.append(PolicySet(
        id="NW-2025-ABITUR",
        bundesland="nordrhein-westfalen",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.FULL,
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz",
        export_template_id="nrw-abitur"
    ))
    # Generate generic base policies for all remaining states.
    for bl, code in bundesland_codes.items():
        if bl in ("niedersachsen", "bayern", "nordrhein-westfalen"):
            continue
        policies.append(PolicySet(
            id=f"{code}-2025-ABITUR",
            bundesland=bl,
            jahr=2025,
            fach=None,
            verfahren="abitur",
            zk_visibility_mode=ZKVisibilityMode.FULL,
            allow_teacher_uploaded_eh=True,
            allow_land_uploaded_eh=True,
            require_rights_confirmation_on_upload=True,
            third_correction_threshold=4,
            final_signoff_role="fachvorsitz"
        ))
    return policies
# =============================================
# GLOBAL POLICY ENGINE INSTANCE
# =============================================
# Lazily constructed singleton policy engine.
_policy_engine: Optional[PolicyEngine] = None
def get_policy_engine() -> PolicyEngine:
    """Return the global PolicyEngine, creating and seeding it on first use."""
    global _policy_engine
    if _policy_engine is None:
        _policy_engine = PolicyEngine()
        # Seed the fresh engine with the built-in default policy sets.
        for default_policy in create_default_policy_sets():
            _policy_engine.register_policy_set(default_policy)
    return _policy_engine
# =============================================
# API GUARDS (Decorators fuer FastAPI)
# =============================================
def require_permission(
    action: Action,
    resource_type: ResourceType,
    resource_id_param: str = "resource_id"
):
    """
    FastAPI endpoint decorator that enforces a permission check.

    Reads the authenticated user from ``request.state.user``, resolves the
    target resource id from the endpoint's keyword arguments, and asks the
    global policy engine whether ``action`` is allowed on ``resource_type``.
    Raises 401 when unauthenticated and 403 when the check fails.

    Usage:
        @app.get("/api/v1/packages/{package_id}")
        @require_permission(Action.READ, ResourceType.EXAM_PACKAGE, "package_id")
        async def get_package(package_id: str, request: Request):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            req = kwargs.get('request')
            if not req:
                # Fall back to scanning positional args for the Request object.
                req = next((a for a in args if isinstance(a, Request)), None)
            if not req:
                raise HTTPException(status_code=500, detail="Request not found")
            # Authenticated user is expected on request.state (set upstream).
            current_user = getattr(req.state, 'user', None)
            if not current_user:
                raise HTTPException(status_code=401, detail="Not authenticated")
            engine = get_policy_engine()
            # Optionally resolve a state-specific policy for the user.
            # NOTE(review): the policy year is hard-coded to 2025 here.
            active_policy = None
            land = current_user.get('bundesland')
            if land:
                active_policy = engine.get_policy_for_context(land, 2025)
            if not engine.check_permission(
                user_id=current_user.get('user_id'),
                action=action,
                resource_type=resource_type,
                resource_id=kwargs.get(resource_id_param),
                policy=active_policy
            ):
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: {action.value} on {resource_type.value}"
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator
def require_role(role: Role):
    """
    FastAPI endpoint decorator that requires the user to hold a specific role.

    Raises 401 when unauthenticated and 403 when the role is missing.

    Usage:
        @app.post("/api/v1/eh/publish")
        @require_role(Role.LAND_ADMIN)
        async def publish_eh(request: Request):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            req = kwargs.get('request')
            if not req:
                # Fall back to scanning positional args for the Request object.
                req = next((a for a in args if isinstance(a, Request)), None)
            if not req:
                raise HTTPException(status_code=500, detail="Request not found")
            current_user = getattr(req.state, 'user', None)
            if not current_user:
                raise HTTPException(status_code=401, detail="Not authenticated")
            engine = get_policy_engine()
            if role not in engine.get_user_roles(current_user.get('user_id')):
                raise HTTPException(
                    status_code=403,
                    detail=f"Role required: {role.value}"
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator

View File

@@ -0,0 +1,221 @@
"""
RBAC Permission Matrix
Default role-to-resource permission mappings for
Klausur-Korrektur and Zeugnis workflows.
Extracted from rbac.py for file-size compliance.
"""
from typing import Dict, Set
from .rbac_types import Role, Action, ResourceType
# =============================================
# RBAC PERMISSION MATRIX
# =============================================
# Default permission matrix (individual entries can be overridden by policies)
DEFAULT_PERMISSIONS: Dict[Role, Dict[ResourceType, Set[Action]]] = {
    # Erstkorrektor (first corrector)
    Role.ERSTKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ, Action.UPDATE, Action.SHARE_KEY, Action.LOCK},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.UPDATE},
        ResourceType.RUBRIC: {Action.READ, Action.UPDATE},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Zweitkorrektor (second corrector; default: FULL visibility)
    Role.ZWEITKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.RUBRIC: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Drittkorrektor (third corrector)
    Role.DRITTKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.RUBRIC: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Fachvorsitz (subject chair)
    Role.FACHVORSITZ: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.EXAM_PACKAGE: {Action.READ, Action.UPDATE, Action.LOCK, Action.UNLOCK, Action.SIGN_OFF},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.UPDATE},
        ResourceType.RUBRIC: {Action.READ, Action.UPDATE},
        ResourceType.ANNOTATION: {Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.READ, Action.UPDATE, Action.SIGN_OFF},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Pruefungsvorsitz (examination chair)
    Role.PRUEFUNGSVORSITZ: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.CREATE},
        ResourceType.EXAM_PACKAGE: {Action.READ, Action.SIGN_OFF},
        ResourceType.STUDENT_WORK: {Action.READ},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.GRADE_DECISION: {Action.READ, Action.SIGN_OFF},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # School admin
    Role.SCHUL_ADMIN: {
        ResourceType.TENANT: {Action.READ, Action.UPDATE},
        ResourceType.NAMESPACE: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.EXAM_PACKAGE: {Action.CREATE, Action.READ, Action.DELETE, Action.ASSIGN_ROLE},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.DELETE},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # State admin (authority)
    Role.LAND_ADMIN: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.UPDATE, Action.DELETE, Action.PUBLISH_OFFICIAL},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Auditor
    Role.AUDITOR: {
        ResourceType.AUDIT_LOG: {Action.READ},
        ResourceType.EXAM_PACKAGE: {Action.READ},  # Metadata only
        # No access to content!
    },
    # Operator
    Role.OPERATOR: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.EXAM_PACKAGE: {Action.READ},  # Metadata only
        ResourceType.AUDIT_LOG: {Action.READ},
        # Break-glass access is handled separately
    },
    # Teacher assistant
    Role.TEACHER_ASSISTANT: {
        ResourceType.STUDENT_WORK: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ},  # Only certain annotation types
        ResourceType.EH_DOCUMENT: {Action.READ},
    },
    # Exam author (pre-Abitur only)
    Role.EXAM_AUTHOR: {
        ResourceType.EH_DOCUMENT: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.RUBRIC: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
    },
    # =============================================
    # REPORT-CARD (ZEUGNIS) WORKFLOW ROLES
    # =============================================
    # Klassenlehrer (class teacher) - creates report cards, conduct grades, remarks
    Role.KLASSENLEHRER: {
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.ZEUGNIS: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ, Action.UPDATE},
        ResourceType.FACHNOTE: {Action.READ},  # Reads subject grades entered by subject teachers
        ResourceType.KOPFNOTE: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ, Action.UPDATE},
        ResourceType.BEMERKUNG: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.VERSETZUNG: {Action.READ},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Fachlehrer (subject teacher) - enters subject grades
    Role.FACHLEHRER: {
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ},  # Own students only
        ResourceType.FACHNOTE: {Action.CREATE, Action.READ, Action.UPDATE},  # Own subject only
        ResourceType.BEMERKUNG: {Action.CREATE, Action.READ},  # Subject-related remarks
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Zeugnisbeauftragter (report-card officer) - quality control
    Role.ZEUGNISBEAUFTRAGTER: {
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ, Action.UPDATE, Action.UPLOAD},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.VERSETZUNG: {Action.READ},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Sekretariat (school office) - printing, dispatch, archiving
    Role.SEKRETARIAT: {
        ResourceType.ZEUGNIS: {Action.READ, Action.DOWNLOAD},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ},  # For address data
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Schulleitung (principal) - final report-card sign-off
    Role.SCHULLEITUNG: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.CREATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.SIGN_OFF, Action.LOCK},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ, Action.UPDATE},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.KONFERENZ_BESCHLUSS: {Action.CREATE, Action.READ, Action.UPDATE, Action.SIGN_OFF},
        ResourceType.VERSETZUNG: {Action.CREATE, Action.READ, Action.UPDATE, Action.SIGN_OFF},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Stufenleitung (grade-level coordination, e.g. upper school)
    Role.STUFENLEITUNG: {
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.KONFERENZ_BESCHLUSS: {Action.READ},
        ResourceType.VERSETZUNG: {Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
}

View File

@@ -0,0 +1,438 @@
"""
RBAC/ABAC Type Definitions
Enums, data structures, and models for the policy system.
Extracted from rbac.py for file-size compliance.
"""
import json
from enum import Enum
from dataclasses import dataclass, field, asdict
from typing import Optional, List, Dict, Set, Any
from datetime import datetime, timezone
import uuid
# =============================================
# ENUMS: Roles, Actions, Resources
# =============================================
class Role(str, Enum):
    """Domain roles in the exam-correction and report-card (Zeugnis) chains."""
    # === Exam correction chain ===
    ERSTKORREKTOR = "erstkorrektor"  # EK - first corrector
    ZWEITKORREKTOR = "zweitkorrektor"  # ZK - second corrector
    DRITTKORREKTOR = "drittkorrektor"  # DK - third corrector
    # === Report-card (Zeugnis) workflow ===
    KLASSENLEHRER = "klassenlehrer"  # KL - creates report card, conduct grades, remarks
    FACHLEHRER = "fachlehrer"  # FL - enters subject grades
    ZEUGNISBEAUFTRAGTER = "zeugnisbeauftragter"  # ZB - quality control
    SEKRETARIAT = "sekretariat"  # SEK - printing, dispatch, archiving
    # === Leadership (exam + report card) ===
    FACHVORSITZ = "fachvorsitz"  # FVL - subject examination lead
    PRUEFUNGSVORSITZ = "pruefungsvorsitz"  # PV - principal / examination chair
    SCHULLEITUNG = "schulleitung"  # SL - final report-card sign-off
    STUFENLEITUNG = "stufenleitung"  # STL - grade-level coordination
    # === Administration ===
    SCHUL_ADMIN = "schul_admin"  # SA - school admin
    LAND_ADMIN = "land_admin"  # LA - state authority
    # === Special ===
    AUDITOR = "auditor"  # DPO / auditor
    OPERATOR = "operator"  # OPS - support
    TEACHER_ASSISTANT = "teacher_assistant"  # TA - trainee teacher
    EXAM_AUTHOR = "exam_author"  # EA - pre-Abitur (Vorabi) only
class Action(str, Enum):
    """Possible operations on resources."""
    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"
    ASSIGN_ROLE = "assign_role"
    INVITE_USER = "invite_user"
    REMOVE_USER = "remove_user"
    UPLOAD = "upload"
    DOWNLOAD = "download"
    LOCK = "lock"  # Finalize
    UNLOCK = "unlock"  # Only with special privilege
    SIGN_OFF = "sign_off"  # Approval / release
    SHARE_KEY = "share_key"  # Create a key share
    VIEW_PII = "view_pii"  # If PII is present
    BREAK_GLASS = "break_glass"  # Emergency access
    PUBLISH_OFFICIAL = "publish_official"  # Distribute official EH documents
class ResourceType(str, Enum):
    """Resource types in the system."""
    TENANT = "tenant"
    NAMESPACE = "namespace"
    # === Exam correction ===
    EXAM_PACKAGE = "exam_package"
    STUDENT_WORK = "student_work"
    EH_DOCUMENT = "eh_document"
    RUBRIC = "rubric"  # Scoring grid (Punkteraster)
    ANNOTATION = "annotation"
    EVALUATION = "evaluation"  # Criteria / points
    REPORT = "report"  # Written assessment (Gutachten)
    GRADE_DECISION = "grade_decision"
    # === Report-card generator ===
    ZEUGNIS = "zeugnis"  # Report-card document
    ZEUGNIS_VORLAGE = "zeugnis_vorlage"  # Report-card template
    ZEUGNIS_ENTWURF = "zeugnis_entwurf"  # Report-card draft (before approval)
    SCHUELER_DATEN = "schueler_daten"  # Student master data, grades
    FACHNOTE = "fachnote"  # Individual subject grade
    KOPFNOTE = "kopfnote"  # Work / social conduct grades
    FEHLZEITEN = "fehlzeiten"  # Absences
    BEMERKUNG = "bemerkung"  # Report-card remarks
    KONFERENZ_BESCHLUSS = "konferenz_beschluss"  # Conference decision
    VERSETZUNG = "versetzung"  # Grade-promotion decision
    # === General ===
    DOCUMENT = "document"  # Generic document type (EH, templates, etc.)
    TEMPLATE = "template"  # Generic templates
    EXPORT = "export"
    AUDIT_LOG = "audit_log"
    KEY_MATERIAL = "key_material"
class ZKVisibilityMode(str, Enum):
    """Visibility mode for second correctors (ZK)."""
    BLIND = "blind"  # ZK sees no EK grade / written assessment
    SEMI = "semi"  # ZK sees annotations, but no grade
    FULL = "full"  # ZK sees everything
class EHVisibilityMode(str, Enum):
    """Visibility mode for expectation horizons (EH)."""
    BLIND = "blind"  # ZK does not see the EH (rare)
    SHARED = "shared"  # ZK sees the EH (default)
class VerfahrenType(str, Enum):
    """Procedure types for exams and report cards."""
    # === Exam / examination procedures ===
    ABITUR = "abitur"
    VORABITUR = "vorabitur"
    KLAUSUR = "klausur"
    NACHPRUEFUNG = "nachpruefung"
    # === Report-card procedures ===
    HALBJAHRESZEUGNIS = "halbjahreszeugnis"
    JAHRESZEUGNIS = "jahreszeugnis"
    ABSCHLUSSZEUGNIS = "abschlusszeugnis"
    ABGANGSZEUGNIS = "abgangszeugnis"

    @classmethod
    def _coerce(cls, verfahren: str):
        """Return the matching member, or None for unknown values."""
        try:
            return cls(verfahren)
        except ValueError:
            return None

    @classmethod
    def is_exam_type(cls, verfahren: str) -> bool:
        """Check whether *verfahren* names an examination procedure."""
        return cls._coerce(verfahren) in (
            cls.ABITUR, cls.VORABITUR, cls.KLAUSUR, cls.NACHPRUEFUNG
        )

    @classmethod
    def is_certificate_type(cls, verfahren: str) -> bool:
        """Check whether *verfahren* names a report-card procedure."""
        return cls._coerce(verfahren) in (
            cls.HALBJAHRESZEUGNIS, cls.JAHRESZEUGNIS,
            cls.ABSCHLUSSZEUGNIS, cls.ABGANGSZEUGNIS
        )
# =============================================
# DATA STRUCTURES
# =============================================
@dataclass
class PolicySet:
    """
    Policy configuration per state (Bundesland) / year / subject.

    Allows state-specific differences without hard-coding them
    in the source code.

    Supported procedure types:
    - Exams: abitur, vorabitur, klausur, nachpruefung
    - Report cards: halbjahreszeugnis, jahreszeugnis, abschlusszeugnis, abgangszeugnis
    """
    id: str
    bundesland: str
    jahr: int
    fach: Optional[str]  # None = applies to all subjects
    verfahren: str  # See VerfahrenType enum
    # Visibility rules (exam correction)
    zk_visibility_mode: ZKVisibilityMode = ZKVisibilityMode.FULL
    eh_visibility_mode: EHVisibilityMode = EHVisibilityMode.SHARED
    # Allowed EH sources (exam correction)
    allow_teacher_uploaded_eh: bool = True
    allow_land_uploaded_eh: bool = True
    require_rights_confirmation_on_upload: bool = True
    require_dual_control_for_official_eh_update: bool = False
    # Correction rules (exam)
    third_correction_threshold: int = 4  # Grade-point deviation that triggers a third correction
    final_signoff_role: str = "fachvorsitz"
    # Report-card rules (Zeugnis)
    require_klassenlehrer_approval: bool = True
    require_schulleitung_signoff: bool = True
    allow_sekretariat_edit_after_approval: bool = False
    konferenz_protokoll_required: bool = True
    bemerkungen_require_review: bool = True
    fehlzeiten_auto_import: bool = True
    kopfnoten_enabled: bool = False
    versetzung_auto_calculate: bool = True
    # Export & display
    quote_verbatim_allowed: bool = False  # Show official texts verbatim in the UI
    export_template_id: str = "default"
    # Additional free-form flags
    flags: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    def is_exam_policy(self) -> bool:
        """Check whether this policy applies to examination procedures."""
        return VerfahrenType.is_exam_type(self.verfahren)
    def is_certificate_policy(self) -> bool:
        """Check whether this policy applies to report-card procedures."""
        return VerfahrenType.is_certificate_type(self.verfahren)
    def to_dict(self):
        """Serialize to a JSON-friendly dict (enums and timestamp as strings)."""
        d = asdict(self)
        d['zk_visibility_mode'] = self.zk_visibility_mode.value
        d['eh_visibility_mode'] = self.eh_visibility_mode.value
        d['created_at'] = self.created_at.isoformat()
        return d
@dataclass
class RoleAssignment:
    """
    Assignment of a role to a user for a specific resource.
    """
    id: str
    user_id: str
    role: Role
    resource_type: ResourceType
    resource_id: str
    # Optional scoping restrictions
    tenant_id: Optional[str] = None
    namespace_id: Optional[str] = None
    # Validity window
    valid_from: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    valid_to: Optional[datetime] = None
    # Metadata
    granted_by: str = ""
    granted_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    revoked_at: Optional[datetime] = None
    def is_active(self) -> bool:
        """True if not revoked and the current time lies inside the validity window."""
        now = datetime.now(timezone.utc)
        if self.revoked_at:
            return False
        if self.valid_to and now > self.valid_to:
            return False
        return now >= self.valid_from
    def to_dict(self):
        """Serialize to a JSON-friendly dict (enums and timestamps as strings)."""
        return {
            'id': self.id,
            'user_id': self.user_id,
            'role': self.role.value,
            'resource_type': self.resource_type.value,
            'resource_id': self.resource_id,
            'tenant_id': self.tenant_id,
            'namespace_id': self.namespace_id,
            'valid_from': self.valid_from.isoformat(),
            'valid_to': self.valid_to.isoformat() if self.valid_to else None,
            'granted_by': self.granted_by,
            'granted_at': self.granted_at.isoformat(),
            'revoked_at': self.revoked_at.isoformat() if self.revoked_at else None,
            'is_active': self.is_active()
        }
@dataclass
class KeyShare:
    """
    Authorization for a user to access encrypted content.

    A KeyShare is NOT a plaintext key; it is a grant that only takes
    effect together with a matching role assignment.
    """
    id: str
    user_id: str
    package_id: str
    # Scope of the grant, e.g.
    # {"read_original", "read_eh", "read_ek_outputs", "write_annotations"}
    permissions: Set[str] = field(default_factory=set)
    # Optional narrowing: "full", "original_only", "eh_only", "outputs_only"
    scope: str = "full"
    # Provenance
    granted_by: str = ""
    granted_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Invite flow: token plus acceptance timestamp
    invite_token: Optional[str] = None
    accepted_at: Optional[datetime] = None
    # Revocation
    revoked_at: Optional[datetime] = None
    revoked_by: Optional[str] = None

    def is_active(self) -> bool:
        """A share is usable if it was never revoked and any pending invite was accepted."""
        if self.revoked_at is not None:
            return False
        if self.invite_token is not None and self.accepted_at is None:
            return False
        return True

    def to_dict(self):
        """Serialize to a JSON-friendly dict (timestamps as ISO strings)."""
        payload = {
            'id': self.id,
            'user_id': self.user_id,
            'package_id': self.package_id,
            'permissions': list(self.permissions),
            'scope': self.scope,
            'granted_by': self.granted_by,
            'granted_at': self.granted_at.isoformat(),
            'invite_token': self.invite_token,
            'accepted_at': None,
            'revoked_at': None,
            'is_active': self.is_active(),
        }
        if self.accepted_at:
            payload['accepted_at'] = self.accepted_at.isoformat()
        if self.revoked_at:
            payload['revoked_at'] = self.revoked_at.isoformat()
        return payload
@dataclass
class Tenant:
    """
    Top-level isolation unit - typically a single school.
    """
    id: str
    name: str
    bundesland: str
    tenant_type: str = "school"  # "school", "pruefungszentrum", "behoerde"
    # Encryption
    encryption_enabled: bool = True
    # Metadata
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    deleted_at: Optional[datetime] = None

    def to_dict(self):
        """Serialize to a plain dict; note deleted_at is not exported."""
        return dict(
            id=self.id,
            name=self.name,
            bundesland=self.bundesland,
            tenant_type=self.tenant_type,
            encryption_enabled=self.encryption_enabled,
            created_at=self.created_at.isoformat(),
        )
@dataclass
class Namespace:
    """
    Working area inside a tenant,
    e.g. "Abitur 2026 - Deutsch LK - Kurs 12a".
    """
    id: str
    tenant_id: str
    name: str
    # Context
    jahr: int
    fach: str
    kurs: Optional[str] = None
    pruefungsart: str = "abitur"  # "abitur", "vorabitur"
    # Policy
    policy_set_id: Optional[str] = None
    # Metadata
    created_by: str = ""
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    deleted_at: Optional[datetime] = None

    def to_dict(self):
        """Serialize to a plain dict; note deleted_at is not exported."""
        return dict(
            id=self.id,
            tenant_id=self.tenant_id,
            name=self.name,
            jahr=self.jahr,
            fach=self.fach,
            kurs=self.kurs,
            pruefungsart=self.pruefungsart,
            policy_set_id=self.policy_set_id,
            created_by=self.created_by,
            created_at=self.created_at.isoformat(),
        )
@dataclass
class ExamPackage:
    """
    Exam package - a complete set of student works with all artifacts.
    """
    id: str
    namespace_id: str
    tenant_id: str
    name: str
    beschreibung: Optional[str] = None
    # Workflow state
    status: str = "draft"  # "draft", "in_progress", "locked", "signed_off"
    # Participants (roles are assigned separately)
    owner_id: str = ""  # Typically the first corrector (EK)
    # Encryption
    encryption_key_id: Optional[str] = None
    # Timestamps
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    locked_at: Optional[datetime] = None
    signed_off_at: Optional[datetime] = None
    signed_off_by: Optional[str] = None

    def to_dict(self):
        """Serialize to a plain dict; optional timestamps become ISO strings or None."""
        def _iso(ts: Optional[datetime]):
            # Shared helper for the optional timestamp fields.
            return ts.isoformat() if ts else None

        return {
            'id': self.id,
            'namespace_id': self.namespace_id,
            'tenant_id': self.tenant_id,
            'name': self.name,
            'beschreibung': self.beschreibung,
            'status': self.status,
            'owner_id': self.owner_id,
            'created_at': self.created_at.isoformat(),
            'locked_at': _iso(self.locked_at),
            'signed_off_at': _iso(self.signed_off_at),
            'signed_off_by': self.signed_off_by,
        }