Files
Benjamin Admin e7fab73a3a
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Failing after 36s
CI / test-python-backend-compliance (push) Successful in 35s
CI / test-python-document-crawler (push) Successful in 27s
CI / test-python-dsms-gateway (push) Successful in 21s
fix(company-profile): Projekt-aware Persistenz — Daten werden jetzt pro Projekt gespeichert
Problem: Company Profile nutzte hartcodiertes tenant_id=default ohne project_id.
Beim Wechsel zwischen Projekten wurden immer die gleichen (oder keine) Daten geladen.

Aenderungen:
- Migration 042: project_id Spalte + UNIQUE(tenant_id, project_id) Constraint,
  fehlende Spalten (offering_urls, Adressfelder) nachgetragen
- Backend: Alle Queries nutzen WHERE tenant_id + project_id IS NOT DISTINCT FROM
- Proxy: project_id Query-Parameter wird durchgereicht
- Frontend: projectId aus SDK-Context, profileApiUrl() Helper fuer alle API-Aufrufe
- "Weiter" speichert jetzt immer den Draft (war schon so, ging aber ins Leere)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 23:48:15 +01:00

641 lines
26 KiB
Python

"""
FastAPI routes for Company Profile CRUD with audit logging.
Endpoints:
- GET /v1/company-profile: Get company profile for a tenant (+project)
- POST /v1/company-profile: Create or update company profile
- PATCH /v1/company-profile: Partially update an existing company profile
- DELETE /v1/company-profile: Delete company profile
- GET /v1/company-profile/audit: Get audit log for a tenant
- GET /v1/company-profile/template-context: Flat dict for template substitution
"""
import json
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Header, Query
from pydantic import BaseModel
from sqlalchemy import text
from database import SessionLocal
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Every route declared below is registered under the /v1/company-profile prefix.
router = APIRouter(prefix="/v1/company-profile", tags=["company-profile"])
# =============================================================================
# REQUEST/RESPONSE MODELS
# =============================================================================
class CompanyProfileRequest(BaseModel):
    """Request body of the create-or-update (POST) endpoint.

    Every field declares a default, so a body that only contains some of the
    fields is still accepted.
    """

    # --- Company basics ---
    company_name: str = ""
    legal_form: str = "GmbH"
    industry: str = ""
    founded_year: Optional[int] = None
    business_model: str = "B2B"
    offerings: list[str] = []
    offering_urls: dict = {}

    # --- Size ---
    company_size: str = "small"
    employee_count: str = "1-9"
    annual_revenue: str = "< 2 Mio"

    # --- Headquarters address ---
    headquarters_country: str = "DE"
    headquarters_country_other: str = ""
    headquarters_street: str = ""
    headquarters_zip: str = ""
    headquarters_city: str = ""
    headquarters_state: str = ""

    # --- Locations, markets, jurisdiction ---
    has_international_locations: bool = False
    international_countries: list[str] = []
    target_markets: list[str] = ["DE"]
    primary_jurisdiction: str = "DE"

    # --- Data-protection role and AI usage ---
    is_data_controller: bool = True
    is_data_processor: bool = False
    uses_ai: bool = False
    ai_use_cases: list[str] = []

    # --- Contacts ---
    dpo_name: Optional[str] = None
    dpo_email: Optional[str] = None
    legal_contact_name: Optional[str] = None
    legal_contact_email: Optional[str] = None

    machine_builder: Optional[dict] = None
    is_complete: bool = False

    # --- Phase 2 fields ---
    repos: list[dict] = []
    document_sources: list[dict] = []
    processing_systems: list[dict] = []
    ai_systems: list[dict] = []
    technical_contacts: list[dict] = []
    subject_to_nis2: bool = False
    subject_to_ai_act: bool = False
    subject_to_iso27001: bool = False
    supervisory_authority: Optional[str] = None
    review_cycle_months: int = 12

    # --- Project ID (multi-project); used when no project_id query parameter is sent ---
    project_id: Optional[str] = None
