feat: add RAG corpus versioning and source policy backend
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 34s
CI / test-python-backend-compliance (push) Successful in 32s
CI / test-python-document-crawler (push) Successful in 23s
CI / test-python-dsms-gateway (push) Successful in 18s

Part 1 — RAG Corpus Versioning:
- New DB table compliance_corpus_versions (migration 017)
- Go CorpusVersionStore with CRUD operations
- Assessment struct extended with corpus_version_id
- API endpoints: GET /rag/corpus-status, /rag/corpus-versions/:collection
- RAG routes (search, regulations) now registered in main.go
- Ingestion script registers corpus versions after each run
- Frontend staleness badge in SDK sidebar

Part 3 — Source Policy Backend:
- New FastAPI router with CRUD for allowed sources, PII rules,
  operations matrix, audit trail, stats, and compliance report
- SQLAlchemy models for all source policy tables (migration 001)
- Frontend API base corrected from edu-search:8088/8089 to
  backend-compliance:8002/api

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-02 07:58:08 +01:00
parent 187dbf1b77
commit a228b3b528
15 changed files with 2020 additions and 11 deletions

View File

@@ -0,0 +1,503 @@
"""
Source Policy Router — Manages allowed compliance data sources.
Controls which legal sources the RAG corpus may use,
operations matrix, PII rules, and provides audit trail.
Endpoints:
GET /api/v1/admin/sources — List all sources
POST /api/v1/admin/sources — Add new source
GET /api/v1/admin/sources/{id} — Get source by ID
PUT /api/v1/admin/sources/{id} — Update source
DELETE /api/v1/admin/sources/{id} — Remove source
GET /api/v1/admin/operations-matrix — Operations matrix
PUT /api/v1/admin/operations/{id} — Update operation
GET /api/v1/admin/pii-rules — List PII rules
POST /api/v1/admin/pii-rules — Create PII rule
PUT /api/v1/admin/pii-rules/{id} — Update PII rule
DELETE /api/v1/admin/pii-rules/{id} — Delete PII rule
GET /api/v1/admin/policy-audit — Audit trail
GET /api/v1/admin/policy-stats — Dashboard statistics
GET /api/v1/admin/compliance-report — Compliance report
"""
import uuid
from datetime import datetime
from typing import Optional, List
from fastapi import APIRouter, HTTPException, Depends, Query
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from database import get_db
from compliance.db.source_policy_models import (
AllowedSourceDB,
SourceOperationDB,
PIIRuleDB,
SourcePolicyAuditDB,
)
router = APIRouter(prefix="/v1/admin", tags=["source-policy"])
# =============================================================================
# Pydantic Schemas
# =============================================================================
class SourceCreate(BaseModel):
domain: str
name: str
description: Optional[str] = None
license: Optional[str] = None
legal_basis: Optional[str] = None
trust_boost: float = Field(default=0.5, ge=0.0, le=1.0)
source_type: str = "legal"
active: bool = True
metadata: Optional[dict] = None
class SourceUpdate(BaseModel):
domain: Optional[str] = None
name: Optional[str] = None
description: Optional[str] = None
license: Optional[str] = None
legal_basis: Optional[str] = None
trust_boost: Optional[float] = Field(default=None, ge=0.0, le=1.0)
source_type: Optional[str] = None
active: Optional[bool] = None
metadata: Optional[dict] = None
class SourceResponse(BaseModel):
id: str
domain: str
name: str
description: Optional[str] = None
license: Optional[str] = None
legal_basis: Optional[str] = None
trust_boost: float
source_type: str
active: bool
metadata: Optional[dict] = None
created_at: str
updated_at: Optional[str] = None
class Config:
from_attributes = True
class OperationUpdate(BaseModel):
allowed: bool
conditions: Optional[str] = None
class PIIRuleCreate(BaseModel):
name: str
description: Optional[str] = None
pattern: Optional[str] = None
category: str
action: str = "mask"
active: bool = True
class PIIRuleUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
pattern: Optional[str] = None
category: Optional[str] = None
action: Optional[str] = None
active: Optional[bool] = None
# =============================================================================
# Helper: Audit logging
# =============================================================================
def _log_audit(db: Session, action: str, entity_type: str, entity_id, old_values=None, new_values=None):
audit = SourcePolicyAuditDB(
action=action,
entity_type=entity_type,
entity_id=entity_id,
old_values=old_values,
new_values=new_values,
user_id="system",
)
db.add(audit)
def _source_to_dict(source: AllowedSourceDB) -> dict:
return {
"id": str(source.id),
"domain": source.domain,
"name": source.name,
"description": source.description,
"license": source.license,
"legal_basis": source.legal_basis,
"trust_boost": source.trust_boost,
"source_type": source.source_type,
"active": source.active,
}
# =============================================================================
# Sources CRUD
# =============================================================================
@router.get("/sources")
async def list_sources(
active_only: bool = Query(False),
db: Session = Depends(get_db),
):
"""List all allowed sources."""
query = db.query(AllowedSourceDB)
if active_only:
query = query.filter(AllowedSourceDB.active == True)
sources = query.order_by(AllowedSourceDB.name).all()
return {
"sources": [
{
"id": str(s.id),
"domain": s.domain,
"name": s.name,
"description": s.description,
"license": s.license,
"legal_basis": s.legal_basis,
"trust_boost": s.trust_boost,
"source_type": s.source_type,
"active": s.active,
"metadata": s.metadata_,
"created_at": s.created_at.isoformat() if s.created_at else None,
"updated_at": s.updated_at.isoformat() if s.updated_at else None,
}
for s in sources
],
"count": len(sources),
}
@router.post("/sources")
async def create_source(
data: SourceCreate,
db: Session = Depends(get_db),
):
"""Add a new allowed source."""
existing = db.query(AllowedSourceDB).filter(AllowedSourceDB.domain == data.domain).first()
if existing:
raise HTTPException(status_code=409, detail=f"Source with domain '{data.domain}' already exists")
source = AllowedSourceDB(
domain=data.domain,
name=data.name,
description=data.description,
license=data.license,
legal_basis=data.legal_basis,
trust_boost=data.trust_boost,
source_type=data.source_type,
active=data.active,
metadata_=data.metadata,
)
db.add(source)
_log_audit(db, "create", "source", source.id, new_values=_source_to_dict(source))
db.commit()
db.refresh(source)
return {
"id": str(source.id),
"domain": source.domain,
"name": source.name,
"created_at": source.created_at.isoformat(),
}
@router.get("/sources/{source_id}")
async def get_source(source_id: str, db: Session = Depends(get_db)):
"""Get a specific source."""
source = db.query(AllowedSourceDB).filter(AllowedSourceDB.id == source_id).first()
if not source:
raise HTTPException(status_code=404, detail="Source not found")
return {
"id": str(source.id),
"domain": source.domain,
"name": source.name,
"description": source.description,
"license": source.license,
"legal_basis": source.legal_basis,
"trust_boost": source.trust_boost,
"source_type": source.source_type,
"active": source.active,
"metadata": source.metadata_,
"created_at": source.created_at.isoformat() if source.created_at else None,
"updated_at": source.updated_at.isoformat() if source.updated_at else None,
}
@router.put("/sources/{source_id}")
async def update_source(
source_id: str,
data: SourceUpdate,
db: Session = Depends(get_db),
):
"""Update an existing source."""
source = db.query(AllowedSourceDB).filter(AllowedSourceDB.id == source_id).first()
if not source:
raise HTTPException(status_code=404, detail="Source not found")
old_values = _source_to_dict(source)
update_data = data.model_dump(exclude_unset=True)
# Rename metadata to metadata_ for the DB column
if "metadata" in update_data:
update_data["metadata_"] = update_data.pop("metadata")
for key, value in update_data.items():
setattr(source, key, value)
_log_audit(db, "update", "source", source.id, old_values=old_values, new_values=update_data)
db.commit()
db.refresh(source)
return {"status": "updated", "id": str(source.id)}
@router.delete("/sources/{source_id}")
async def delete_source(source_id: str, db: Session = Depends(get_db)):
"""Remove an allowed source."""
source = db.query(AllowedSourceDB).filter(AllowedSourceDB.id == source_id).first()
if not source:
raise HTTPException(status_code=404, detail="Source not found")
old_values = _source_to_dict(source)
_log_audit(db, "delete", "source", source.id, old_values=old_values)
# Also delete associated operations
db.query(SourceOperationDB).filter(SourceOperationDB.source_id == source_id).delete()
db.delete(source)
db.commit()
return {"status": "deleted", "id": source_id}
# =============================================================================
# Operations Matrix
# =============================================================================
@router.get("/operations-matrix")
async def get_operations_matrix(db: Session = Depends(get_db)):
"""Get the full operations matrix."""
operations = db.query(SourceOperationDB).all()
return {
"operations": [
{
"id": str(op.id),
"source_id": str(op.source_id),
"operation": op.operation,
"allowed": op.allowed,
"conditions": op.conditions,
}
for op in operations
],
"count": len(operations),
}
@router.put("/operations/{operation_id}")
async def update_operation(
operation_id: str,
data: OperationUpdate,
db: Session = Depends(get_db),
):
"""Update an operation in the matrix."""
op = db.query(SourceOperationDB).filter(SourceOperationDB.id == operation_id).first()
if not op:
raise HTTPException(status_code=404, detail="Operation not found")
op.allowed = data.allowed
if data.conditions is not None:
op.conditions = data.conditions
_log_audit(db, "update", "operation", op.id, new_values={"allowed": data.allowed})
db.commit()
return {"status": "updated", "id": str(op.id)}
# =============================================================================
# PII Rules
# =============================================================================
@router.get("/pii-rules")
async def list_pii_rules(db: Session = Depends(get_db)):
"""List all PII rules."""
rules = db.query(PIIRuleDB).order_by(PIIRuleDB.category, PIIRuleDB.name).all()
return {
"rules": [
{
"id": str(r.id),
"name": r.name,
"description": r.description,
"pattern": r.pattern,
"category": r.category,
"action": r.action,
"active": r.active,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rules
],
"count": len(rules),
}
@router.post("/pii-rules")
async def create_pii_rule(data: PIIRuleCreate, db: Session = Depends(get_db)):
"""Create a new PII rule."""
rule = PIIRuleDB(
name=data.name,
description=data.description,
pattern=data.pattern,
category=data.category,
action=data.action,
active=data.active,
)
db.add(rule)
_log_audit(db, "create", "pii_rule", rule.id, new_values={"name": data.name, "category": data.category})
db.commit()
db.refresh(rule)
return {"id": str(rule.id), "name": rule.name}
@router.put("/pii-rules/{rule_id}")
async def update_pii_rule(rule_id: str, data: PIIRuleUpdate, db: Session = Depends(get_db)):
"""Update a PII rule."""
rule = db.query(PIIRuleDB).filter(PIIRuleDB.id == rule_id).first()
if not rule:
raise HTTPException(status_code=404, detail="PII rule not found")
update_data = data.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(rule, key, value)
_log_audit(db, "update", "pii_rule", rule.id, new_values=update_data)
db.commit()
return {"status": "updated", "id": str(rule.id)}
@router.delete("/pii-rules/{rule_id}")
async def delete_pii_rule(rule_id: str, db: Session = Depends(get_db)):
"""Delete a PII rule."""
rule = db.query(PIIRuleDB).filter(PIIRuleDB.id == rule_id).first()
if not rule:
raise HTTPException(status_code=404, detail="PII rule not found")
_log_audit(db, "delete", "pii_rule", rule.id, old_values={"name": rule.name, "category": rule.category})
db.delete(rule)
db.commit()
return {"status": "deleted", "id": rule_id}
# =============================================================================
# Audit Trail
# =============================================================================
@router.get("/policy-audit")
async def get_policy_audit(
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
entity_type: Optional[str] = None,
db: Session = Depends(get_db),
):
"""Get the audit trail for source policy changes."""
query = db.query(SourcePolicyAuditDB)
if entity_type:
query = query.filter(SourcePolicyAuditDB.entity_type == entity_type)
total = query.count()
entries = query.order_by(SourcePolicyAuditDB.created_at.desc()).offset(offset).limit(limit).all()
return {
"entries": [
{
"id": str(e.id),
"action": e.action,
"entity_type": e.entity_type,
"entity_id": str(e.entity_id) if e.entity_id else None,
"old_values": e.old_values,
"new_values": e.new_values,
"user_id": e.user_id,
"created_at": e.created_at.isoformat() if e.created_at else None,
}
for e in entries
],
"total": total,
"limit": limit,
"offset": offset,
}
# =============================================================================
# Dashboard Statistics
# =============================================================================
@router.get("/policy-stats")
async def get_policy_stats(db: Session = Depends(get_db)):
"""Get dashboard statistics for source policy."""
total_sources = db.query(AllowedSourceDB).count()
active_sources = db.query(AllowedSourceDB).filter(AllowedSourceDB.active == True).count()
pii_rules = db.query(PIIRuleDB).filter(PIIRuleDB.active == True).count()
# Count audit entries from today
today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
blocked_today = db.query(SourcePolicyAuditDB).filter(
SourcePolicyAuditDB.action == "delete",
SourcePolicyAuditDB.created_at >= today_start,
).count()
blocked_total = db.query(SourcePolicyAuditDB).filter(
SourcePolicyAuditDB.action == "delete",
).count()
return {
"active_policies": active_sources,
"allowed_sources": total_sources,
"pii_rules": pii_rules,
"blocked_today": blocked_today,
"blocked_total": blocked_total,
}
@router.get("/compliance-report")
async def get_compliance_report(db: Session = Depends(get_db)):
"""Generate a compliance report for source policies."""
sources = db.query(AllowedSourceDB).filter(AllowedSourceDB.active == True).all()
pii_rules = db.query(PIIRuleDB).filter(PIIRuleDB.active == True).all()
return {
"report_date": datetime.utcnow().isoformat(),
"summary": {
"active_sources": len(sources),
"active_pii_rules": len(pii_rules),
"source_types": list(set(s.source_type for s in sources)),
"licenses": list(set(s.license for s in sources if s.license)),
},
"sources": [
{
"domain": s.domain,
"name": s.name,
"license": s.license,
"legal_basis": s.legal_basis,
"trust_boost": s.trust_boost,
}
for s in sources
],
"pii_rules": [
{
"name": r.name,
"category": r.category,
"action": r.action,
}
for r in pii_rules
],
}

View File

@@ -0,0 +1,105 @@
"""
SQLAlchemy models for Source Policy Management.
Tables:
- compliance_allowed_sources: Whitelisted data sources for RAG corpus
- compliance_source_operations: Operations matrix for source data flows
- compliance_pii_rules: PII detection/masking rules for sources
- compliance_source_policy_audit: Audit trail for source policy changes
"""
import uuid
from datetime import datetime
from sqlalchemy import (
Column, String, Text, Boolean, DateTime, Float, JSON, Index
)
from sqlalchemy.dialects.postgresql import UUID
from classroom_engine.database import Base
class AllowedSourceDB(Base):
"""Whitelisted data source for compliance RAG corpus."""
__tablename__ = 'compliance_allowed_sources'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
domain = Column(String(255), unique=True, nullable=False)
name = Column(String(255), nullable=False)
description = Column(Text, nullable=True)
license = Column(String(100), nullable=True) # DL-DE-BY-2.0, CC-BY, etc.
legal_basis = Column(String(200), nullable=True) # §5 UrhG, etc.
trust_boost = Column(Float, default=0.5)
source_type = Column(String(50), default='legal') # legal, guidance, template
active = Column(Boolean, default=True)
metadata_ = Column('metadata', JSON, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, onupdate=datetime.utcnow, nullable=True)
__table_args__ = (
Index('idx_allowed_sources_domain', 'domain'),
Index('idx_allowed_sources_active', 'active'),
)
def __repr__(self):
return f"<AllowedSource {self.domain}: {self.name}>"
class SourceOperationDB(Base):
"""Operations matrix entry for source data flows."""
__tablename__ = 'compliance_source_operations'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
source_id = Column(UUID(as_uuid=True), nullable=False)
operation = Column(String(50), nullable=False) # ingest, search, export, share
allowed = Column(Boolean, default=True)
conditions = Column(Text, nullable=True) # Conditions for this operation
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, onupdate=datetime.utcnow, nullable=True)
__table_args__ = (
Index('idx_source_operations_source', 'source_id'),
)
class PIIRuleDB(Base):
"""PII detection and masking rule for compliance sources."""
__tablename__ = 'compliance_pii_rules'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String(255), nullable=False)
description = Column(Text, nullable=True)
pattern = Column(Text, nullable=True) # Regex pattern
category = Column(String(50), nullable=False) # email, phone, name, address, etc.
action = Column(String(20), default='mask') # mask, redact, flag
active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, onupdate=datetime.utcnow, nullable=True)
__table_args__ = (
Index('idx_pii_rules_category', 'category'),
Index('idx_pii_rules_active', 'active'),
)
class SourcePolicyAuditDB(Base):
"""Audit trail for source policy changes."""
__tablename__ = 'compliance_source_policy_audit'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
action = Column(String(20), nullable=False) # create, update, delete
entity_type = Column(String(50), nullable=False) # source, operation, pii_rule
entity_id = Column(UUID(as_uuid=True), nullable=True)
old_values = Column(JSON, nullable=True)
new_values = Column(JSON, nullable=True)
user_id = Column(String(100), nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
__table_args__ = (
Index('idx_source_audit_entity', 'entity_type', 'entity_id'),
Index('idx_source_audit_created', 'created_at'),
)

View File

@@ -21,6 +21,9 @@ from dsr_admin_api import router as dsr_admin_router, templates_router as dsr_te
# Compliance framework sub-package
from compliance.api import router as compliance_framework_router
# Source Policy
from compliance.api.source_policy_router import router as source_policy_router
# Middleware
from middleware import (
RequestIDMiddleware,
@@ -85,6 +88,9 @@ app.include_router(dsr_templates_router, prefix="/api")
# Compliance Framework (regulations, controls, evidence, risks, audits, ISMS)
app.include_router(compliance_framework_router, prefix="/api")
# Source Policy (allowed sources, PII rules, audit)
app.include_router(source_policy_router, prefix="/api")
if __name__ == "__main__":
import uvicorn

View File

@@ -0,0 +1,73 @@
-- =============================================================================
-- Migration 001: Source Policy Tables
--
-- Tables for managing allowed compliance data sources, operations matrix,
-- PII rules, and audit trail.
-- =============================================================================
CREATE TABLE IF NOT EXISTS compliance_allowed_sources (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
domain VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
description TEXT,
license VARCHAR(100),
legal_basis VARCHAR(200),
trust_boost FLOAT DEFAULT 0.5,
source_type VARCHAR(50) DEFAULT 'legal',
active BOOLEAN DEFAULT true,
metadata JSON,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_allowed_sources_domain ON compliance_allowed_sources(domain);
CREATE INDEX IF NOT EXISTS idx_allowed_sources_active ON compliance_allowed_sources(active);
CREATE TABLE IF NOT EXISTS compliance_source_operations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_id UUID NOT NULL REFERENCES compliance_allowed_sources(id) ON DELETE CASCADE,
operation VARCHAR(50) NOT NULL,
allowed BOOLEAN DEFAULT true,
conditions TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_source_operations_source ON compliance_source_operations(source_id);
CREATE TABLE IF NOT EXISTS compliance_pii_rules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
description TEXT,
pattern TEXT,
category VARCHAR(50) NOT NULL,
action VARCHAR(20) DEFAULT 'mask',
active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_pii_rules_category ON compliance_pii_rules(category);
CREATE INDEX IF NOT EXISTS idx_pii_rules_active ON compliance_pii_rules(active);
CREATE TABLE IF NOT EXISTS compliance_source_policy_audit (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
action VARCHAR(20) NOT NULL,
entity_type VARCHAR(50) NOT NULL,
entity_id UUID,
old_values JSON,
new_values JSON,
user_id VARCHAR(100),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_source_audit_entity ON compliance_source_policy_audit(entity_type, entity_id);
CREATE INDEX IF NOT EXISTS idx_source_audit_created ON compliance_source_policy_audit(created_at);
-- Seed default PII rules
INSERT INTO compliance_pii_rules (name, category, pattern, action, description) VALUES
('E-Mail-Adresse', 'email', '[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', 'mask', 'E-Mail-Adressen erkennen und maskieren'),
('Telefonnummer', 'phone', '(\+49|0)[0-9\s/-]{8,15}', 'mask', 'Deutsche Telefonnummern erkennen'),
('IBAN', 'financial', 'DE[0-9]{2}\s?[0-9]{4}\s?[0-9]{4}\s?[0-9]{4}\s?[0-9]{4}\s?[0-9]{2}', 'redact', 'Deutsche IBAN-Nummern erkennen und entfernen'),
('Postadresse', 'address', '[0-9]{5}\s+[A-Z][a-z]', 'flag', 'Postleitzahlen mit Ortsnamen markieren')
ON CONFLICT DO NOTHING;