breakpilot-compliance/backend-compliance/consent_client.py
Sharang Parnerkar 3320ef94fc refactor: phase 0 guardrails + phase 1 step 2 (models.py split)
Squash of branch refactor/phase0-guardrails-and-models-split — 4 commits,
81 files, 173/173 pytest green, OpenAPI contract preserved (360 paths /
484 operations).

## Phase 0 — Architecture guardrails

Three defense-in-depth layers to keep the architecture rules enforced
regardless of who opens Claude Code in this repo:

  1. .claude/settings.json PreToolUse hook on Write/Edit blocks any file
     that would exceed the 500-line hard cap. Auto-loads in every Claude
     session in this repo.
  2. scripts/githooks/pre-commit (install via scripts/install-hooks.sh)
     enforces the LOC cap locally, freezes migrations/ without
     [migration-approved], and protects guardrail files without
     [guardrail-change].
  3. .gitea/workflows/ci.yaml gains loc-budget + guardrail-integrity +
     sbom-scan (syft+grype) jobs, adds mypy --strict for the new Python
     packages (compliance/{services,repositories,domain,schemas}), and
     tsc --noEmit for admin-compliance + developer-portal.

Per-language conventions documented in AGENTS.python.md, AGENTS.go.md,
AGENTS.typescript.md at the repo root — layering, tooling, and explicit
"what you may NOT do" lists. Root CLAUDE.md is prepended with the six
non-negotiable rules. Each of the 10 services gets a README.md.

scripts/check-loc.sh enforces soft 300 / hard 500 and surfaces the
current baseline of 205 hard + 161 soft violations so Phases 1-4 can
drain it incrementally. CI gates only CHANGED files in PRs so the
legacy baseline does not block unrelated work.
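
The soft/hard thresholds can be illustrated with a short Python sketch. It is not the contents of scripts/check-loc.sh, which this page does not show; the function name `classify_loc` and the strictly-greater-than comparison are assumptions made for illustration.

```python
SOFT_LIMIT = 300  # soft cap named in the commit message
HARD_LIMIT = 500  # hard cap named in the commit message


def classify_loc(line_count: int) -> str:
    """Return "hard", "soft", or "ok" for a file with `line_count` lines.

    Assumption: a file violates a cap only when it exceeds it, so a file
    of exactly 500 lines is still within the hard cap.
    """
    if line_count > HARD_LIMIT:
        return "hard"
    if line_count > SOFT_LIMIT:
        return "soft"
    return "ok"


# Line counts quoted in this commit message:
print(classify_loc(1466))  # models.py before the split -> hard
print(classify_loc(468))   # isms_audit_models.py       -> soft
print(classify_loc(85))    # models.py shim             -> ok
```

Under these assumptions the largest new sibling file clears the hard cap but would still count toward the soft baseline.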

## Deprecation sweep

47 files. Pydantic V1 regex= -> pattern= (2 sites), class Config ->
ConfigDict in source_policy_router.py (schemas.py intentionally skipped;
it is the Phase 1 Step 3 split target). datetime.utcnow() ->
datetime.now(timezone.utc) everywhere including SQLAlchemy default=
callables. All DB columns already declare timezone=True, so this is a
latent-bug fix at the Python side, not a schema change.
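
A minimal sketch of why the datetime change matters on the Python side. The helper name `utc_now` is hypothetical; the commit only states that `datetime.now(timezone.utc)` replaced `datetime.utcnow()`. A zero-argument callable of this shape is the kind of thing that could be passed as a column `default=`.

```python
from datetime import datetime, timedelta, timezone


def utc_now() -> datetime:
    """Timezone-aware replacement for the deprecated datetime.utcnow()."""
    return datetime.now(timezone.utc)


aware = utc_now()
# Same wall-clock value with tzinfo stripped, i.e. the kind of naive
# value that datetime.utcnow() used to return.
naive = aware.replace(tzinfo=None)

assert aware.utcoffset() == timedelta(0)
assert naive.tzinfo is None

# Ordering comparisons between naive and aware datetimes raise TypeError,
# which is the latent bug when naive values meet timezone-aware columns.
try:
    naive < aware
    mixed_comparison_ok = True
except TypeError:
    mixed_comparison_ok = False

print(mixed_comparison_ok)
```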

DeprecationWarning count dropped from 158 to 35.

## Phase 1 Step 1 — Contract test harness

tests/contracts/test_openapi_baseline.py diffs the live FastAPI /openapi.json
against tests/contracts/openapi.baseline.json on every test run. Fails on
removed paths, removed status codes, or new required request body fields.
Regenerate only via tests/contracts/regenerate_baseline.py after a
consumer-updated contract change. This is the safety harness for all
subsequent refactor commits.
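
The three failure rules above can be sketched roughly as follows. This is not the code in tests/contracts/test_openapi_baseline.py; the function names, the handling of a single local `$ref`, and the extra "removed operation" check are assumptions, and the two specs are hypothetical miniatures.

```python
from typing import Any, Dict, List, Set

HTTP_METHODS = {"get", "put", "post", "delete", "patch", "head", "options", "trace"}


def required_body_fields(spec: Dict[str, Any], operation: Dict[str, Any]) -> Set[str]:
    """Collect required JSON request-body fields, following one local $ref."""
    schema = (
        operation.get("requestBody", {})
        .get("content", {})
        .get("application/json", {})
        .get("schema", {})
    )
    ref = schema.get("$ref", "")
    if ref.startswith("#/components/schemas/"):
        name = ref.rsplit("/", 1)[-1]
        schema = spec.get("components", {}).get("schemas", {}).get(name, {})
    return set(schema.get("required", []))


def find_breaking_changes(baseline: Dict[str, Any], current: Dict[str, Any]) -> List[str]:
    """List contract breaks in `current` relative to `baseline`."""
    problems: List[str] = []
    current_paths = current.get("paths", {})
    for path, base_item in baseline.get("paths", {}).items():
        if path not in current_paths:
            problems.append(f"removed path: {path}")
            continue
        for method, base_op in base_item.items():
            if method not in HTTP_METHODS:
                continue  # skip non-operation keys such as "parameters"
            label = f"{method.upper()} {path}"
            cur_op = current_paths[path].get(method)
            if cur_op is None:
                # Assumption: treated as breaking, beyond the three stated rules.
                problems.append(f"removed operation: {label}")
                continue
            cur_responses = cur_op.get("responses", {})
            for status in base_op.get("responses", {}):
                if status not in cur_responses:
                    problems.append(f"removed status code: {label} {status}")
            added = required_body_fields(current, cur_op) - required_body_fields(baseline, base_op)
            for field in sorted(added):
                problems.append(f"new required field: {label} {field}")
    return problems


def _body(required: List[str]) -> Dict[str, Any]:
    """Build a hypothetical inline JSON request body with the given required fields."""
    return {"content": {"application/json": {"schema": {"required": required}}}}


baseline = {"paths": {
    "/consent": {"post": {"requestBody": _body(["version_id"]),
                          "responses": {"201": {}, "422": {}}}},
    "/health": {"get": {"responses": {"200": {}}}},
}}
current = {"paths": {
    "/consent": {"post": {"requestBody": _body(["version_id", "language"]),
                          "responses": {"201": {}}}},
}}

problems = find_breaking_changes(baseline, current)
for line in problems:
    print(line)
```

Additions (new paths, new optional fields, new status codes) pass through untouched in this sketch, which matches the stated intent of failing only on changes that could break existing consumers.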

## Phase 1 Step 2 — models.py split (1466 -> 85 LOC shim)

compliance/db/models.py is decomposed into seven sibling aggregate modules
following the existing repo pattern (dsr_models.py, vvt_models.py, ...):

  regulation_models.py       (134) — Regulation, Requirement
  control_models.py          (279) — Control, Mapping, Evidence, Risk
  ai_system_models.py        (141) — AISystem, AuditExport
  service_module_models.py   (176) — ServiceModule, ModuleRegulation, ModuleRisk
  audit_session_models.py    (177) — AuditSession, AuditSignOff
  isms_governance_models.py  (323) — ISMSScope, Context, Policy, Objective, SoA
  isms_audit_models.py       (468) — Finding, CAPA, MgmtReview, InternalAudit,
                                     AuditTrail, Readiness

models.py becomes an 85-line re-export shim in dependency order so
existing imports continue to work unchanged. Schema is byte-identical:
__tablename__, column definitions, relationship strings, back_populates,
cascade directives all preserved.

All new sibling files are under the 500-line hard cap; largest is
isms_audit_models.py at 468. No file in compliance/db/ now exceeds
the hard cap.

## Phase 1 Step 3 — infrastructure only

backend-compliance/compliance/{schemas,domain,repositories}/ packages
are created as landing zones with docstrings. compliance/domain/
exports DomainError / NotFoundError / ConflictError / ValidationError /
PermissionError — the base classes services will use to raise
domain-level errors instead of HTTPException.
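
One way such a hierarchy could look, as a sketch only: the commit names the five classes but not their relationships, so the inheritance from `DomainError`, the `STATUS_BY_ERROR` table, the `status_for` helper, and the chosen status codes are all assumptions.

```python
from typing import Dict, Type


class DomainError(Exception):
    """Base class for errors raised by the service layer."""


class NotFoundError(DomainError):
    """The requested entity does not exist."""


class ConflictError(DomainError):
    """The operation conflicts with existing state."""


class ValidationError(DomainError):
    """The input violates a domain rule."""


class PermissionError(DomainError):  # note: shadows the builtin inside this module
    """The caller may not perform the operation."""


# Hypothetical translation table, applied once at the HTTP boundary so that
# services never need to import HTTPException themselves.
STATUS_BY_ERROR: Dict[Type[DomainError], int] = {
    NotFoundError: 404,
    ConflictError: 409,
    ValidationError: 422,
    PermissionError: 403,
}


def status_for(exc: DomainError) -> int:
    """Map a domain error to an HTTP status code, defaulting to 400."""
    for error_type, status in STATUS_BY_ERROR.items():
        if isinstance(exc, error_type):
            return status
    return 400


print(status_for(NotFoundError("regulation not found")))
```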

backend-compliance/PHASE1_RUNBOOK.md documents the nine-step execution
plan for Phase 1, including: snapshot baseline, characterization tests,
split models.py (this commit), split schemas.py (next), extract services,
extract repositories, mypy --strict, coverage.

## Verification

  backend-compliance/.venv-phase1: uv python install 3.12 + pip install -r requirements.txt
  PYTHONPATH=. pytest compliance/tests/ tests/contracts/
  -> 173 passed, 0 failed, 35 warnings, OpenAPI 360/484 unchanged

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 13:18:29 +02:00


"""
Consent Service client for BreakPilot.
Communicates with the Consent Management Service for GDPR compliance.
"""
import httpx
import jwt
from datetime import datetime, timedelta, timezone
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from enum import Enum
import os
import uuid

# Consent Service URL (from the environment, or a default for local development)
CONSENT_SERVICE_URL = os.getenv("CONSENT_SERVICE_URL", "http://localhost:8081")
# JWT secret - MUST match the one used by the Go Consent Service!
JWT_SECRET = os.getenv("JWT_SECRET", "breakpilot-dev-jwt-secret-2024")


def generate_jwt_token(
    user_id: Optional[str] = None,
    email: str = "demo@breakpilot.app",
    role: str = "user",
    expires_hours: int = 24
) -> str:
    """
    Generate a JWT for authenticating against the Consent Service.

    Args:
        user_id: The user ID (generated if not provided)
        email: The user's email address
        role: The role (user, admin, super_admin)
        expires_hours: Validity period in hours

    Returns:
        The JWT as a string
    """
    if user_id is None:
        user_id = str(uuid.uuid4())
    payload = {
        "user_id": user_id,
        "email": email,
        "role": role,
        "exp": datetime.now(timezone.utc) + timedelta(hours=expires_hours),
        "iat": datetime.now(timezone.utc),
    }
    return jwt.encode(payload, JWT_SECRET, algorithm="HS256")


def generate_demo_token() -> str:
    """Generate a demo token for unauthenticated users."""
    return generate_jwt_token(
        user_id="demo-user-" + str(uuid.uuid4())[:8],
        email="demo@breakpilot.app",
        role="user"
    )


class DocumentType(str, Enum):
    TERMS = "terms"
    PRIVACY = "privacy"
    COOKIES = "cookies"
    COMMUNITY = "community"


@dataclass
class ConsentStatus:
    has_consent: bool
    current_version_id: Optional[str] = None
    consented_version: Optional[str] = None
    needs_update: bool = False
    consented_at: Optional[str] = None


@dataclass
class DocumentVersion:
    id: str
    document_id: str
    version: str
    language: str
    title: str
    content: str
    summary: Optional[str] = None


class ConsentClient:
    """Client for communicating with the Consent Service."""

    def __init__(self, base_url: str = CONSENT_SERVICE_URL):
        self.base_url = base_url.rstrip("/")
        self.api_url = f"{self.base_url}/api/v1"

    def _get_headers(self, jwt_token: str) -> Dict[str, str]:
        """Build the request headers, including the JWT."""
        return {
            "Authorization": f"Bearer {jwt_token}",
            "Content-Type": "application/json"
        }

    async def check_consent(
        self,
        jwt_token: str,
        document_type: DocumentType,
        language: str = "de"
    ) -> ConsentStatus:
        """
        Check whether the user has consented to the document.
        Returns whether consent exists and whether it is current.
        """
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(
                    f"{self.api_url}/consent/check/{document_type.value}",
                    headers=self._get_headers(jwt_token),
                    params={"language": language},
                    timeout=10.0
                )
                if response.status_code == 200:
                    data = response.json()
                    return ConsentStatus(
                        has_consent=data.get("has_consent", False),
                        current_version_id=data.get("current_version_id"),
                        consented_version=data.get("consented_version"),
                        needs_update=data.get("needs_update", False),
                        consented_at=data.get("consented_at")
                    )
                else:
                    return ConsentStatus(has_consent=False, needs_update=True)
            except httpx.RequestError:
                # On connection problems: do not enforce consent (fail open)
                return ConsentStatus(has_consent=True, needs_update=False)

    async def check_all_mandatory_consents(
        self,
        jwt_token: str,
        language: str = "de"
    ) -> Dict[str, ConsentStatus]:
        """
        Check all mandatory documents (Terms, Privacy).
        Returns a dictionary with the status for each document.
        """
        mandatory_docs = [DocumentType.TERMS, DocumentType.PRIVACY]
        results = {}
        for doc_type in mandatory_docs:
            results[doc_type.value] = await self.check_consent(jwt_token, doc_type, language)
        return results

    async def get_pending_consents(
        self,
        jwt_token: str,
        language: str = "de"
    ) -> List[Dict[str, Any]]:
        """
        Return a list of all documents that still require consent.
        Useful for display at login/registration.
        """
        pending = []
        statuses = await self.check_all_mandatory_consents(jwt_token, language)
        for doc_type, status in statuses.items():
            if not status.has_consent or status.needs_update:
                # Fetch the current document
                doc = await self.get_latest_document(jwt_token, doc_type, language)
                if doc:
                    pending.append({
                        "type": doc_type,
                        "version_id": status.current_version_id,
                        "title": doc.title,
                        "content": doc.content,
                        "summary": doc.summary,
                        "is_update": status.has_consent and status.needs_update
                    })
        return pending

    async def get_latest_document(
        self,
        jwt_token: str,
        document_type: str,
        language: str = "de"
    ) -> Optional[DocumentVersion]:
        """Fetch the latest version of a document."""
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(
                    f"{self.api_url}/documents/{document_type}/latest",
                    headers=self._get_headers(jwt_token),
                    params={"language": language},
                    timeout=10.0
                )
                if response.status_code == 200:
                    data = response.json()
                    return DocumentVersion(
                        id=data["id"],
                        document_id=data["document_id"],
                        version=data["version"],
                        language=data["language"],
                        title=data["title"],
                        content=data["content"],
                        summary=data.get("summary")
                    )
                return None
            except httpx.RequestError:
                return None

    async def give_consent(
        self,
        jwt_token: str,
        document_type: str,
        version_id: str,
        consented: bool = True
    ) -> bool:
        """
        Store the user's consent.
        Returns True on success.
        """
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    f"{self.api_url}/consent",
                    headers=self._get_headers(jwt_token),
                    json={
                        "document_type": document_type,
                        "version_id": version_id,
                        "consented": consented
                    },
                    timeout=10.0
                )
                return response.status_code == 201
            except httpx.RequestError:
                return False

    async def get_cookie_categories(
        self,
        jwt_token: str,
        language: str = "de"
    ) -> List[Dict[str, Any]]:
        """Fetch all cookie categories for the cookie banner."""
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(
                    f"{self.api_url}/cookies/categories",
                    headers=self._get_headers(jwt_token),
                    params={"language": language},
                    timeout=10.0
                )
                if response.status_code == 200:
                    return response.json().get("categories", [])
                return []
            except httpx.RequestError:
                return []

    async def set_cookie_consent(
        self,
        jwt_token: str,
        categories: List[Dict[str, Any]]
    ) -> bool:
        """
        Store the cookie preferences.
        categories: [{"category_id": "...", "consented": true/false}, ...]
        """
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    f"{self.api_url}/cookies/consent",
                    headers=self._get_headers(jwt_token),
                    json={"categories": categories},
                    timeout=10.0
                )
                return response.status_code == 200
            except httpx.RequestError:
                return False

    async def get_my_data(self, jwt_token: str) -> Optional[Dict[str, Any]]:
        """GDPR Art. 15: fetch all of the user's data."""
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(
                    f"{self.api_url}/privacy/my-data",
                    headers=self._get_headers(jwt_token),
                    timeout=30.0
                )
                if response.status_code == 200:
                    return response.json()
                return None
            except httpx.RequestError:
                return None

    async def request_data_export(self, jwt_token: str) -> Optional[str]:
        """GDPR Art. 20: request a data export."""
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    f"{self.api_url}/privacy/export",
                    headers=self._get_headers(jwt_token),
                    timeout=10.0
                )
                if response.status_code == 202:
                    return response.json().get("request_id")
                return None
            except httpx.RequestError:
                return None

    async def request_data_deletion(
        self,
        jwt_token: str,
        reason: Optional[str] = None
    ) -> Optional[str]:
        """GDPR Art. 17: request deletion of all data."""
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    f"{self.api_url}/privacy/delete",
                    headers=self._get_headers(jwt_token),
                    json={"reason": reason} if reason else {},
                    timeout=10.0
                )
                if response.status_code == 202:
                    return response.json().get("request_id")
                return None
            except httpx.RequestError:
                return None

    async def health_check(self) -> bool:
        """Check whether the Consent Service is reachable."""
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(
                    f"{self.base_url}/health",
                    timeout=5.0
                )
                return response.status_code == 200
            except httpx.RequestError:
                return False


# Singleton instance for easy access
consent_client = ConsentClient()
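
The filter inside get_pending_consents can be traced without a running Consent Service. The sketch below is a standalone illustration, not part of consent_client.py: `Status` is a trimmed-down stand-in for ConsentStatus, `pending_types` is a hypothetical helper, and the document fetch is left out.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Status:
    """Stand-in for ConsentStatus with only the two flags the filter reads."""
    has_consent: bool
    needs_update: bool = False


def pending_types(statuses: Dict[str, Status]) -> List[Dict[str, Any]]:
    """Apply the same condition as get_pending_consents, minus the HTTP calls."""
    return [
        {"type": doc_type, "is_update": s.has_consent and s.needs_update}
        for doc_type, s in statuses.items()
        if not s.has_consent or s.needs_update
    ]


statuses = {
    "terms": Status(has_consent=True, needs_update=False),   # up to date
    "privacy": Status(has_consent=True, needs_update=True),  # newer version exists
    "cookies": Status(has_consent=False),                    # never consented
}
result = pending_types(statuses)
print(result)
```

Because check_consent returns has_consent=True and needs_update=False when the service is unreachable, a connection failure yields an empty pending list under this filter, so users are not blocked while the Consent Service is down.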