class CompanyProfileResponse(BaseModel):
    """Company profile as returned by the GET, POST and PATCH endpoints.

    Instances are built by ``row_to_response`` from a database row; the
    identifier and the timestamps are delivered as strings.
    """

    # --- Identity ---
    id: str
    tenant_id: str
    project_id: Optional[str] = None

    # --- Company basics ---
    company_name: str
    legal_form: str
    industry: str
    founded_year: Optional[int]
    business_model: str
    offerings: list[str]
    offering_urls: dict = {}

    # --- Size ---
    company_size: str
    employee_count: str
    annual_revenue: str

    # --- Headquarters address ---
    headquarters_country: str
    headquarters_country_other: str = ""
    headquarters_street: str = ""
    headquarters_zip: str = ""
    headquarters_city: str = ""
    headquarters_state: str = ""

    # --- Locations, markets, jurisdiction ---
    has_international_locations: bool
    international_countries: list[str]
    target_markets: list[str]
    primary_jurisdiction: str

    # --- Data-protection role and AI usage ---
    is_data_controller: bool
    is_data_processor: bool
    uses_ai: bool
    ai_use_cases: list[str]

    # --- Contacts ---
    dpo_name: Optional[str]
    dpo_email: Optional[str]
    legal_contact_name: Optional[str]
    legal_contact_email: Optional[str]

    machine_builder: Optional[dict]

    # --- Completion state and timestamps ---
    is_complete: bool
    completed_at: Optional[str]
    created_at: str
    updated_at: str

    # --- Phase 2 fields ---
    repos: list[dict] = []
    document_sources: list[dict] = []
    processing_systems: list[dict] = []
    ai_systems: list[dict] = []
    technical_contacts: list[dict] = []
    subject_to_nis2: bool = False
    subject_to_ai_act: bool = False
    subject_to_iso27001: bool = False
    supervisory_authority: Optional[str] = None
    review_cycle_months: int = 12
class AuditEntryResponse(BaseModel):
    """One row of the company-profile audit log."""

    id: str
    # Action label as stored; this module writes "create", "update", "patch" and "delete".
    action: str
    # Payload recorded with the action; None when nothing was recorded.
    changed_fields: Optional[dict]
    changed_by: Optional[str]
    created_at: str
class AuditListResponse(BaseModel):
    """Envelope returned by the audit endpoint."""

    entries: list[AuditEntryResponse]
    # Number of items in ``entries``.
    total: int
