""" FastAPI routes for Company Profile CRUD with audit logging. Endpoints: - GET /v1/company-profile: Get company profile for a tenant - POST /v1/company-profile: Create or update company profile - DELETE /v1/company-profile: Delete company profile - GET /v1/company-profile/audit: Get audit log for a tenant - GET /v1/company-profile/template-context: Flat dict for template substitution """ import json import logging from typing import Optional from fastapi import APIRouter, HTTPException, Header from pydantic import BaseModel from database import SessionLocal logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/company-profile", tags=["company-profile"]) # ============================================================================= # REQUEST/RESPONSE MODELS # ============================================================================= class CompanyProfileRequest(BaseModel): company_name: str = "" legal_form: str = "GmbH" industry: str = "" founded_year: Optional[int] = None business_model: str = "B2B" offerings: list[str] = [] company_size: str = "small" employee_count: str = "1-9" annual_revenue: str = "< 2 Mio" headquarters_country: str = "DE" headquarters_city: str = "" has_international_locations: bool = False international_countries: list[str] = [] target_markets: list[str] = ["DE"] primary_jurisdiction: str = "DE" is_data_controller: bool = True is_data_processor: bool = False uses_ai: bool = False ai_use_cases: list[str] = [] dpo_name: Optional[str] = None dpo_email: Optional[str] = None legal_contact_name: Optional[str] = None legal_contact_email: Optional[str] = None machine_builder: Optional[dict] = None is_complete: bool = False # Phase 2 fields repos: list[dict] = [] document_sources: list[dict] = [] processing_systems: list[dict] = [] ai_systems: list[dict] = [] technical_contacts: list[dict] = [] subject_to_nis2: bool = False subject_to_ai_act: bool = False subject_to_iso27001: bool = False supervisory_authority: Optional[str] = None review_cycle_months: int = 12 class CompanyProfileResponse(BaseModel): id: str tenant_id: str company_name: str legal_form: str industry: str founded_year: Optional[int] business_model: str offerings: list[str] company_size: str employee_count: str annual_revenue: str headquarters_country: str headquarters_city: str has_international_locations: bool international_countries: list[str] target_markets: list[str] primary_jurisdiction: str is_data_controller: bool is_data_processor: bool uses_ai: bool ai_use_cases: list[str] dpo_name: Optional[str] dpo_email: Optional[str] legal_contact_name: Optional[str] legal_contact_email: Optional[str] machine_builder: Optional[dict] is_complete: bool completed_at: Optional[str] created_at: str updated_at: str # Phase 2 fields repos: list[dict] = [] document_sources: list[dict] = [] processing_systems: list[dict] = [] ai_systems: list[dict] = [] technical_contacts: list[dict] = [] subject_to_nis2: bool = False subject_to_ai_act: bool = False subject_to_iso27001: bool = False supervisory_authority: Optional[str] = None review_cycle_months: int = 12 class AuditEntryResponse(BaseModel): id: str action: str changed_fields: Optional[dict] changed_by: Optional[str] created_at: str class AuditListResponse(BaseModel): entries: list[AuditEntryResponse] total: int # ============================================================================= # SQL column lists — keep in sync with SELECT/INSERT # ============================================================================= _BASE_COLUMNS_LIST = [ "id", "tenant_id", "company_name", "legal_form", "industry", "founded_year", "business_model", "offerings", "company_size", "employee_count", "annual_revenue", "headquarters_country", "headquarters_city", "has_international_locations", "international_countries", "target_markets", "primary_jurisdiction", "is_data_controller", "is_data_processor", "uses_ai", "ai_use_cases", "dpo_name", "dpo_email", "legal_contact_name", "legal_contact_email", "machine_builder", "is_complete", "completed_at", "created_at", "updated_at", "repos", "document_sources", "processing_systems", "ai_systems", "technical_contacts", "subject_to_nis2", "subject_to_ai_act", "subject_to_iso27001", "supervisory_authority", "review_cycle_months", ] _BASE_COLUMNS = ", ".join(_BASE_COLUMNS_LIST) # Per-field defaults and type coercions for row_to_response. # Each entry is (field_name, default_value, expected_type_or_None). # - expected_type: if set, the value is checked with isinstance; if it fails, # default_value is used instead. # - Special sentinels: "STR" means str(value), "STR_OR_NONE" means str(v) if v else None. _FIELD_DEFAULTS = { "id": (None, "STR"), "tenant_id": (None, None), "company_name": ("", None), "legal_form": ("GmbH", None), "industry": ("", None), "founded_year": (None, None), "business_model": ("B2B", None), "offerings": ([], list), "company_size": ("small", None), "employee_count": ("1-9", None), "annual_revenue": ("< 2 Mio", None), "headquarters_country": ("DE", None), "headquarters_city": ("", None), "has_international_locations": (False, None), "international_countries": ([], list), "target_markets": (["DE"], list), "primary_jurisdiction": ("DE", None), "is_data_controller": (True, None), "is_data_processor": (False, None), "uses_ai": (False, None), "ai_use_cases": ([], list), "dpo_name": (None, None), "dpo_email": (None, None), "legal_contact_name": (None, None), "legal_contact_email": (None, None), "machine_builder": (None, dict), "is_complete": (False, None), "completed_at": (None, "STR_OR_NONE"), "created_at": (None, "STR"), "updated_at": (None, "STR"), "repos": ([], list), "document_sources": ([], list), "processing_systems": ([], list), "ai_systems": ([], list), "technical_contacts": ([], list), "subject_to_nis2": (False, None), "subject_to_ai_act": (False, None), "subject_to_iso27001": (False, None), "supervisory_authority": (None, None), "review_cycle_months": (12, None), } # ============================================================================= # HELPERS # ============================================================================= def row_to_response(row) -> CompanyProfileResponse: """Convert a DB row to response model using zip-based column mapping.""" raw = dict(zip(_BASE_COLUMNS_LIST, row)) coerced: dict = {} for col in _BASE_COLUMNS_LIST: default, expected_type = _FIELD_DEFAULTS[col] value = raw[col] if expected_type == "STR": coerced[col] = str(value) elif expected_type == "STR_OR_NONE": coerced[col] = str(value) if value else None elif expected_type is not None: # Type-checked field (list / dict): use value only if it matches coerced[col] = value if isinstance(value, expected_type) else default else: # is_data_controller needs special None-check (True when NULL) if col == "is_data_controller": coerced[col] = value if value is not None else default else: coerced[col] = value or default if default is not None else value return CompanyProfileResponse(**coerced) def log_audit(db, tenant_id: str, action: str, changed_fields: Optional[dict], changed_by: Optional[str]): """Write an audit log entry.""" try: db.execute( """INSERT INTO compliance_company_profile_audit (tenant_id, action, changed_fields, changed_by) VALUES (:tenant_id, :action, :fields::jsonb, :changed_by)""", { "tenant_id": tenant_id, "action": action, "fields": json.dumps(changed_fields) if changed_fields else None, "changed_by": changed_by, }, ) except Exception as e: logger.warning(f"Failed to write audit log: {e}") # ============================================================================= # ROUTES # ============================================================================= @router.get("", response_model=CompanyProfileResponse) async def get_company_profile( tenant_id: str = "default", x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"), ): """Get company profile for a tenant.""" tid = x_tenant_id or tenant_id db = SessionLocal() try: result = db.execute( f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE tenant_id = :tenant_id", {"tenant_id": tid}, ) row = result.fetchone() if not row: raise HTTPException(status_code=404, detail="Company profile not found") return row_to_response(row) finally: db.close() @router.post("", response_model=CompanyProfileResponse) async def upsert_company_profile( profile: CompanyProfileRequest, tenant_id: str = "default", x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"), ): """Create or update company profile (upsert).""" tid = x_tenant_id or tenant_id db = SessionLocal() try: # Check if profile exists existing = db.execute( "SELECT id FROM compliance_company_profiles WHERE tenant_id = :tid", {"tid": tid}, ).fetchone() action = "update" if existing else "create" completed_at_clause = ", completed_at = NOW()" if profile.is_complete else ", completed_at = NULL" db.execute( f"""INSERT INTO compliance_company_profiles (tenant_id, company_name, legal_form, industry, founded_year, business_model, offerings, company_size, employee_count, annual_revenue, headquarters_country, headquarters_city, has_international_locations, international_countries, target_markets, primary_jurisdiction, is_data_controller, is_data_processor, uses_ai, ai_use_cases, dpo_name, dpo_email, legal_contact_name, legal_contact_email, machine_builder, is_complete, repos, document_sources, processing_systems, ai_systems, technical_contacts, subject_to_nis2, subject_to_ai_act, subject_to_iso27001, supervisory_authority, review_cycle_months) VALUES (:tid, :company_name, :legal_form, :industry, :founded_year, :business_model, :offerings::jsonb, :company_size, :employee_count, :annual_revenue, :hq_country, :hq_city, :has_intl, :intl_countries::jsonb, :target_markets::jsonb, :jurisdiction, :is_controller, :is_processor, :uses_ai, :ai_use_cases::jsonb, :dpo_name, :dpo_email, :legal_name, :legal_email, :machine_builder::jsonb, :is_complete, :repos::jsonb, :document_sources::jsonb, :processing_systems::jsonb, :ai_systems::jsonb, :technical_contacts::jsonb, :subject_to_nis2, :subject_to_ai_act, :subject_to_iso27001, :supervisory_authority, :review_cycle_months) ON CONFLICT (tenant_id) DO UPDATE SET company_name = EXCLUDED.company_name, legal_form = EXCLUDED.legal_form, industry = EXCLUDED.industry, founded_year = EXCLUDED.founded_year, business_model = EXCLUDED.business_model, offerings = EXCLUDED.offerings, company_size = EXCLUDED.company_size, employee_count = EXCLUDED.employee_count, annual_revenue = EXCLUDED.annual_revenue, headquarters_country = EXCLUDED.headquarters_country, headquarters_city = EXCLUDED.headquarters_city, has_international_locations = EXCLUDED.has_international_locations, international_countries = EXCLUDED.international_countries, target_markets = EXCLUDED.target_markets, primary_jurisdiction = EXCLUDED.primary_jurisdiction, is_data_controller = EXCLUDED.is_data_controller, is_data_processor = EXCLUDED.is_data_processor, uses_ai = EXCLUDED.uses_ai, ai_use_cases = EXCLUDED.ai_use_cases, dpo_name = EXCLUDED.dpo_name, dpo_email = EXCLUDED.dpo_email, legal_contact_name = EXCLUDED.legal_contact_name, legal_contact_email = EXCLUDED.legal_contact_email, machine_builder = EXCLUDED.machine_builder, is_complete = EXCLUDED.is_complete, repos = EXCLUDED.repos, document_sources = EXCLUDED.document_sources, processing_systems = EXCLUDED.processing_systems, ai_systems = EXCLUDED.ai_systems, technical_contacts = EXCLUDED.technical_contacts, subject_to_nis2 = EXCLUDED.subject_to_nis2, subject_to_ai_act = EXCLUDED.subject_to_ai_act, subject_to_iso27001 = EXCLUDED.subject_to_iso27001, supervisory_authority = EXCLUDED.supervisory_authority, review_cycle_months = EXCLUDED.review_cycle_months, updated_at = NOW() {completed_at_clause}""", { "tid": tid, "company_name": profile.company_name, "legal_form": profile.legal_form, "industry": profile.industry, "founded_year": profile.founded_year, "business_model": profile.business_model, "offerings": json.dumps(profile.offerings), "company_size": profile.company_size, "employee_count": profile.employee_count, "annual_revenue": profile.annual_revenue, "hq_country": profile.headquarters_country, "hq_city": profile.headquarters_city, "has_intl": profile.has_international_locations, "intl_countries": json.dumps(profile.international_countries), "target_markets": json.dumps(profile.target_markets), "jurisdiction": profile.primary_jurisdiction, "is_controller": profile.is_data_controller, "is_processor": profile.is_data_processor, "uses_ai": profile.uses_ai, "ai_use_cases": json.dumps(profile.ai_use_cases), "dpo_name": profile.dpo_name, "dpo_email": profile.dpo_email, "legal_name": profile.legal_contact_name, "legal_email": profile.legal_contact_email, "machine_builder": json.dumps(profile.machine_builder) if profile.machine_builder else None, "is_complete": profile.is_complete, "repos": json.dumps(profile.repos), "document_sources": json.dumps(profile.document_sources), "processing_systems": json.dumps(profile.processing_systems), "ai_systems": json.dumps(profile.ai_systems), "technical_contacts": json.dumps(profile.technical_contacts), "subject_to_nis2": profile.subject_to_nis2, "subject_to_ai_act": profile.subject_to_ai_act, "subject_to_iso27001": profile.subject_to_iso27001, "supervisory_authority": profile.supervisory_authority, "review_cycle_months": profile.review_cycle_months, }, ) # Audit log log_audit(db, tid, action, profile.model_dump(), None) db.commit() # Fetch and return result = db.execute( f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE tenant_id = :tid", {"tid": tid}, ) row = result.fetchone() return row_to_response(row) except Exception as e: db.rollback() logger.error(f"Failed to upsert company profile: {e}") raise HTTPException(status_code=500, detail="Failed to save company profile") finally: db.close() @router.delete("", status_code=200) async def delete_company_profile( tenant_id: str = "default", x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"), ): """Delete company profile for a tenant (DSGVO Recht auf Loeschung, Art. 17).""" tid = x_tenant_id or tenant_id db = SessionLocal() try: existing = db.execute( "SELECT id FROM compliance_company_profiles WHERE tenant_id = :tid", {"tid": tid}, ).fetchone() if not existing: raise HTTPException(status_code=404, detail="Company profile not found") db.execute( "DELETE FROM compliance_company_profiles WHERE tenant_id = :tid", {"tid": tid}, ) log_audit(db, tid, "delete", None, None) db.commit() return {"success": True, "message": "Company profile deleted"} except HTTPException: raise except Exception as e: db.rollback() logger.error(f"Failed to delete company profile: {e}") raise HTTPException(status_code=500, detail="Failed to delete company profile") finally: db.close() @router.get("/template-context") async def get_template_context( tenant_id: str = "default", x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"), ): """Return flat dict for Jinja2 template substitution in document generation.""" tid = x_tenant_id or tenant_id db = SessionLocal() try: result = db.execute( f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE tenant_id = :tid", {"tid": tid}, ) row = result.fetchone() if not row: raise HTTPException(status_code=404, detail="Company profile not found — fill Stammdaten first") resp = row_to_response(row) # Build flat context dict for templates ctx = { "company_name": resp.company_name, "legal_form": resp.legal_form, "industry": resp.industry, "business_model": resp.business_model, "company_size": resp.company_size, "employee_count": resp.employee_count, "headquarters_country": resp.headquarters_country, "headquarters_city": resp.headquarters_city, "primary_jurisdiction": resp.primary_jurisdiction, "is_data_controller": resp.is_data_controller, "is_data_processor": resp.is_data_processor, "uses_ai": resp.uses_ai, "dpo_name": resp.dpo_name or "", "dpo_email": resp.dpo_email or "", "legal_contact_name": resp.legal_contact_name or "", "legal_contact_email": resp.legal_contact_email or "", "supervisory_authority": resp.supervisory_authority or "", "review_cycle_months": resp.review_cycle_months, "subject_to_nis2": resp.subject_to_nis2, "subject_to_ai_act": resp.subject_to_ai_act, "subject_to_iso27001": resp.subject_to_iso27001, # Lists as-is for iteration in templates "offerings": resp.offerings, "target_markets": resp.target_markets, "international_countries": resp.international_countries, "ai_use_cases": resp.ai_use_cases, "repos": resp.repos, "document_sources": resp.document_sources, "processing_systems": resp.processing_systems, "ai_systems": resp.ai_systems, "technical_contacts": resp.technical_contacts, # Derived helper values "has_ai_systems": len(resp.ai_systems) > 0, "processing_system_count": len(resp.processing_systems), "ai_system_count": len(resp.ai_systems), "is_complete": resp.is_complete, } return ctx finally: db.close() @router.get("/audit", response_model=AuditListResponse) async def get_audit_log( tenant_id: str = "default", x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"), ): """Get audit log for company profile changes.""" tid = x_tenant_id or tenant_id db = SessionLocal() try: result = db.execute( """SELECT id, action, changed_fields, changed_by, created_at FROM compliance_company_profile_audit WHERE tenant_id = :tid ORDER BY created_at DESC LIMIT 100""", {"tid": tid}, ) rows = result.fetchall() entries = [ AuditEntryResponse( id=str(r[0]), action=r[1], changed_fields=r[2] if isinstance(r[2], dict) else None, changed_by=r[3], created_at=str(r[4]), ) for r in rows ] return AuditListResponse(entries=entries, total=len(entries)) finally: db.close()