Files
breakpilot-compliance/backend-compliance/compliance/api/company_profile_routes.py
Sharang Parnerkar 2c8df1433d
Some checks failed
Deploy to Coolify / deploy (push) Has been cancelled
Fix SQLAlchemy 2.x compatibility: wrap raw SQL in text()
SQLAlchemy 2.x requires raw SQL strings to be explicitly wrapped
in text(). Fixed 16 instances across 5 route files.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 13:56:38 +01:00

538 lines
22 KiB
Python

"""
FastAPI routes for Company Profile CRUD with audit logging.
Endpoints:
- GET /v1/company-profile: Get company profile for a tenant
- POST /v1/company-profile: Create or update company profile
- DELETE /v1/company-profile: Delete company profile
- GET /v1/company-profile/audit: Get audit log for a tenant
- GET /v1/company-profile/template-context: Flat dict for template substitution
"""
import json
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Header
from pydantic import BaseModel
from sqlalchemy import text
from database import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/company-profile", tags=["company-profile"])
# =============================================================================
# REQUEST/RESPONSE MODELS
# =============================================================================
class CompanyProfileRequest(BaseModel):
company_name: str = ""
legal_form: str = "GmbH"
industry: str = ""
founded_year: Optional[int] = None
business_model: str = "B2B"
offerings: list[str] = []
company_size: str = "small"
employee_count: str = "1-9"
annual_revenue: str = "< 2 Mio"
headquarters_country: str = "DE"
headquarters_city: str = ""
has_international_locations: bool = False
international_countries: list[str] = []
target_markets: list[str] = ["DE"]
primary_jurisdiction: str = "DE"
is_data_controller: bool = True
is_data_processor: bool = False
uses_ai: bool = False
ai_use_cases: list[str] = []
dpo_name: Optional[str] = None
dpo_email: Optional[str] = None
legal_contact_name: Optional[str] = None
legal_contact_email: Optional[str] = None
machine_builder: Optional[dict] = None
is_complete: bool = False
# Phase 2 fields
repos: list[dict] = []
document_sources: list[dict] = []
processing_systems: list[dict] = []
ai_systems: list[dict] = []
technical_contacts: list[dict] = []
subject_to_nis2: bool = False
subject_to_ai_act: bool = False
subject_to_iso27001: bool = False
supervisory_authority: Optional[str] = None
review_cycle_months: int = 12
class CompanyProfileResponse(BaseModel):
id: str
tenant_id: str
company_name: str
legal_form: str
industry: str
founded_year: Optional[int]
business_model: str
offerings: list[str]
company_size: str
employee_count: str
annual_revenue: str
headquarters_country: str
headquarters_city: str
has_international_locations: bool
international_countries: list[str]
target_markets: list[str]
primary_jurisdiction: str
is_data_controller: bool
is_data_processor: bool
uses_ai: bool
ai_use_cases: list[str]
dpo_name: Optional[str]
dpo_email: Optional[str]
legal_contact_name: Optional[str]
legal_contact_email: Optional[str]
machine_builder: Optional[dict]
is_complete: bool
completed_at: Optional[str]
created_at: str
updated_at: str
# Phase 2 fields
repos: list[dict] = []
document_sources: list[dict] = []
processing_systems: list[dict] = []
ai_systems: list[dict] = []
technical_contacts: list[dict] = []
subject_to_nis2: bool = False
subject_to_ai_act: bool = False
subject_to_iso27001: bool = False
supervisory_authority: Optional[str] = None
review_cycle_months: int = 12
class AuditEntryResponse(BaseModel):
id: str
action: str
changed_fields: Optional[dict]
changed_by: Optional[str]
created_at: str
class AuditListResponse(BaseModel):
entries: list[AuditEntryResponse]
total: int
# =============================================================================
# SQL column lists — keep in sync with SELECT/INSERT
# =============================================================================
_BASE_COLUMNS_LIST = [
"id", "tenant_id", "company_name", "legal_form", "industry", "founded_year",
"business_model", "offerings", "company_size", "employee_count", "annual_revenue",
"headquarters_country", "headquarters_city", "has_international_locations",
"international_countries", "target_markets", "primary_jurisdiction",
"is_data_controller", "is_data_processor", "uses_ai", "ai_use_cases",
"dpo_name", "dpo_email", "legal_contact_name", "legal_contact_email",
"machine_builder", "is_complete", "completed_at", "created_at", "updated_at",
"repos", "document_sources", "processing_systems", "ai_systems", "technical_contacts",
"subject_to_nis2", "subject_to_ai_act", "subject_to_iso27001",
"supervisory_authority", "review_cycle_months",
]
_BASE_COLUMNS = ", ".join(_BASE_COLUMNS_LIST)
# Per-field defaults and type coercions for row_to_response.
# Each entry is (field_name, default_value, expected_type_or_None).
# - expected_type: if set, the value is checked with isinstance; if it fails,
# default_value is used instead.
# - Special sentinels: "STR" means str(value), "STR_OR_NONE" means str(v) if v else None.
_FIELD_DEFAULTS = {
"id": (None, "STR"),
"tenant_id": (None, None),
"company_name": ("", None),
"legal_form": ("GmbH", None),
"industry": ("", None),
"founded_year": (None, None),
"business_model": ("B2B", None),
"offerings": ([], list),
"company_size": ("small", None),
"employee_count": ("1-9", None),
"annual_revenue": ("< 2 Mio", None),
"headquarters_country": ("DE", None),
"headquarters_city": ("", None),
"has_international_locations": (False, None),
"international_countries": ([], list),
"target_markets": (["DE"], list),
"primary_jurisdiction": ("DE", None),
"is_data_controller": (True, None),
"is_data_processor": (False, None),
"uses_ai": (False, None),
"ai_use_cases": ([], list),
"dpo_name": (None, None),
"dpo_email": (None, None),
"legal_contact_name": (None, None),
"legal_contact_email": (None, None),
"machine_builder": (None, dict),
"is_complete": (False, None),
"completed_at": (None, "STR_OR_NONE"),
"created_at": (None, "STR"),
"updated_at": (None, "STR"),
"repos": ([], list),
"document_sources": ([], list),
"processing_systems": ([], list),
"ai_systems": ([], list),
"technical_contacts": ([], list),
"subject_to_nis2": (False, None),
"subject_to_ai_act": (False, None),
"subject_to_iso27001": (False, None),
"supervisory_authority": (None, None),
"review_cycle_months": (12, None),
}
# =============================================================================
# HELPERS
# =============================================================================
def row_to_response(row) -> CompanyProfileResponse:
"""Convert a DB row to response model using zip-based column mapping."""
raw = dict(zip(_BASE_COLUMNS_LIST, row))
coerced: dict = {}
for col in _BASE_COLUMNS_LIST:
default, expected_type = _FIELD_DEFAULTS[col]
value = raw[col]
if expected_type == "STR":
coerced[col] = str(value)
elif expected_type == "STR_OR_NONE":
coerced[col] = str(value) if value else None
elif expected_type is not None:
# Type-checked field (list / dict): use value only if it matches
coerced[col] = value if isinstance(value, expected_type) else default
else:
# is_data_controller needs special None-check (True when NULL)
if col == "is_data_controller":
coerced[col] = value if value is not None else default
else:
coerced[col] = value or default if default is not None else value
return CompanyProfileResponse(**coerced)
def log_audit(db, tenant_id: str, action: str, changed_fields: Optional[dict], changed_by: Optional[str]):
"""Write an audit log entry."""
try:
db.execute(
text("""INSERT INTO compliance_company_profile_audit
(tenant_id, action, changed_fields, changed_by)
VALUES (:tenant_id, :action, :fields::jsonb, :changed_by)"""),
{
"tenant_id": tenant_id,
"action": action,
"fields": json.dumps(changed_fields) if changed_fields else None,
"changed_by": changed_by,
},
)
except Exception as e:
logger.warning(f"Failed to write audit log: {e}")
# =============================================================================
# ROUTES
# =============================================================================
@router.get("", response_model=CompanyProfileResponse)
async def get_company_profile(
tenant_id: str = "default",
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
"""Get company profile for a tenant."""
tid = x_tenant_id or tenant_id
db = SessionLocal()
try:
result = db.execute(
text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE tenant_id = :tenant_id"),
{"tenant_id": tid},
)
row = result.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Company profile not found")
return row_to_response(row)
finally:
db.close()
@router.post("", response_model=CompanyProfileResponse)
async def upsert_company_profile(
profile: CompanyProfileRequest,
tenant_id: str = "default",
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
"""Create or update company profile (upsert)."""
tid = x_tenant_id or tenant_id
db = SessionLocal()
try:
# Check if profile exists
existing = db.execute(
text("SELECT id FROM compliance_company_profiles WHERE tenant_id = :tid"),
{"tid": tid},
).fetchone()
action = "update" if existing else "create"
completed_at_clause = ", completed_at = NOW()" if profile.is_complete else ", completed_at = NULL"
db.execute(
text(f"""INSERT INTO compliance_company_profiles
(tenant_id, company_name, legal_form, industry, founded_year,
business_model, offerings, company_size, employee_count, annual_revenue,
headquarters_country, headquarters_city, has_international_locations,
international_countries, target_markets, primary_jurisdiction,
is_data_controller, is_data_processor, uses_ai, ai_use_cases,
dpo_name, dpo_email, legal_contact_name, legal_contact_email,
machine_builder, is_complete,
repos, document_sources, processing_systems, ai_systems, technical_contacts,
subject_to_nis2, subject_to_ai_act, subject_to_iso27001,
supervisory_authority, review_cycle_months)
VALUES (:tid, :company_name, :legal_form, :industry, :founded_year,
:business_model, :offerings::jsonb, :company_size, :employee_count, :annual_revenue,
:hq_country, :hq_city, :has_intl, :intl_countries::jsonb,
:target_markets::jsonb, :jurisdiction,
:is_controller, :is_processor, :uses_ai, :ai_use_cases::jsonb,
:dpo_name, :dpo_email, :legal_name, :legal_email,
:machine_builder::jsonb, :is_complete,
:repos::jsonb, :document_sources::jsonb, :processing_systems::jsonb,
:ai_systems::jsonb, :technical_contacts::jsonb,
:subject_to_nis2, :subject_to_ai_act, :subject_to_iso27001,
:supervisory_authority, :review_cycle_months)
ON CONFLICT (tenant_id) DO UPDATE SET
company_name = EXCLUDED.company_name,
legal_form = EXCLUDED.legal_form,
industry = EXCLUDED.industry,
founded_year = EXCLUDED.founded_year,
business_model = EXCLUDED.business_model,
offerings = EXCLUDED.offerings,
company_size = EXCLUDED.company_size,
employee_count = EXCLUDED.employee_count,
annual_revenue = EXCLUDED.annual_revenue,
headquarters_country = EXCLUDED.headquarters_country,
headquarters_city = EXCLUDED.headquarters_city,
has_international_locations = EXCLUDED.has_international_locations,
international_countries = EXCLUDED.international_countries,
target_markets = EXCLUDED.target_markets,
primary_jurisdiction = EXCLUDED.primary_jurisdiction,
is_data_controller = EXCLUDED.is_data_controller,
is_data_processor = EXCLUDED.is_data_processor,
uses_ai = EXCLUDED.uses_ai,
ai_use_cases = EXCLUDED.ai_use_cases,
dpo_name = EXCLUDED.dpo_name,
dpo_email = EXCLUDED.dpo_email,
legal_contact_name = EXCLUDED.legal_contact_name,
legal_contact_email = EXCLUDED.legal_contact_email,
machine_builder = EXCLUDED.machine_builder,
is_complete = EXCLUDED.is_complete,
repos = EXCLUDED.repos,
document_sources = EXCLUDED.document_sources,
processing_systems = EXCLUDED.processing_systems,
ai_systems = EXCLUDED.ai_systems,
technical_contacts = EXCLUDED.technical_contacts,
subject_to_nis2 = EXCLUDED.subject_to_nis2,
subject_to_ai_act = EXCLUDED.subject_to_ai_act,
subject_to_iso27001 = EXCLUDED.subject_to_iso27001,
supervisory_authority = EXCLUDED.supervisory_authority,
review_cycle_months = EXCLUDED.review_cycle_months,
updated_at = NOW()
{completed_at_clause}"""),
{
"tid": tid,
"company_name": profile.company_name,
"legal_form": profile.legal_form,
"industry": profile.industry,
"founded_year": profile.founded_year,
"business_model": profile.business_model,
"offerings": json.dumps(profile.offerings),
"company_size": profile.company_size,
"employee_count": profile.employee_count,
"annual_revenue": profile.annual_revenue,
"hq_country": profile.headquarters_country,
"hq_city": profile.headquarters_city,
"has_intl": profile.has_international_locations,
"intl_countries": json.dumps(profile.international_countries),
"target_markets": json.dumps(profile.target_markets),
"jurisdiction": profile.primary_jurisdiction,
"is_controller": profile.is_data_controller,
"is_processor": profile.is_data_processor,
"uses_ai": profile.uses_ai,
"ai_use_cases": json.dumps(profile.ai_use_cases),
"dpo_name": profile.dpo_name,
"dpo_email": profile.dpo_email,
"legal_name": profile.legal_contact_name,
"legal_email": profile.legal_contact_email,
"machine_builder": json.dumps(profile.machine_builder) if profile.machine_builder else None,
"is_complete": profile.is_complete,
"repos": json.dumps(profile.repos),
"document_sources": json.dumps(profile.document_sources),
"processing_systems": json.dumps(profile.processing_systems),
"ai_systems": json.dumps(profile.ai_systems),
"technical_contacts": json.dumps(profile.technical_contacts),
"subject_to_nis2": profile.subject_to_nis2,
"subject_to_ai_act": profile.subject_to_ai_act,
"subject_to_iso27001": profile.subject_to_iso27001,
"supervisory_authority": profile.supervisory_authority,
"review_cycle_months": profile.review_cycle_months,
},
)
# Audit log
log_audit(db, tid, action, profile.model_dump(), None)
db.commit()
# Fetch and return
result = db.execute(
text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE tenant_id = :tid"),
{"tid": tid},
)
row = result.fetchone()
return row_to_response(row)
except Exception as e:
db.rollback()
logger.error(f"Failed to upsert company profile: {e}")
raise HTTPException(status_code=500, detail="Failed to save company profile")
finally:
db.close()
@router.delete("", status_code=200)
async def delete_company_profile(
tenant_id: str = "default",
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
"""Delete company profile for a tenant (DSGVO Recht auf Loeschung, Art. 17)."""
tid = x_tenant_id or tenant_id
db = SessionLocal()
try:
existing = db.execute(
text("SELECT id FROM compliance_company_profiles WHERE tenant_id = :tid"),
{"tid": tid},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail="Company profile not found")
db.execute(
text("DELETE FROM compliance_company_profiles WHERE tenant_id = :tid"),
{"tid": tid},
)
log_audit(db, tid, "delete", None, None)
db.commit()
return {"success": True, "message": "Company profile deleted"}
except HTTPException:
raise
except Exception as e:
db.rollback()
logger.error(f"Failed to delete company profile: {e}")
raise HTTPException(status_code=500, detail="Failed to delete company profile")
finally:
db.close()
@router.get("/template-context")
async def get_template_context(
tenant_id: str = "default",
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
"""Return flat dict for Jinja2 template substitution in document generation."""
tid = x_tenant_id or tenant_id
db = SessionLocal()
try:
result = db.execute(
text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE tenant_id = :tid"),
{"tid": tid},
)
row = result.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Company profile not found — fill Stammdaten first")
resp = row_to_response(row)
# Build flat context dict for templates
ctx = {
"company_name": resp.company_name,
"legal_form": resp.legal_form,
"industry": resp.industry,
"business_model": resp.business_model,
"company_size": resp.company_size,
"employee_count": resp.employee_count,
"headquarters_country": resp.headquarters_country,
"headquarters_city": resp.headquarters_city,
"primary_jurisdiction": resp.primary_jurisdiction,
"is_data_controller": resp.is_data_controller,
"is_data_processor": resp.is_data_processor,
"uses_ai": resp.uses_ai,
"dpo_name": resp.dpo_name or "",
"dpo_email": resp.dpo_email or "",
"legal_contact_name": resp.legal_contact_name or "",
"legal_contact_email": resp.legal_contact_email or "",
"supervisory_authority": resp.supervisory_authority or "",
"review_cycle_months": resp.review_cycle_months,
"subject_to_nis2": resp.subject_to_nis2,
"subject_to_ai_act": resp.subject_to_ai_act,
"subject_to_iso27001": resp.subject_to_iso27001,
# Lists as-is for iteration in templates
"offerings": resp.offerings,
"target_markets": resp.target_markets,
"international_countries": resp.international_countries,
"ai_use_cases": resp.ai_use_cases,
"repos": resp.repos,
"document_sources": resp.document_sources,
"processing_systems": resp.processing_systems,
"ai_systems": resp.ai_systems,
"technical_contacts": resp.technical_contacts,
# Derived helper values
"has_ai_systems": len(resp.ai_systems) > 0,
"processing_system_count": len(resp.processing_systems),
"ai_system_count": len(resp.ai_systems),
"is_complete": resp.is_complete,
}
return ctx
finally:
db.close()
@router.get("/audit", response_model=AuditListResponse)
async def get_audit_log(
tenant_id: str = "default",
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
"""Get audit log for company profile changes."""
tid = x_tenant_id or tenant_id
db = SessionLocal()
try:
result = db.execute(
text("""SELECT id, action, changed_fields, changed_by, created_at
FROM compliance_company_profile_audit
WHERE tenant_id = :tid
ORDER BY created_at DESC
LIMIT 100"""),
{"tid": tid},
)
rows = result.fetchall()
entries = [
AuditEntryResponse(
id=str(r[0]),
action=r[1],
changed_fields=r[2] if isinstance(r[2], dict) else None,
changed_by=r[3],
created_at=str(r[4]),
)
for r in rows
]
return AuditListResponse(entries=entries, total=len(entries))
finally:
db.close()