# =============================================================================
# SQL column lists — keep in sync with SELECT/INSERT
# =============================================================================
# Ordered column names used for every profile SELECT. row_to_response() zips
# this list with the fetched row, so the order here defines the row layout.
_BASE_COLUMNS_LIST = [
    # identity and company basics
    "id", "tenant_id", "company_name", "legal_form", "industry", "founded_year",
    "business_model", "offerings",
    # size
    "company_size", "employee_count", "annual_revenue",
    # headquarters (original columns)
    "headquarters_country", "headquarters_city",
    # locations, markets, jurisdiction
    "has_international_locations", "international_countries",
    "target_markets", "primary_jurisdiction",
    # data-protection role and AI usage
    "is_data_controller", "is_data_processor", "uses_ai", "ai_use_cases",
    # contacts
    "dpo_name", "dpo_email", "legal_contact_name", "legal_contact_email",
    # misc, completion state, timestamps
    "machine_builder", "is_complete", "completed_at", "created_at", "updated_at",
    # phase 2
    "repos", "document_sources", "processing_systems", "ai_systems",
    "technical_contacts",
    "subject_to_nis2", "subject_to_ai_act", "subject_to_iso27001",
    "supervisory_authority", "review_cycle_months",
    # multi-project support and extended address fields
    "project_id", "offering_urls",
    "headquarters_country_other", "headquarters_street",
    "headquarters_zip", "headquarters_state",
]
# Comma-separated form, ready to be interpolated into a SELECT statement.
_BASE_COLUMNS = ", ".join(_BASE_COLUMNS_LIST)
# Per-column (default, coercion) pairs consumed by row_to_response().
# The coercion marker is one of:
#   "STR"          -> the value is passed through str()
#   "STR_OR_NONE"  -> str(value) when the value is truthy, otherwise None
#   a type         -> the value is kept only if it is an instance of that type,
#                     otherwise the default is used
#   None           -> no type check; a falsy value is replaced by the default
#                     when the default is not None
_FIELD_DEFAULTS = {
    # identity
    "id":                          (None, "STR"),
    "tenant_id":                   (None, None),
    # company basics
    "company_name":                ("", None),
    "legal_form":                  ("GmbH", None),
    "industry":                    ("", None),
    "founded_year":                (None, None),
    "business_model":              ("B2B", None),
    "offerings":                   ([], list),
    "offering_urls":               ({}, dict),
    # size
    "company_size":                ("small", None),
    "employee_count":              ("1-9", None),
    "annual_revenue":              ("< 2 Mio", None),
    # headquarters address
    "headquarters_country":        ("DE", None),
    "headquarters_country_other":  ("", None),
    "headquarters_street":         ("", None),
    "headquarters_zip":            ("", None),
    "headquarters_city":           ("", None),
    "headquarters_state":          ("", None),
    # locations, markets, jurisdiction
    "has_international_locations": (False, None),
    "international_countries":     ([], list),
    "target_markets":              (["DE"], list),
    "primary_jurisdiction":        ("DE", None),
    # data-protection role and AI usage
    "is_data_controller":          (True, None),
    "is_data_processor":           (False, None),
    "uses_ai":                     (False, None),
    "ai_use_cases":                ([], list),
    # contacts
    "dpo_name":                    (None, None),
    "dpo_email":                   (None, None),
    "legal_contact_name":          (None, None),
    "legal_contact_email":         (None, None),
    # misc, completion state, timestamps
    "machine_builder":             (None, dict),
    "is_complete":                 (False, None),
    "completed_at":                (None, "STR_OR_NONE"),
    "created_at":                  (None, "STR"),
    "updated_at":                  (None, "STR"),
    # phase 2
    "repos":                       ([], list),
    "document_sources":            ([], list),
    "processing_systems":          ([], list),
    "ai_systems":                  ([], list),
    "technical_contacts":          ([], list),
    "subject_to_nis2":             (False, None),
    "subject_to_ai_act":           (False, None),
    "subject_to_iso27001":         (False, None),
    "supervisory_authority":       (None, None),
    "review_cycle_months":         (12, None),
    # multi-project support
    "project_id":                  (None, "STR_OR_NONE"),
}
# =============================================================================
# HELPERS
# =============================================================================
def _where_clause() -> str:
    """Return the SQL predicate that selects one profile by tenant and project.

    The predicate expects the bind parameters ``:tid`` and ``:pid``.
    ``IS NOT DISTINCT FROM`` is used for the project so that a NULL ``:pid``
    matches rows whose ``project_id`` is NULL.
    """
    predicates = (
        "tenant_id = :tid",
        "project_id IS NOT DISTINCT FROM :pid",
    )
    return " AND ".join(predicates)
