""" FastAPI routes for Company Profile CRUD with audit logging. Endpoints: - GET /v1/company-profile: Get company profile for a tenant - POST /v1/company-profile: Create or update company profile - DELETE /v1/company-profile: Delete company profile - GET /v1/company-profile/audit: Get audit log for a tenant - GET /v1/company-profile/template-context: Flat dict for template substitution """ import json import logging from typing import Optional from fastapi import APIRouter, HTTPException, Header from pydantic import BaseModel from sqlalchemy import text from database import SessionLocal logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/company-profile", tags=["company-profile"]) # ============================================================================= # REQUEST/RESPONSE MODELS # ============================================================================= class CompanyProfileRequest(BaseModel): company_name: str = "" legal_form: str = "GmbH" industry: str = "" founded_year: Optional[int] = None business_model: str = "B2B" offerings: list[str] = [] company_size: str = "small" employee_count: str = "1-9" annual_revenue: str = "< 2 Mio" headquarters_country: str = "DE" headquarters_city: str = "" has_international_locations: bool = False international_countries: list[str] = [] target_markets: list[str] = ["DE"] primary_jurisdiction: str = "DE" is_data_controller: bool = True is_data_processor: bool = False uses_ai: bool = False ai_use_cases: list[str] = [] dpo_name: Optional[str] = None dpo_email: Optional[str] = None legal_contact_name: Optional[str] = None legal_contact_email: Optional[str] = None machine_builder: Optional[dict] = None is_complete: bool = False # Phase 2 fields repos: list[dict] = [] document_sources: list[dict] = [] processing_systems: list[dict] = [] ai_systems: list[dict] = [] technical_contacts: list[dict] = [] subject_to_nis2: bool = False subject_to_ai_act: bool = False subject_to_iso27001: bool = False supervisory_authority: Optional[str] = None review_cycle_months: int = 12 class CompanyProfileResponse(BaseModel): id: str tenant_id: str company_name: str legal_form: str industry: str founded_year: Optional[int] business_model: str offerings: list[str] company_size: str employee_count: str annual_revenue: str headquarters_country: str headquarters_city: str has_international_locations: bool international_countries: list[str] target_markets: list[str] primary_jurisdiction: str is_data_controller: bool is_data_processor: bool uses_ai: bool ai_use_cases: list[str] dpo_name: Optional[str] dpo_email: Optional[str] legal_contact_name: Optional[str] legal_contact_email: Optional[str] machine_builder: Optional[dict] is_complete: bool completed_at: Optional[str] created_at: str updated_at: str # Phase 2 fields repos: list[dict] = [] document_sources: list[dict] = [] processing_systems: list[dict] = [] ai_systems: list[dict] = [] technical_contacts: list[dict] = [] subject_to_nis2: bool = False subject_to_ai_act: bool = False subject_to_iso27001: bool = False supervisory_authority: Optional[str] = None review_cycle_months: int = 12 class AuditEntryResponse(BaseModel): id: str action: str changed_fields: Optional[dict] changed_by: Optional[str] created_at: str class AuditListResponse(BaseModel): entries: list[AuditEntryResponse] total: int # ============================================================================= # SQL column lists — keep in sync with SELECT/INSERT # ============================================================================= _BASE_COLUMNS_LIST = [ "id", "tenant_id", "company_name", "legal_form", "industry", "founded_year", "business_model", "offerings", "company_size", "employee_count", "annual_revenue", "headquarters_country", "headquarters_city", "has_international_locations", "international_countries", "target_markets", "primary_jurisdiction", "is_data_controller", "is_data_processor", "uses_ai", "ai_use_cases", "dpo_name", "dpo_email", "legal_contact_name", "legal_contact_email", "machine_builder", "is_complete", "completed_at", "created_at", "updated_at", "repos", "document_sources", "processing_systems", "ai_systems", "technical_contacts", "subject_to_nis2", "subject_to_ai_act", "subject_to_iso27001", "supervisory_authority", "review_cycle_months", ] _BASE_COLUMNS = ", ".join(_BASE_COLUMNS_LIST) # Per-field defaults and type coercions for row_to_response. # Each entry is (field_name, default_value, expected_type_or_None). # - expected_type: if set, the value is checked with isinstance; if it fails, # default_value is used instead. # - Special sentinels: "STR" means str(value), "STR_OR_NONE" means str(v) if v else None. _FIELD_DEFAULTS = { "id": (None, "STR"), "tenant_id": (None, None), "company_name": ("", None), "legal_form": ("GmbH", None), "industry": ("", None), "founded_year": (None, None), "business_model": ("B2B", None), "offerings": ([], list), "company_size": ("small", None), "employee_count": ("1-9", None), "annual_revenue": ("< 2 Mio", None), "headquarters_country": ("DE", None), "headquarters_city": ("", None), "has_international_locations": (False, None), "international_countries": ([], list), "target_markets": (["DE"], list), "primary_jurisdiction": ("DE", None), "is_data_controller": (True, None), "is_data_processor": (False, None), "uses_ai": (False, None), "ai_use_cases": ([], list), "dpo_name": (None, None), "dpo_email": (None, None), "legal_contact_name": (None, None), "legal_contact_email": (None, None), "machine_builder": (None, dict), "is_complete": (False, None), "completed_at": (None, "STR_OR_NONE"), "created_at": (None, "STR"), "updated_at": (None, "STR"), "repos": ([], list), "document_sources": ([], list), "processing_systems": ([], list), "ai_systems": ([], list), "technical_contacts": ([], list), "subject_to_nis2": (False, None), "subject_to_ai_act": (False, None), "subject_to_iso27001": (False, None), "supervisory_authority": (None, None), "review_cycle_months": (12, None), } # ============================================================================= # HELPERS # ============================================================================= def row_to_response(row) -> CompanyProfileResponse: """Convert a DB row to response model using zip-based column mapping.""" raw = dict(zip(_BASE_COLUMNS_LIST, row)) coerced: dict = {} for col in _BASE_COLUMNS_LIST: default, expected_type = _FIELD_DEFAULTS[col] value = raw[col] if expected_type == "STR": coerced[col] = str(value) elif expected_type == "STR_OR_NONE": coerced[col] = str(value) if value else None elif expected_type is not None: # Type-checked field (list / dict): use value only if it matches coerced[col] = value if isinstance(value, expected_type) else default else: # is_data_controller needs special None-check (True when NULL) if col == "is_data_controller": coerced[col] = value if value is not None else default else: coerced[col] = value or default if default is not None else value return CompanyProfileResponse(**coerced) def log_audit(db, tenant_id: str, action: str, changed_fields: Optional[dict], changed_by: Optional[str]): """Write an audit log entry.""" try: db.execute( text("""INSERT INTO compliance_company_profile_audit (tenant_id, action, changed_fields, changed_by) VALUES (:tenant_id, :action, :fields::jsonb, :changed_by)"""), { "tenant_id": tenant_id, "action": action, "fields": json.dumps(changed_fields) if changed_fields else None, "changed_by": changed_by, }, ) except Exception as e: logger.warning(f"Failed to write audit log: {e}") # ============================================================================= # ROUTES # ============================================================================= @router.get("", response_model=CompanyProfileResponse) async def get_company_profile( tenant_id: str = "default", x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"), ): """Get company profile for a tenant.""" tid = x_tenant_id or tenant_id db = SessionLocal() try: result = db.execute( text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE tenant_id = :tenant_id"), {"tenant_id": tid}, ) row = result.fetchone() if not row: raise HTTPException(status_code=404, detail="Company profile not found") return row_to_response(row) finally: db.close() @router.post("", response_model=CompanyProfileResponse) async def upsert_company_profile( profile: CompanyProfileRequest, tenant_id: str = "default", x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"), ): """Create or update company profile (upsert).""" tid = x_tenant_id or tenant_id db = SessionLocal() try: # Check if profile exists existing = db.execute( text("SELECT id FROM compliance_company_profiles WHERE tenant_id = :tid"), {"tid": tid}, ).fetchone() action = "update" if existing else "create" completed_at_clause = ", completed_at = NOW()" if profile.is_complete else ", completed_at = NULL" db.execute( text(f"""INSERT INTO compliance_company_profiles (tenant_id, company_name, legal_form, industry, founded_year, business_model, offerings, company_size, employee_count, annual_revenue, headquarters_country, headquarters_city, has_international_locations, international_countries, target_markets, primary_jurisdiction, is_data_controller, is_data_processor, uses_ai, ai_use_cases, dpo_name, dpo_email, legal_contact_name, legal_contact_email, machine_builder, is_complete, repos, document_sources, processing_systems, ai_systems, technical_contacts, subject_to_nis2, subject_to_ai_act, subject_to_iso27001, supervisory_authority, review_cycle_months) VALUES (:tid, :company_name, :legal_form, :industry, :founded_year, :business_model, :offerings::jsonb, :company_size, :employee_count, :annual_revenue, :hq_country, :hq_city, :has_intl, :intl_countries::jsonb, :target_markets::jsonb, :jurisdiction, :is_controller, :is_processor, :uses_ai, :ai_use_cases::jsonb, :dpo_name, :dpo_email, :legal_name, :legal_email, :machine_builder::jsonb, :is_complete, :repos::jsonb, :document_sources::jsonb, :processing_systems::jsonb, :ai_systems::jsonb, :technical_contacts::jsonb, :subject_to_nis2, :subject_to_ai_act, :subject_to_iso27001, :supervisory_authority, :review_cycle_months) ON CONFLICT (tenant_id) DO UPDATE SET company_name = EXCLUDED.company_name, legal_form = EXCLUDED.legal_form, industry = EXCLUDED.industry, founded_year = EXCLUDED.founded_year, business_model = EXCLUDED.business_model, offerings = EXCLUDED.offerings, company_size = EXCLUDED.company_size, employee_count = EXCLUDED.employee_count, annual_revenue = EXCLUDED.annual_revenue, headquarters_country = EXCLUDED.headquarters_country, headquarters_city = EXCLUDED.headquarters_city, has_international_locations = EXCLUDED.has_international_locations, international_countries = EXCLUDED.international_countries, target_markets = EXCLUDED.target_markets, primary_jurisdiction = EXCLUDED.primary_jurisdiction, is_data_controller = EXCLUDED.is_data_controller, is_data_processor = EXCLUDED.is_data_processor, uses_ai = EXCLUDED.uses_ai, ai_use_cases = EXCLUDED.ai_use_cases, dpo_name = EXCLUDED.dpo_name, dpo_email = EXCLUDED.dpo_email, legal_contact_name = EXCLUDED.legal_contact_name, legal_contact_email = EXCLUDED.legal_contact_email, machine_builder = EXCLUDED.machine_builder, is_complete = EXCLUDED.is_complete, repos = EXCLUDED.repos, document_sources = EXCLUDED.document_sources, processing_systems = EXCLUDED.processing_systems, ai_systems = EXCLUDED.ai_systems, technical_contacts = EXCLUDED.technical_contacts, subject_to_nis2 = EXCLUDED.subject_to_nis2, subject_to_ai_act = EXCLUDED.subject_to_ai_act, subject_to_iso27001 = EXCLUDED.subject_to_iso27001, supervisory_authority = EXCLUDED.supervisory_authority, review_cycle_months = EXCLUDED.review_cycle_months, updated_at = NOW() {completed_at_clause}"""), { "tid": tid, "company_name": profile.company_name, "legal_form": profile.legal_form, "industry": profile.industry, "founded_year": profile.founded_year, "business_model": profile.business_model, "offerings": json.dumps(profile.offerings), "company_size": profile.company_size, "employee_count": profile.employee_count, "annual_revenue": profile.annual_revenue, "hq_country": profile.headquarters_country, "hq_city": profile.headquarters_city, "has_intl": profile.has_international_locations, "intl_countries": json.dumps(profile.international_countries), "target_markets": json.dumps(profile.target_markets), "jurisdiction": profile.primary_jurisdiction, "is_controller": profile.is_data_controller, "is_processor": profile.is_data_processor, "uses_ai": profile.uses_ai, "ai_use_cases": json.dumps(profile.ai_use_cases), "dpo_name": profile.dpo_name, "dpo_email": profile.dpo_email, "legal_name": profile.legal_contact_name, "legal_email": profile.legal_contact_email, "machine_builder": json.dumps(profile.machine_builder) if profile.machine_builder else None, "is_complete": profile.is_complete, "repos": json.dumps(profile.repos), "document_sources": json.dumps(profile.document_sources), "processing_systems": json.dumps(profile.processing_systems), "ai_systems": json.dumps(profile.ai_systems), "technical_contacts": json.dumps(profile.technical_contacts), "subject_to_nis2": profile.subject_to_nis2, "subject_to_ai_act": profile.subject_to_ai_act, "subject_to_iso27001": profile.subject_to_iso27001, "supervisory_authority": profile.supervisory_authority, "review_cycle_months": profile.review_cycle_months, }, ) # Audit log log_audit(db, tid, action, profile.model_dump(), None) db.commit() # Fetch and return result = db.execute( text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE tenant_id = :tid"), {"tid": tid}, ) row = result.fetchone() return row_to_response(row) except Exception as e: db.rollback() logger.error(f"Failed to upsert company profile: {e}") raise HTTPException(status_code=500, detail="Failed to save company profile") finally: db.close() @router.delete("", status_code=200) async def delete_company_profile( tenant_id: str = "default", x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"), ): """Delete company profile for a tenant (DSGVO Recht auf Loeschung, Art. 17).""" tid = x_tenant_id or tenant_id db = SessionLocal() try: existing = db.execute( text("SELECT id FROM compliance_company_profiles WHERE tenant_id = :tid"), {"tid": tid}, ).fetchone() if not existing: raise HTTPException(status_code=404, detail="Company profile not found") db.execute( text("DELETE FROM compliance_company_profiles WHERE tenant_id = :tid"), {"tid": tid}, ) log_audit(db, tid, "delete", None, None) db.commit() return {"success": True, "message": "Company profile deleted"} except HTTPException: raise except Exception as e: db.rollback() logger.error(f"Failed to delete company profile: {e}") raise HTTPException(status_code=500, detail="Failed to delete company profile") finally: db.close() @router.get("/template-context") async def get_template_context( tenant_id: str = "default", x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"), ): """Return flat dict for Jinja2 template substitution in document generation.""" tid = x_tenant_id or tenant_id db = SessionLocal() try: result = db.execute( text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE tenant_id = :tid"), {"tid": tid}, ) row = result.fetchone() if not row: raise HTTPException(status_code=404, detail="Company profile not found — fill Stammdaten first") resp = row_to_response(row) # Build flat context dict for templates ctx = { "company_name": resp.company_name, "legal_form": resp.legal_form, "industry": resp.industry, "business_model": resp.business_model, "company_size": resp.company_size, "employee_count": resp.employee_count, "headquarters_country": resp.headquarters_country, "headquarters_city": resp.headquarters_city, "primary_jurisdiction": resp.primary_jurisdiction, "is_data_controller": resp.is_data_controller, "is_data_processor": resp.is_data_processor, "uses_ai": resp.uses_ai, "dpo_name": resp.dpo_name or "", "dpo_email": resp.dpo_email or "", "legal_contact_name": resp.legal_contact_name or "", "legal_contact_email": resp.legal_contact_email or "", "supervisory_authority": resp.supervisory_authority or "", "review_cycle_months": resp.review_cycle_months, "subject_to_nis2": resp.subject_to_nis2, "subject_to_ai_act": resp.subject_to_ai_act, "subject_to_iso27001": resp.subject_to_iso27001, # Lists as-is for iteration in templates "offerings": resp.offerings, "target_markets": resp.target_markets, "international_countries": resp.international_countries, "ai_use_cases": resp.ai_use_cases, "repos": resp.repos, "document_sources": resp.document_sources, "processing_systems": resp.processing_systems, "ai_systems": resp.ai_systems, "technical_contacts": resp.technical_contacts, # Derived helper values "has_ai_systems": len(resp.ai_systems) > 0, "processing_system_count": len(resp.processing_systems), "ai_system_count": len(resp.ai_systems), "is_complete": resp.is_complete, } return ctx finally: db.close() @router.get("/audit", response_model=AuditListResponse) async def get_audit_log( tenant_id: str = "default", x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"), ): """Get audit log for company profile changes.""" tid = x_tenant_id or tenant_id db = SessionLocal() try: result = db.execute( text("""SELECT id, action, changed_fields, changed_by, created_at FROM compliance_company_profile_audit WHERE tenant_id = :tid ORDER BY created_at DESC LIMIT 100"""), {"tid": tid}, ) rows = result.fetchall() entries = [ AuditEntryResponse( id=str(r[0]), action=r[1], changed_fields=r[2] if isinstance(r[2], dict) else None, changed_by=r[3], created_at=str(r[4]), ) for r in rows ] return AuditListResponse(entries=entries, total=len(entries)) finally: db.close()