Squash of branch refactor/phase0-guardrails-and-models-split — 4 commits,
81 files, 173/173 pytest green, OpenAPI contract preserved (360 paths /
484 operations).
## Phase 0 — Architecture guardrails
Three defense-in-depth layers to keep the architecture rules enforced
regardless of who opens Claude Code in this repo:
1. .claude/settings.json PreToolUse hook on Write/Edit blocks any file
that would exceed the 500-line hard cap. Auto-loads in every Claude
session in this repo.
2. scripts/githooks/pre-commit (install via scripts/install-hooks.sh)
enforces the LOC cap locally, freezes migrations/ without
[migration-approved], and protects guardrail files without
[guardrail-change].
3. .gitea/workflows/ci.yaml gains loc-budget + guardrail-integrity +
sbom-scan (syft+grype) jobs, adds mypy --strict for the new Python
packages (compliance/{services,repositories,domain,schemas}), and
tsc --noEmit for admin-compliance + developer-portal.
Per-language conventions documented in AGENTS.python.md, AGENTS.go.md,
AGENTS.typescript.md at the repo root — layering, tooling, and explicit
"what you may NOT do" lists. Root CLAUDE.md is prepended with the six
non-negotiable rules. Each of the 10 services gets a README.md.
scripts/check-loc.sh enforces soft 300 / hard 500 and surfaces the
current baseline of 205 hard + 161 soft violations so Phases 1-4 can
drain it incrementally. CI gates only CHANGED files in PRs so the
legacy baseline does not block unrelated work.
## Deprecation sweep
47 files. Pydantic V1 regex= -> pattern= (2 sites), class Config ->
ConfigDict in source_policy_router.py (schemas.py intentionally skipped;
it is the Phase 1 Step 3 split target). datetime.utcnow() ->
datetime.now(timezone.utc) everywhere including SQLAlchemy default=
callables. All DB columns already declare timezone=True, so this is a
latent-bug fix at the Python side, not a schema change.
DeprecationWarning count dropped from 158 to 35.
## Phase 1 Step 1 — Contract test harness
tests/contracts/test_openapi_baseline.py diffs the live FastAPI /openapi.json
against tests/contracts/openapi.baseline.json on every test run. Fails on
removed paths, removed status codes, or new required request body fields.
Regenerate only via tests/contracts/regenerate_baseline.py after a
consumer-updated contract change. This is the safety harness for all
subsequent refactor commits.
## Phase 1 Step 2 — models.py split (1466 -> 85 LOC shim)
compliance/db/models.py is decomposed into seven sibling aggregate modules
following the existing repo pattern (dsr_models.py, vvt_models.py, ...):
regulation_models.py (134) — Regulation, Requirement
control_models.py (279) — Control, Mapping, Evidence, Risk
ai_system_models.py (141) — AISystem, AuditExport
service_module_models.py (176) — ServiceModule, ModuleRegulation, ModuleRisk
audit_session_models.py (177) — AuditSession, AuditSignOff
isms_governance_models.py (323) — ISMSScope, Context, Policy, Objective, SoA
isms_audit_models.py (468) — Finding, CAPA, MgmtReview, InternalAudit,
AuditTrail, Readiness
models.py becomes an 85-line re-export shim in dependency order so
existing imports continue to work unchanged. Schema is byte-identical:
__tablename__, column definitions, relationship strings, back_populates,
cascade directives all preserved.
All new sibling files are under the 500-line hard cap; largest is
isms_audit_models.py at 468. No file in compliance/db/ now exceeds
the hard cap.
## Phase 1 Step 3 — infrastructure only
backend-compliance/compliance/{schemas,domain,repositories}/ packages
are created as landing zones with docstrings. compliance/domain/
exports DomainError / NotFoundError / ConflictError / ValidationError /
PermissionError — the base classes services will use to raise
domain-level errors instead of HTTPException.
PHASE1_RUNBOOK.md at backend-compliance/PHASE1_RUNBOOK.md documents
the nine-step execution plan for Phase 1: snapshot baseline,
characterization tests, split models.py (this commit), split schemas.py
(next), extract services, extract repositories, mypy --strict, coverage.
## Verification
backend-compliance/.venv-phase1: uv python install 3.12 + pip -r requirements.txt
PYTHONPATH=. pytest compliance/tests/ tests/contracts/
-> 173 passed, 0 failed, 35 warnings, OpenAPI 360/484 unchanged
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
360 lines
12 KiB
Python
360 lines
12 KiB
Python
"""
|
|
Consent Service Client für BreakPilot
|
|
Kommuniziert mit dem Consent Management Service für GDPR-Compliance
|
|
"""
|
|
|
|
import httpx
|
|
import jwt
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional, List, Dict, Any
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
import os
|
|
import uuid
|
|
|
|
# Consent Service URL (aus Umgebungsvariable oder Standard für lokale Entwicklung)
|
|
CONSENT_SERVICE_URL = os.getenv("CONSENT_SERVICE_URL", "http://localhost:8081")
|
|
|
|
# JWT Secret - MUSS mit dem Go Consent Service übereinstimmen!
|
|
JWT_SECRET = os.getenv("JWT_SECRET", "breakpilot-dev-jwt-secret-2024")
|
|
|
|
|
|
def generate_jwt_token(
|
|
user_id: str = None,
|
|
email: str = "demo@breakpilot.app",
|
|
role: str = "user",
|
|
expires_hours: int = 24
|
|
) -> str:
|
|
"""
|
|
Generiert einen JWT Token für die Authentifizierung beim Consent Service.
|
|
|
|
Args:
|
|
user_id: Die User-ID (wird generiert falls nicht angegeben)
|
|
email: Die E-Mail-Adresse des Benutzers
|
|
role: Die Rolle (user, admin, super_admin)
|
|
expires_hours: Gültigkeitsdauer in Stunden
|
|
|
|
Returns:
|
|
JWT Token als String
|
|
"""
|
|
if user_id is None:
|
|
user_id = str(uuid.uuid4())
|
|
|
|
payload = {
|
|
"user_id": user_id,
|
|
"email": email,
|
|
"role": role,
|
|
"exp": datetime.now(timezone.utc) + timedelta(hours=expires_hours),
|
|
"iat": datetime.now(timezone.utc),
|
|
}
|
|
|
|
return jwt.encode(payload, JWT_SECRET, algorithm="HS256")
|
|
|
|
|
|
def generate_demo_token() -> str:
|
|
"""Generiert einen Demo-Token für nicht-authentifizierte Benutzer"""
|
|
return generate_jwt_token(
|
|
user_id="demo-user-" + str(uuid.uuid4())[:8],
|
|
email="demo@breakpilot.app",
|
|
role="user"
|
|
)
|
|
|
|
|
|
class DocumentType(str, Enum):
|
|
TERMS = "terms"
|
|
PRIVACY = "privacy"
|
|
COOKIES = "cookies"
|
|
COMMUNITY = "community"
|
|
|
|
|
|
@dataclass
|
|
class ConsentStatus:
|
|
has_consent: bool
|
|
current_version_id: Optional[str] = None
|
|
consented_version: Optional[str] = None
|
|
needs_update: bool = False
|
|
consented_at: Optional[str] = None
|
|
|
|
|
|
@dataclass
|
|
class DocumentVersion:
|
|
id: str
|
|
document_id: str
|
|
version: str
|
|
language: str
|
|
title: str
|
|
content: str
|
|
summary: Optional[str] = None
|
|
|
|
|
|
class ConsentClient:
|
|
"""Client für die Kommunikation mit dem Consent Service"""
|
|
|
|
def __init__(self, base_url: str = CONSENT_SERVICE_URL):
|
|
self.base_url = base_url.rstrip("/")
|
|
self.api_url = f"{self.base_url}/api/v1"
|
|
|
|
def _get_headers(self, jwt_token: str) -> Dict[str, str]:
|
|
"""Erstellt die Header mit JWT Token"""
|
|
return {
|
|
"Authorization": f"Bearer {jwt_token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
async def check_consent(
|
|
self,
|
|
jwt_token: str,
|
|
document_type: DocumentType,
|
|
language: str = "de"
|
|
) -> ConsentStatus:
|
|
"""
|
|
Prüft ob der Benutzer dem Dokument zugestimmt hat.
|
|
Gibt zurück ob eine Zustimmung vorliegt und ob sie aktuell ist.
|
|
"""
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
response = await client.get(
|
|
f"{self.api_url}/consent/check/{document_type.value}",
|
|
headers=self._get_headers(jwt_token),
|
|
params={"language": language},
|
|
timeout=10.0
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return ConsentStatus(
|
|
has_consent=data.get("has_consent", False),
|
|
current_version_id=data.get("current_version_id"),
|
|
consented_version=data.get("consented_version"),
|
|
needs_update=data.get("needs_update", False),
|
|
consented_at=data.get("consented_at")
|
|
)
|
|
else:
|
|
return ConsentStatus(has_consent=False, needs_update=True)
|
|
|
|
except httpx.RequestError:
|
|
# Bei Verbindungsproblemen: Consent nicht erzwingen
|
|
return ConsentStatus(has_consent=True, needs_update=False)
|
|
|
|
async def check_all_mandatory_consents(
|
|
self,
|
|
jwt_token: str,
|
|
language: str = "de"
|
|
) -> Dict[str, ConsentStatus]:
|
|
"""
|
|
Prüft alle verpflichtenden Dokumente (Terms, Privacy).
|
|
Gibt ein Dictionary mit dem Status für jedes Dokument zurück.
|
|
"""
|
|
mandatory_docs = [DocumentType.TERMS, DocumentType.PRIVACY]
|
|
results = {}
|
|
|
|
for doc_type in mandatory_docs:
|
|
results[doc_type.value] = await self.check_consent(jwt_token, doc_type, language)
|
|
|
|
return results
|
|
|
|
async def get_pending_consents(
|
|
self,
|
|
jwt_token: str,
|
|
language: str = "de"
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Gibt eine Liste aller Dokumente zurück, die noch Zustimmung benötigen.
|
|
Nützlich für die Anzeige beim Login/Registration.
|
|
"""
|
|
pending = []
|
|
statuses = await self.check_all_mandatory_consents(jwt_token, language)
|
|
|
|
for doc_type, status in statuses.items():
|
|
if not status.has_consent or status.needs_update:
|
|
# Hole das aktuelle Dokument
|
|
doc = await self.get_latest_document(jwt_token, doc_type, language)
|
|
if doc:
|
|
pending.append({
|
|
"type": doc_type,
|
|
"version_id": status.current_version_id,
|
|
"title": doc.title,
|
|
"content": doc.content,
|
|
"summary": doc.summary,
|
|
"is_update": status.has_consent and status.needs_update
|
|
})
|
|
|
|
return pending
|
|
|
|
async def get_latest_document(
|
|
self,
|
|
jwt_token: str,
|
|
document_type: str,
|
|
language: str = "de"
|
|
) -> Optional[DocumentVersion]:
|
|
"""Holt die aktuellste Version eines Dokuments"""
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
response = await client.get(
|
|
f"{self.api_url}/documents/{document_type}/latest",
|
|
headers=self._get_headers(jwt_token),
|
|
params={"language": language},
|
|
timeout=10.0
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return DocumentVersion(
|
|
id=data["id"],
|
|
document_id=data["document_id"],
|
|
version=data["version"],
|
|
language=data["language"],
|
|
title=data["title"],
|
|
content=data["content"],
|
|
summary=data.get("summary")
|
|
)
|
|
return None
|
|
|
|
except httpx.RequestError:
|
|
return None
|
|
|
|
async def give_consent(
|
|
self,
|
|
jwt_token: str,
|
|
document_type: str,
|
|
version_id: str,
|
|
consented: bool = True
|
|
) -> bool:
|
|
"""
|
|
Speichert die Zustimmung des Benutzers.
|
|
Gibt True zurück bei Erfolg.
|
|
"""
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
response = await client.post(
|
|
f"{self.api_url}/consent",
|
|
headers=self._get_headers(jwt_token),
|
|
json={
|
|
"document_type": document_type,
|
|
"version_id": version_id,
|
|
"consented": consented
|
|
},
|
|
timeout=10.0
|
|
)
|
|
return response.status_code == 201
|
|
|
|
except httpx.RequestError:
|
|
return False
|
|
|
|
async def get_cookie_categories(
|
|
self,
|
|
jwt_token: str,
|
|
language: str = "de"
|
|
) -> List[Dict[str, Any]]:
|
|
"""Holt alle Cookie-Kategorien für das Cookie-Banner"""
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
response = await client.get(
|
|
f"{self.api_url}/cookies/categories",
|
|
headers=self._get_headers(jwt_token),
|
|
params={"language": language},
|
|
timeout=10.0
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
return response.json().get("categories", [])
|
|
return []
|
|
|
|
except httpx.RequestError:
|
|
return []
|
|
|
|
async def set_cookie_consent(
|
|
self,
|
|
jwt_token: str,
|
|
categories: List[Dict[str, Any]]
|
|
) -> bool:
|
|
"""
|
|
Speichert die Cookie-Präferenzen.
|
|
categories: [{"category_id": "...", "consented": true/false}, ...]
|
|
"""
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
response = await client.post(
|
|
f"{self.api_url}/cookies/consent",
|
|
headers=self._get_headers(jwt_token),
|
|
json={"categories": categories},
|
|
timeout=10.0
|
|
)
|
|
return response.status_code == 200
|
|
|
|
except httpx.RequestError:
|
|
return False
|
|
|
|
async def get_my_data(self, jwt_token: str) -> Optional[Dict[str, Any]]:
|
|
"""GDPR Art. 15: Holt alle Daten des Benutzers"""
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
response = await client.get(
|
|
f"{self.api_url}/privacy/my-data",
|
|
headers=self._get_headers(jwt_token),
|
|
timeout=30.0
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
return None
|
|
|
|
except httpx.RequestError:
|
|
return None
|
|
|
|
async def request_data_export(self, jwt_token: str) -> Optional[str]:
|
|
"""GDPR Art. 20: Fordert einen Datenexport an"""
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
response = await client.post(
|
|
f"{self.api_url}/privacy/export",
|
|
headers=self._get_headers(jwt_token),
|
|
timeout=10.0
|
|
)
|
|
|
|
if response.status_code == 202:
|
|
return response.json().get("request_id")
|
|
return None
|
|
|
|
except httpx.RequestError:
|
|
return None
|
|
|
|
async def request_data_deletion(
|
|
self,
|
|
jwt_token: str,
|
|
reason: Optional[str] = None
|
|
) -> Optional[str]:
|
|
"""GDPR Art. 17: Fordert Löschung aller Daten an"""
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
response = await client.post(
|
|
f"{self.api_url}/privacy/delete",
|
|
headers=self._get_headers(jwt_token),
|
|
json={"reason": reason} if reason else {},
|
|
timeout=10.0
|
|
)
|
|
|
|
if response.status_code == 202:
|
|
return response.json().get("request_id")
|
|
return None
|
|
|
|
except httpx.RequestError:
|
|
return None
|
|
|
|
async def health_check(self) -> bool:
|
|
"""Prüft ob der Consent Service erreichbar ist"""
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
response = await client.get(
|
|
f"{self.base_url}/health",
|
|
timeout=5.0
|
|
)
|
|
return response.status_code == 200
|
|
|
|
except httpx.RequestError:
|
|
return False
|
|
|
|
|
|
# Singleton-Instanz für einfachen Zugriff
|
|
consent_client = ConsentClient()
|