def row_to_response(row) -> CompanyProfileResponse:
    """Build a ``CompanyProfileResponse`` from one fetched profile row.

    ``row`` must hold its values in ``_BASE_COLUMNS_LIST`` order. Each value is
    normalised according to the (default, coercion) pair registered for its
    column in ``_FIELD_DEFAULTS``.
    """
    values_by_column = dict(zip(_BASE_COLUMNS_LIST, row))
    fields: dict = {}
    for name in _BASE_COLUMNS_LIST:
        fallback, kind = _FIELD_DEFAULTS[name]
        cell = values_by_column[name]

        if kind == "STR":
            fields[name] = str(cell)
            continue
        if kind == "STR_OR_NONE":
            fields[name] = str(cell) if cell else None
            continue
        if kind is not None:
            # A concrete type: keep the value only when it has that type.
            fields[name] = cell if isinstance(cell, kind) else fallback
            continue

        # No coercion marker from here on.
        if name == "is_data_controller":
            # Default is True, so only a missing value may fall back to it;
            # an explicit False has to survive.
            fields[name] = fallback if cell is None else cell
        elif fallback is None:
            fields[name] = cell
        else:
            # Any falsy value (None, "", 0, False) is replaced by the default.
            fields[name] = cell or fallback
    return CompanyProfileResponse(**fields)
def log_audit(db, tenant_id: str, action: str, changed_fields: Optional[dict], changed_by: Optional[str], project_id: Optional[str] = None):
    """Write an audit log entry on a best-effort basis.

    The INSERT runs inside a SAVEPOINT (``db.begin_nested()``). On PostgreSQL a
    failed statement aborts the enclosing transaction, so without the savepoint
    a failing audit INSERT would make the caller's later ``db.commit()`` roll
    back the profile change it is supposed to persist. With the savepoint only
    the audit INSERT is undone and the caller's transaction stays usable.

    Args:
        db: Database session the caller is working in; nothing is committed here.
        tenant_id: Tenant the entry belongs to.
        action: Action label stored with the entry.
        changed_fields: Payload stored as JSON; a falsy value is stored as NULL.
        changed_by: Identifier of the acting user, if known.
        project_id: Project the entry belongs to, or None.

    Failures are logged as a warning and never raised to the caller.
    """
    try:
        with db.begin_nested():
            db.execute(
                text("""INSERT INTO compliance_company_profile_audit
                    (tenant_id, project_id, action, changed_fields, changed_by)
                    VALUES (:tenant_id, :project_id, :action, :fields::jsonb, :changed_by)"""),
                {
                    "tenant_id": tenant_id,
                    "project_id": project_id,
                    "action": action,
                    "fields": json.dumps(changed_fields) if changed_fields else None,
                    "changed_by": changed_by,
                },
            )
    except Exception as e:
        # Deliberately broad: audit logging must never break the main operation.
        logger.warning("Failed to write audit log: %s", e)
def _resolve_ids(tenant_id: str, x_tenant_id: Optional[str], project_id: Optional[str]):
    """Work out the effective tenant and project for a request.

    The ``X-Tenant-ID`` header value wins over the ``tenant_id`` parameter when
    it is non-empty. An empty project id and the literal string ``"null"`` both
    mean "no project" and are returned as None.

    Returns:
        A ``(tenant_id, project_id)`` tuple.
    """
    effective_tenant = x_tenant_id if x_tenant_id else tenant_id
    if not project_id or project_id == "null":
        effective_project = None
    else:
        effective_project = project_id
    return effective_tenant, effective_project
# =============================================================================
# ROUTES
# =============================================================================
@router.get("", response_model=CompanyProfileResponse)
async def get_company_profile(
    tenant_id: str = "default",
    project_id: Optional[str] = Query(None),
    x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
    """Return the stored company profile of a tenant, optionally per project.

    Raises:
        HTTPException: 404 when no profile exists for the tenant/project pair.
    """
    tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id)
    db = SessionLocal()
    try:
        select_stmt = text(
            f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE {_where_clause()}"
        )
        row = db.execute(select_stmt, {"tid": tid, "pid": pid}).fetchone()
        if row:
            return row_to_response(row)
        raise HTTPException(status_code=404, detail="Company profile not found")
    finally:
        # The session is closed on every path, including the 404.
        db.close()
def _upsert_params(profile: CompanyProfileRequest, tid: str, pid: Optional[str]) -> dict:
    """Map a request model onto the bind parameters of the upsert statements.

    List and dict fields are serialised with ``json.dumps`` because the SQL
    casts them to ``jsonb``; an empty or missing ``machine_builder`` is sent
    as NULL.
    """
    return {
        "tid": tid,
        "pid": pid,
        "company_name": profile.company_name,
        "legal_form": profile.legal_form,
        "industry": profile.industry,
        "founded_year": profile.founded_year,
        "business_model": profile.business_model,
        "offerings": json.dumps(profile.offerings),
        "offering_urls": json.dumps(profile.offering_urls),
        "company_size": profile.company_size,
        "employee_count": profile.employee_count,
        "annual_revenue": profile.annual_revenue,
        "hq_country": profile.headquarters_country,
        "hq_country_other": profile.headquarters_country_other,
        "hq_street": profile.headquarters_street,
        "hq_zip": profile.headquarters_zip,
        "hq_city": profile.headquarters_city,
        "hq_state": profile.headquarters_state,
        "has_intl": profile.has_international_locations,
        "intl_countries": json.dumps(profile.international_countries),
        "target_markets": json.dumps(profile.target_markets),
        "jurisdiction": profile.primary_jurisdiction,
        "is_controller": profile.is_data_controller,
        "is_processor": profile.is_data_processor,
        "uses_ai": profile.uses_ai,
        "ai_use_cases": json.dumps(profile.ai_use_cases),
        "dpo_name": profile.dpo_name,
        "dpo_email": profile.dpo_email,
        "legal_name": profile.legal_contact_name,
        "legal_email": profile.legal_contact_email,
        "machine_builder": json.dumps(profile.machine_builder) if profile.machine_builder else None,
        "is_complete": profile.is_complete,
        "repos": json.dumps(profile.repos),
        "document_sources": json.dumps(profile.document_sources),
        "processing_systems": json.dumps(profile.processing_systems),
        "ai_systems": json.dumps(profile.ai_systems),
        "technical_contacts": json.dumps(profile.technical_contacts),
        "subject_to_nis2": profile.subject_to_nis2,
        "subject_to_ai_act": profile.subject_to_ai_act,
        "subject_to_iso27001": profile.subject_to_iso27001,
        "supervisory_authority": profile.supervisory_authority,
        "review_cycle_months": profile.review_cycle_months,
    }


@router.post("", response_model=CompanyProfileResponse)
async def upsert_company_profile(
    profile: CompanyProfileRequest,
    tenant_id: str = "default",
    project_id: Optional[str] = Query(None),
    x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
    """Create or update the company profile of a tenant/project (upsert).

    The project is taken from the ``project_id`` query parameter and falls back
    to ``profile.project_id``. An existing row is overwritten with every field
    of the request; otherwise a new row is inserted. ``completed_at`` is set to
    NOW() when ``is_complete`` is true and to NULL otherwise. The change is
    recorded in the audit log and the stored row is returned.

    Raises:
        HTTPException: 500 when saving fails; the transaction is rolled back.
    """
    tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id or profile.project_id)
    db = SessionLocal()
    try:
        # Check if profile exists for this tenant+project
        existing = db.execute(
            text(f"SELECT id FROM compliance_company_profiles WHERE {_where_clause()}"),
            {"tid": tid, "pid": pid},
        ).fetchone()
        action = "update" if existing else "create"
        # Only ever one of two fixed SQL fragments, so interpolating it is safe.
        completed_at_sql = "NOW()" if profile.is_complete else "NULL"
        params = _upsert_params(profile, tid, pid)
        if existing:
            db.execute(
                text(f"""UPDATE compliance_company_profiles SET
                    company_name = :company_name, legal_form = :legal_form,
                    industry = :industry, founded_year = :founded_year,
                    business_model = :business_model, offerings = :offerings::jsonb,
                    offering_urls = :offering_urls::jsonb,
                    company_size = :company_size, employee_count = :employee_count,
                    annual_revenue = :annual_revenue,
                    headquarters_country = :hq_country, headquarters_country_other = :hq_country_other,
                    headquarters_street = :hq_street, headquarters_zip = :hq_zip,
                    headquarters_city = :hq_city, headquarters_state = :hq_state,
                    has_international_locations = :has_intl,
                    international_countries = :intl_countries::jsonb,
                    target_markets = :target_markets::jsonb, primary_jurisdiction = :jurisdiction,
                    is_data_controller = :is_controller, is_data_processor = :is_processor,
                    uses_ai = :uses_ai, ai_use_cases = :ai_use_cases::jsonb,
                    dpo_name = :dpo_name, dpo_email = :dpo_email,
                    legal_contact_name = :legal_name, legal_contact_email = :legal_email,
                    machine_builder = :machine_builder::jsonb, is_complete = :is_complete,
                    repos = :repos::jsonb, document_sources = :document_sources::jsonb,
                    processing_systems = :processing_systems::jsonb,
                    ai_systems = :ai_systems::jsonb, technical_contacts = :technical_contacts::jsonb,
                    subject_to_nis2 = :subject_to_nis2, subject_to_ai_act = :subject_to_ai_act,
                    subject_to_iso27001 = :subject_to_iso27001,
                    supervisory_authority = :supervisory_authority,
                    review_cycle_months = :review_cycle_months,
                    updated_at = NOW(), completed_at = {completed_at_sql}
                    WHERE {_where_clause()}"""),
                params,
            )
        else:
            db.execute(
                text(f"""INSERT INTO compliance_company_profiles
                    (tenant_id, project_id, company_name, legal_form, industry, founded_year,
                    business_model, offerings, offering_urls,
                    company_size, employee_count, annual_revenue,
                    headquarters_country, headquarters_country_other,
                    headquarters_street, headquarters_zip, headquarters_city, headquarters_state,
                    has_international_locations, international_countries,
                    target_markets, primary_jurisdiction,
                    is_data_controller, is_data_processor, uses_ai, ai_use_cases,
                    dpo_name, dpo_email, legal_contact_name, legal_contact_email,
                    machine_builder, is_complete, completed_at,
                    repos, document_sources, processing_systems, ai_systems, technical_contacts,
                    subject_to_nis2, subject_to_ai_act, subject_to_iso27001,
                    supervisory_authority, review_cycle_months)
                    VALUES (:tid, :pid, :company_name, :legal_form, :industry, :founded_year,
                    :business_model, :offerings::jsonb, :offering_urls::jsonb,
                    :company_size, :employee_count, :annual_revenue,
                    :hq_country, :hq_country_other,
                    :hq_street, :hq_zip, :hq_city, :hq_state,
                    :has_intl, :intl_countries::jsonb,
                    :target_markets::jsonb, :jurisdiction,
                    :is_controller, :is_processor, :uses_ai, :ai_use_cases::jsonb,
                    :dpo_name, :dpo_email, :legal_name, :legal_email,
                    :machine_builder::jsonb, :is_complete, {completed_at_sql},
                    :repos::jsonb, :document_sources::jsonb, :processing_systems::jsonb,
                    :ai_systems::jsonb, :technical_contacts::jsonb,
                    :subject_to_nis2, :subject_to_ai_act, :subject_to_iso27001,
                    :supervisory_authority, :review_cycle_months)"""),
                params,
            )
        log_audit(db, tid, action, profile.model_dump(), None, pid)
        db.commit()
        # Fetch and return
        result = db.execute(
            text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE {_where_clause()}"),
            {"tid": tid, "pid": pid},
        )
        row = result.fetchone()
        return row_to_response(row)
    except HTTPException:
        raise
    except Exception as e:
        db.rollback()
        logger.error("Failed to upsert company profile: %s", e)
        raise HTTPException(status_code=500, detail="Failed to save company profile")
    finally:
        db.close()
@router.delete("", status_code=200)
async def delete_company_profile(
    tenant_id: str = "default",
    project_id: Optional[str] = Query(None),
    x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
    """Delete the company profile of a tenant/project (GDPR right to erasure, Art. 17).

    The deletion is recorded in the audit log before the transaction is
    committed.

    Raises:
        HTTPException: 404 when no profile exists, 500 when the deletion fails
            (the transaction is rolled back in that case).
    """
    tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id)
    db = SessionLocal()
    try:
        lookup = text(f"SELECT id FROM compliance_company_profiles WHERE {_where_clause()}")
        found = db.execute(lookup, {"tid": tid, "pid": pid}).fetchone()
        if not found:
            raise HTTPException(status_code=404, detail="Company profile not found")

        removal = text(f"DELETE FROM compliance_company_profiles WHERE {_where_clause()}")
        db.execute(removal, {"tid": tid, "pid": pid})
        log_audit(db, tid, "delete", None, None, pid)
        db.commit()
        return {"success": True, "message": "Company profile deleted"}
    except HTTPException:
        # Pass the 404 through untouched instead of turning it into a 500.
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to delete company profile: {e}")
        raise HTTPException(status_code=500, detail="Failed to delete company profile")
    finally:
        db.close()
@router.get("/template-context")
async def get_template_context(
    tenant_id: str = "default",
    project_id: Optional[str] = Query(None),
    x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
    """Return the profile as a flat dict for template substitution.

    Besides the profile fields, the dict carries three derived entries:
    ``has_ai_systems``, ``processing_system_count`` and ``ai_system_count``.
    Optional contact fields are delivered as an empty string instead of None.

    Raises:
        HTTPException: 404 when no profile exists for the tenant/project pair.
    """
    tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id)
    db = SessionLocal()
    try:
        row = db.execute(
            text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE {_where_clause()}"),
            {"tid": tid, "pid": pid},
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Company profile not found — fill Stammdaten first")
        profile = row_to_response(row)

        # Fields copied unchanged, listed in output order.
        leading_fields = (
            "company_name", "legal_form", "industry", "business_model",
            "company_size", "employee_count",
            "headquarters_country", "headquarters_city", "primary_jurisdiction",
            "is_data_controller", "is_data_processor", "uses_ai",
        )
        # Optional fields that must not reach the template as None.
        blank_when_missing = (
            "dpo_name", "dpo_email",
            "legal_contact_name", "legal_contact_email",
            "supervisory_authority",
        )
        trailing_fields = (
            "review_cycle_months",
            "subject_to_nis2", "subject_to_ai_act", "subject_to_iso27001",
            "offerings", "target_markets", "international_countries", "ai_use_cases",
            "repos", "document_sources", "processing_systems", "ai_systems",
            "technical_contacts",
        )

        ctx = {name: getattr(profile, name) for name in leading_fields}
        for name in blank_when_missing:
            ctx[name] = getattr(profile, name) or ""
        for name in trailing_fields:
            ctx[name] = getattr(profile, name)

        # Derived convenience values.
        ctx["has_ai_systems"] = len(profile.ai_systems) > 0
        ctx["processing_system_count"] = len(profile.processing_systems)
        ctx["ai_system_count"] = len(profile.ai_systems)
        ctx["is_complete"] = profile.is_complete
        return ctx
    finally:
        db.close()
@router.get("/audit", response_model=AuditListResponse)
async def get_audit_log(
    tenant_id: str = "default",
    project_id: Optional[str] = Query(None),
    x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
    """Return the most recent audit entries for a tenant/project.

    At most 100 entries are returned, newest first. ``total`` is the number of
    entries in this response, so it never exceeds 100 even when more rows are
    stored. ``changed_fields`` is passed on only when the stored value comes
    back as a dict; anything else is reported as None.
    """
    tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id)
    db = SessionLocal()
    try:
        # Same tenant/project predicate as every other query in this module.
        result = db.execute(
            text(f"""SELECT id, action, changed_fields, changed_by, created_at
                FROM compliance_company_profile_audit
                WHERE {_where_clause()}
                ORDER BY created_at DESC
                LIMIT 100"""),
            {"tid": tid, "pid": pid},
        )
        entries = [
            AuditEntryResponse(
                id=str(r[0]),
                action=r[1],
                changed_fields=r[2] if isinstance(r[2], dict) else None,
                changed_by=r[3],
                created_at=str(r[4]),
            )
            for r in result.fetchall()
        ]
        return AuditListResponse(entries=entries, total=len(entries))
    finally:
        db.close()
@router.patch("", response_model=CompanyProfileResponse)
async def patch_company_profile(
    updates: dict,
    tenant_id: str = "default",
    project_id: Optional[str] = Query(None),
    x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
    """Partially update an existing company profile.

    Only keys of ``updates`` that name an updatable profile column are applied;
    all other keys are ignored. The project is taken from the ``project_id``
    query parameter and falls back to ``updates["project_id"]``.

    When ``is_complete`` is part of the patch, ``completed_at`` is set to NOW()
    or NULL accordingly, the same way the upsert endpoint does it. Clients
    cannot set ``completed_at`` themselves, so without this the two columns
    would drift apart.

    Raises:
        HTTPException: 404 when no profile exists, 400 when ``updates`` holds
            no updatable field, 500 when the update fails (rolled back).
    """
    tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id or updates.get("project_id"))
    db = SessionLocal()
    try:
        existing = db.execute(
            text(f"SELECT id FROM compliance_company_profiles WHERE {_where_clause()}"),
            {"tid": tid, "pid": pid},
        ).fetchone()
        if not existing:
            raise HTTPException(status_code=404, detail="Company profile not found")
        # Build SET clause from provided fields. Column names are interpolated
        # into the SQL, so they are restricted to this whitelist; values are
        # always sent as bind parameters.
        allowed_fields = set(_BASE_COLUMNS_LIST) - {"id", "tenant_id", "project_id", "created_at", "updated_at", "completed_at"}
        set_parts = []
        params = {"tid": tid, "pid": pid}
        jsonb_fields = {"offerings", "offering_urls", "international_countries", "target_markets",
                        "ai_use_cases", "machine_builder", "repos", "document_sources",
                        "processing_systems", "ai_systems", "technical_contacts"}
        for key, value in updates.items():
            if key not in allowed_fields:
                continue
            param_name = f"p_{key}"
            if key in jsonb_fields:
                set_parts.append(f"{key} = :{param_name}::jsonb")
                params[param_name] = json.dumps(value) if value is not None else None
            else:
                set_parts.append(f"{key} = :{param_name}")
                params[param_name] = value
        if not set_parts:
            raise HTTPException(status_code=400, detail="No valid fields to update")
        if "is_complete" in updates:
            # Keep the completion timestamp in step with the flag.
            set_parts.append("completed_at = NOW()" if updates["is_complete"] else "completed_at = NULL")
        set_parts.append("updated_at = NOW()")
        db.execute(
            text(f"UPDATE compliance_company_profiles SET {', '.join(set_parts)} WHERE {_where_clause()}"),
            params,
        )
        log_audit(db, tid, "patch", updates, None, pid)
        db.commit()
        result = db.execute(
            text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE {_where_clause()}"),
            {"tid": tid, "pid": pid},
        )
        row = result.fetchone()
        return row_to_response(row)
    except HTTPException:
        raise
    except Exception as e:
        db.rollback()
        logger.error("Failed to patch company profile: %s", e)
        raise HTTPException(status_code=500, detail="Failed to patch company profile")
    finally:
        db.close()