refactor(backend/api): extract CompanyProfileService (Step 4 — file 4 of 18)

compliance/api/company_profile_routes.py (640 LOC) -> 154 LOC thin routes.
Unusual for this repo: persistence uses raw SQL via sqlalchemy.text()
because the underlying compliance_company_profiles table has ~45 columns
with complex jsonb coercion and there is no SQLAlchemy model for it.

New files:
  compliance/schemas/company_profile.py         (127) — 4 request/response models
  compliance/services/company_profile_service.py (340) — Service class + row_to_response + log_audit
  compliance/services/_company_profile_sql.py   (139) — 70-line INSERT/UPDATE statements
                                                         separated for readability

Minor behavioral improvement: the handlers now use Depends(get_db) for
session management instead of the bespoke `db = SessionLocal(); try: ...
finally: db.close()` pattern. This makes the routes consistent with
every other refactored service, fixes the broken-ness under test
dependency_overrides, and removes 6 duplicate try/finally blocks.

Legacy exports preserved: CompanyProfileRequest, CompanyProfileResponse,
AuditEntryResponse, AuditListResponse, row_to_response, and log_audit are
re-exported from compliance.api.company_profile_routes so that the two
existing test files
(tests/test_company_profile_routes.py, tests/test_company_profile_extend.py)
keep importing from the same path.

Pre-existing broken tests noted: 6 tests in those files feed a 40-tuple
row into row_to_response, but _BASE_COLUMNS_LIST has 46 columns (has had
since the Phase 2 Stammdaten extension). These tests fail on main too
(verified via `git stash` round-trip). Not fixed in this commit — they
require a rewrite of the test's _make_row helper, which is out of scope
for a pure structural refactor. Flagged for follow-up.

Verified:
  - 173/173 pytest compliance/tests/ tests/contracts/ pass
  - OpenAPI 360/484 unchanged
  - mypy compliance/ -> Success on 127 source files
  - company_profile_routes.py 640 -> 154 LOC
  - All new files under soft 300 target except service (340, under hard 500)
  - Hard-cap violations: 15 -> 14

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sharang Parnerkar
2026-04-07 19:47:29 +02:00
parent d571412657
commit f39c7ca40c
6 changed files with 690 additions and 560 deletions

View File

@@ -2,265 +2,52 @@
FastAPI routes for Company Profile CRUD with audit logging.
Endpoints:
- GET /v1/company-profile: Get company profile for a tenant (+project)
- POST /v1/company-profile: Create or update company profile
- DELETE /v1/company-profile: Delete company profile
- GET /v1/company-profile/audit: Get audit log for a tenant
- GET /v1/company-profile/template-context: Flat dict for template substitution
- GET /v1/company-profile - Get company profile
- POST /v1/company-profile - Create or update (upsert)
- PATCH /v1/company-profile - Partial update
- DELETE /v1/company-profile - Delete (DSGVO Art. 17)
- GET /v1/company-profile/audit - Audit log for changes
- GET /v1/company-profile/template-context - Flat dict for Jinja2
Phase 1 Step 4 refactor: handlers delegate to CompanyProfileService.
Legacy helper + schema names are re-exported so existing test imports
(``from compliance.api.company_profile_routes import
CompanyProfileRequest, row_to_response, log_audit``) continue to work.
"""
import json
import logging
from typing import Optional
from typing import Any, Optional
from fastapi import APIRouter, HTTPException, Header, Query
from pydantic import BaseModel
from sqlalchemy import text
from fastapi import APIRouter, Depends, Header, Query
from sqlalchemy.orm import Session
from database import SessionLocal
from classroom_engine.database import get_db
from compliance.api._http_errors import translate_domain_errors
from compliance.schemas.company_profile import (
AuditEntryResponse,
AuditListResponse,
CompanyProfileRequest,
CompanyProfileResponse,
)
from compliance.services.company_profile_service import (
CompanyProfileService,
log_audit,
row_to_response,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/company-profile", tags=["company-profile"])
# =============================================================================
# REQUEST/RESPONSE MODELS
# =============================================================================
class CompanyProfileRequest(BaseModel):
company_name: str = ""
legal_form: str = "GmbH"
industry: str = ""
founded_year: Optional[int] = None
business_model: str = "B2B"
offerings: list[str] = []
offering_urls: dict = {}
company_size: str = "small"
employee_count: str = "1-9"
annual_revenue: str = "< 2 Mio"
headquarters_country: str = "DE"
headquarters_country_other: str = ""
headquarters_street: str = ""
headquarters_zip: str = ""
headquarters_city: str = ""
headquarters_state: str = ""
has_international_locations: bool = False
international_countries: list[str] = []
target_markets: list[str] = ["DE"]
primary_jurisdiction: str = "DE"
is_data_controller: bool = True
is_data_processor: bool = False
uses_ai: bool = False
ai_use_cases: list[str] = []
dpo_name: Optional[str] = None
dpo_email: Optional[str] = None
legal_contact_name: Optional[str] = None
legal_contact_email: Optional[str] = None
machine_builder: Optional[dict] = None
is_complete: bool = False
# Phase 2 fields
repos: list[dict] = []
document_sources: list[dict] = []
processing_systems: list[dict] = []
ai_systems: list[dict] = []
technical_contacts: list[dict] = []
subject_to_nis2: bool = False
subject_to_ai_act: bool = False
subject_to_iso27001: bool = False
supervisory_authority: Optional[str] = None
review_cycle_months: int = 12
# Project ID (multi-project)
project_id: Optional[str] = None
def get_company_profile_service(
db: Session = Depends(get_db),
) -> CompanyProfileService:
return CompanyProfileService(db)
class CompanyProfileResponse(BaseModel):
id: str
tenant_id: str
project_id: Optional[str] = None
company_name: str
legal_form: str
industry: str
founded_year: Optional[int]
business_model: str
offerings: list[str]
offering_urls: dict = {}
company_size: str
employee_count: str
annual_revenue: str
headquarters_country: str
headquarters_country_other: str = ""
headquarters_street: str = ""
headquarters_zip: str = ""
headquarters_city: str = ""
headquarters_state: str = ""
has_international_locations: bool
international_countries: list[str]
target_markets: list[str]
primary_jurisdiction: str
is_data_controller: bool
is_data_processor: bool
uses_ai: bool
ai_use_cases: list[str]
dpo_name: Optional[str]
dpo_email: Optional[str]
legal_contact_name: Optional[str]
legal_contact_email: Optional[str]
machine_builder: Optional[dict]
is_complete: bool
completed_at: Optional[str]
created_at: str
updated_at: str
# Phase 2 fields
repos: list[dict] = []
document_sources: list[dict] = []
processing_systems: list[dict] = []
ai_systems: list[dict] = []
technical_contacts: list[dict] = []
subject_to_nis2: bool = False
subject_to_ai_act: bool = False
subject_to_iso27001: bool = False
supervisory_authority: Optional[str] = None
review_cycle_months: int = 12
class AuditEntryResponse(BaseModel):
id: str
action: str
changed_fields: Optional[dict]
changed_by: Optional[str]
created_at: str
class AuditListResponse(BaseModel):
entries: list[AuditEntryResponse]
total: int
# =============================================================================
# SQL column lists — keep in sync with SELECT/INSERT
# =============================================================================
_BASE_COLUMNS_LIST = [
"id", "tenant_id", "company_name", "legal_form", "industry", "founded_year",
"business_model", "offerings", "company_size", "employee_count", "annual_revenue",
"headquarters_country", "headquarters_city", "has_international_locations",
"international_countries", "target_markets", "primary_jurisdiction",
"is_data_controller", "is_data_processor", "uses_ai", "ai_use_cases",
"dpo_name", "dpo_email", "legal_contact_name", "legal_contact_email",
"machine_builder", "is_complete", "completed_at", "created_at", "updated_at",
"repos", "document_sources", "processing_systems", "ai_systems", "technical_contacts",
"subject_to_nis2", "subject_to_ai_act", "subject_to_iso27001",
"supervisory_authority", "review_cycle_months",
"project_id", "offering_urls",
"headquarters_country_other", "headquarters_street", "headquarters_zip", "headquarters_state",
]
_BASE_COLUMNS = ", ".join(_BASE_COLUMNS_LIST)
# Per-field defaults and type coercions for row_to_response.
_FIELD_DEFAULTS = {
"id": (None, "STR"),
"tenant_id": (None, None),
"company_name": ("", None),
"legal_form": ("GmbH", None),
"industry": ("", None),
"founded_year": (None, None),
"business_model": ("B2B", None),
"offerings": ([], list),
"offering_urls": ({}, dict),
"company_size": ("small", None),
"employee_count": ("1-9", None),
"annual_revenue": ("< 2 Mio", None),
"headquarters_country": ("DE", None),
"headquarters_country_other": ("", None),
"headquarters_street": ("", None),
"headquarters_zip": ("", None),
"headquarters_city": ("", None),
"headquarters_state": ("", None),
"has_international_locations": (False, None),
"international_countries": ([], list),
"target_markets": (["DE"], list),
"primary_jurisdiction": ("DE", None),
"is_data_controller": (True, None),
"is_data_processor": (False, None),
"uses_ai": (False, None),
"ai_use_cases": ([], list),
"dpo_name": (None, None),
"dpo_email": (None, None),
"legal_contact_name": (None, None),
"legal_contact_email": (None, None),
"machine_builder": (None, dict),
"is_complete": (False, None),
"completed_at": (None, "STR_OR_NONE"),
"created_at": (None, "STR"),
"updated_at": (None, "STR"),
"repos": ([], list),
"document_sources": ([], list),
"processing_systems": ([], list),
"ai_systems": ([], list),
"technical_contacts": ([], list),
"subject_to_nis2": (False, None),
"subject_to_ai_act": (False, None),
"subject_to_iso27001": (False, None),
"supervisory_authority": (None, None),
"review_cycle_months": (12, None),
"project_id": (None, "STR_OR_NONE"),
}
# =============================================================================
# HELPERS
# =============================================================================
def _where_clause():
"""WHERE clause matching tenant_id + project_id (handles NULL)."""
return "tenant_id = :tid AND project_id IS NOT DISTINCT FROM :pid"
def row_to_response(row) -> CompanyProfileResponse:
"""Convert a DB row to response model using zip-based column mapping."""
raw = dict(zip(_BASE_COLUMNS_LIST, row))
coerced: dict = {}
for col in _BASE_COLUMNS_LIST:
default, expected_type = _FIELD_DEFAULTS[col]
value = raw[col]
if expected_type == "STR":
coerced[col] = str(value)
elif expected_type == "STR_OR_NONE":
coerced[col] = str(value) if value else None
elif expected_type is not None:
coerced[col] = value if isinstance(value, expected_type) else default
else:
if col == "is_data_controller":
coerced[col] = value if value is not None else default
else:
coerced[col] = value or default if default is not None else value
return CompanyProfileResponse(**coerced)
def log_audit(db, tenant_id: str, action: str, changed_fields: Optional[dict], changed_by: Optional[str], project_id: Optional[str] = None):
"""Write an audit log entry."""
try:
db.execute(
text("""INSERT INTO compliance_company_profile_audit
(tenant_id, project_id, action, changed_fields, changed_by)
VALUES (:tenant_id, :project_id, :action, :fields::jsonb, :changed_by)"""),
{
"tenant_id": tenant_id,
"project_id": project_id,
"action": action,
"fields": json.dumps(changed_fields) if changed_fields else None,
"changed_by": changed_by,
},
)
except Exception as e:
logger.warning(f"Failed to write audit log: {e}")
def _resolve_ids(tenant_id: str, x_tenant_id: Optional[str], project_id: Optional[str]):
def _resolve_ids(
tenant_id: str, x_tenant_id: Optional[str], project_id: Optional[str]
) -> tuple[str, Optional[str]]:
"""Resolve tenant_id and project_id from params/headers."""
tid = x_tenant_id or tenant_id
pid = project_id if project_id and project_id != "null" else None
@@ -276,22 +63,12 @@ async def get_company_profile(
tenant_id: str = "default",
project_id: Optional[str] = Query(None),
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
service: CompanyProfileService = Depends(get_company_profile_service),
) -> CompanyProfileResponse:
"""Get company profile for a tenant (optionally per project)."""
tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id)
db = SessionLocal()
try:
result = db.execute(
text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE {_where_clause()}"),
{"tid": tid, "pid": pid},
)
row = result.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Company profile not found")
return row_to_response(row)
finally:
db.close()
with translate_domain_errors():
return service.get(tid, pid)
@router.post("", response_model=CompanyProfileResponse)
@@ -300,147 +77,12 @@ async def upsert_company_profile(
tenant_id: str = "default",
project_id: Optional[str] = Query(None),
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
service: CompanyProfileService = Depends(get_company_profile_service),
) -> CompanyProfileResponse:
"""Create or update company profile (upsert)."""
tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id or profile.project_id)
db = SessionLocal()
try:
# Check if profile exists for this tenant+project
existing = db.execute(
text(f"SELECT id FROM compliance_company_profiles WHERE {_where_clause()}"),
{"tid": tid, "pid": pid},
).fetchone()
action = "update" if existing else "create"
completed_at_sql = "NOW()" if profile.is_complete else "NULL"
params = {
"tid": tid,
"pid": pid,
"company_name": profile.company_name,
"legal_form": profile.legal_form,
"industry": profile.industry,
"founded_year": profile.founded_year,
"business_model": profile.business_model,
"offerings": json.dumps(profile.offerings),
"offering_urls": json.dumps(profile.offering_urls),
"company_size": profile.company_size,
"employee_count": profile.employee_count,
"annual_revenue": profile.annual_revenue,
"hq_country": profile.headquarters_country,
"hq_country_other": profile.headquarters_country_other,
"hq_street": profile.headquarters_street,
"hq_zip": profile.headquarters_zip,
"hq_city": profile.headquarters_city,
"hq_state": profile.headquarters_state,
"has_intl": profile.has_international_locations,
"intl_countries": json.dumps(profile.international_countries),
"target_markets": json.dumps(profile.target_markets),
"jurisdiction": profile.primary_jurisdiction,
"is_controller": profile.is_data_controller,
"is_processor": profile.is_data_processor,
"uses_ai": profile.uses_ai,
"ai_use_cases": json.dumps(profile.ai_use_cases),
"dpo_name": profile.dpo_name,
"dpo_email": profile.dpo_email,
"legal_name": profile.legal_contact_name,
"legal_email": profile.legal_contact_email,
"machine_builder": json.dumps(profile.machine_builder) if profile.machine_builder else None,
"is_complete": profile.is_complete,
"repos": json.dumps(profile.repos),
"document_sources": json.dumps(profile.document_sources),
"processing_systems": json.dumps(profile.processing_systems),
"ai_systems": json.dumps(profile.ai_systems),
"technical_contacts": json.dumps(profile.technical_contacts),
"subject_to_nis2": profile.subject_to_nis2,
"subject_to_ai_act": profile.subject_to_ai_act,
"subject_to_iso27001": profile.subject_to_iso27001,
"supervisory_authority": profile.supervisory_authority,
"review_cycle_months": profile.review_cycle_months,
}
if existing:
db.execute(
text(f"""UPDATE compliance_company_profiles SET
company_name = :company_name, legal_form = :legal_form,
industry = :industry, founded_year = :founded_year,
business_model = :business_model, offerings = :offerings::jsonb,
offering_urls = :offering_urls::jsonb,
company_size = :company_size, employee_count = :employee_count,
annual_revenue = :annual_revenue,
headquarters_country = :hq_country, headquarters_country_other = :hq_country_other,
headquarters_street = :hq_street, headquarters_zip = :hq_zip,
headquarters_city = :hq_city, headquarters_state = :hq_state,
has_international_locations = :has_intl,
international_countries = :intl_countries::jsonb,
target_markets = :target_markets::jsonb, primary_jurisdiction = :jurisdiction,
is_data_controller = :is_controller, is_data_processor = :is_processor,
uses_ai = :uses_ai, ai_use_cases = :ai_use_cases::jsonb,
dpo_name = :dpo_name, dpo_email = :dpo_email,
legal_contact_name = :legal_name, legal_contact_email = :legal_email,
machine_builder = :machine_builder::jsonb, is_complete = :is_complete,
repos = :repos::jsonb, document_sources = :document_sources::jsonb,
processing_systems = :processing_systems::jsonb,
ai_systems = :ai_systems::jsonb, technical_contacts = :technical_contacts::jsonb,
subject_to_nis2 = :subject_to_nis2, subject_to_ai_act = :subject_to_ai_act,
subject_to_iso27001 = :subject_to_iso27001,
supervisory_authority = :supervisory_authority,
review_cycle_months = :review_cycle_months,
updated_at = NOW(), completed_at = {completed_at_sql}
WHERE {_where_clause()}"""),
params,
)
else:
db.execute(
text(f"""INSERT INTO compliance_company_profiles
(tenant_id, project_id, company_name, legal_form, industry, founded_year,
business_model, offerings, offering_urls,
company_size, employee_count, annual_revenue,
headquarters_country, headquarters_country_other,
headquarters_street, headquarters_zip, headquarters_city, headquarters_state,
has_international_locations, international_countries,
target_markets, primary_jurisdiction,
is_data_controller, is_data_processor, uses_ai, ai_use_cases,
dpo_name, dpo_email, legal_contact_name, legal_contact_email,
machine_builder, is_complete, completed_at,
repos, document_sources, processing_systems, ai_systems, technical_contacts,
subject_to_nis2, subject_to_ai_act, subject_to_iso27001,
supervisory_authority, review_cycle_months)
VALUES (:tid, :pid, :company_name, :legal_form, :industry, :founded_year,
:business_model, :offerings::jsonb, :offering_urls::jsonb,
:company_size, :employee_count, :annual_revenue,
:hq_country, :hq_country_other,
:hq_street, :hq_zip, :hq_city, :hq_state,
:has_intl, :intl_countries::jsonb,
:target_markets::jsonb, :jurisdiction,
:is_controller, :is_processor, :uses_ai, :ai_use_cases::jsonb,
:dpo_name, :dpo_email, :legal_name, :legal_email,
:machine_builder::jsonb, :is_complete, {completed_at_sql},
:repos::jsonb, :document_sources::jsonb, :processing_systems::jsonb,
:ai_systems::jsonb, :technical_contacts::jsonb,
:subject_to_nis2, :subject_to_ai_act, :subject_to_iso27001,
:supervisory_authority, :review_cycle_months)"""),
params,
)
log_audit(db, tid, action, profile.model_dump(), None, pid)
db.commit()
# Fetch and return
result = db.execute(
text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE {_where_clause()}"),
{"tid": tid, "pid": pid},
)
row = result.fetchone()
return row_to_response(row)
except HTTPException:
raise
except Exception as e:
db.rollback()
logger.error(f"Failed to upsert company profile: {e}")
raise HTTPException(status_code=500, detail="Failed to save company profile")
finally:
db.close()
with translate_domain_errors():
return service.upsert(tid, pid, profile)
@router.delete("", status_code=200)
@@ -448,36 +90,12 @@ async def delete_company_profile(
tenant_id: str = "default",
project_id: Optional[str] = Query(None),
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
service: CompanyProfileService = Depends(get_company_profile_service),
) -> dict[str, Any]:
"""Delete company profile for a tenant (DSGVO Recht auf Loeschung, Art. 17)."""
tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id)
db = SessionLocal()
try:
existing = db.execute(
text(f"SELECT id FROM compliance_company_profiles WHERE {_where_clause()}"),
{"tid": tid, "pid": pid},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail="Company profile not found")
db.execute(
text(f"DELETE FROM compliance_company_profiles WHERE {_where_clause()}"),
{"tid": tid, "pid": pid},
)
log_audit(db, tid, "delete", None, None, pid)
db.commit()
return {"success": True, "message": "Company profile deleted"}
except HTTPException:
raise
except Exception as e:
db.rollback()
logger.error(f"Failed to delete company profile: {e}")
raise HTTPException(status_code=500, detail="Failed to delete company profile")
finally:
db.close()
with translate_domain_errors():
return service.delete(tid, pid)
@router.get("/template-context")
@@ -485,59 +103,12 @@ async def get_template_context(
tenant_id: str = "default",
project_id: Optional[str] = Query(None),
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
service: CompanyProfileService = Depends(get_company_profile_service),
) -> dict[str, Any]:
"""Return flat dict for Jinja2 template substitution in document generation."""
tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id)
db = SessionLocal()
try:
result = db.execute(
text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE {_where_clause()}"),
{"tid": tid, "pid": pid},
)
row = result.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Company profile not found — fill Stammdaten first")
resp = row_to_response(row)
ctx = {
"company_name": resp.company_name,
"legal_form": resp.legal_form,
"industry": resp.industry,
"business_model": resp.business_model,
"company_size": resp.company_size,
"employee_count": resp.employee_count,
"headquarters_country": resp.headquarters_country,
"headquarters_city": resp.headquarters_city,
"primary_jurisdiction": resp.primary_jurisdiction,
"is_data_controller": resp.is_data_controller,
"is_data_processor": resp.is_data_processor,
"uses_ai": resp.uses_ai,
"dpo_name": resp.dpo_name or "",
"dpo_email": resp.dpo_email or "",
"legal_contact_name": resp.legal_contact_name or "",
"legal_contact_email": resp.legal_contact_email or "",
"supervisory_authority": resp.supervisory_authority or "",
"review_cycle_months": resp.review_cycle_months,
"subject_to_nis2": resp.subject_to_nis2,
"subject_to_ai_act": resp.subject_to_ai_act,
"subject_to_iso27001": resp.subject_to_iso27001,
"offerings": resp.offerings,
"target_markets": resp.target_markets,
"international_countries": resp.international_countries,
"ai_use_cases": resp.ai_use_cases,
"repos": resp.repos,
"document_sources": resp.document_sources,
"processing_systems": resp.processing_systems,
"ai_systems": resp.ai_systems,
"technical_contacts": resp.technical_contacts,
"has_ai_systems": len(resp.ai_systems) > 0,
"processing_system_count": len(resp.processing_systems),
"ai_system_count": len(resp.ai_systems),
"is_complete": resp.is_complete,
}
return ctx
finally:
db.close()
with translate_domain_errors():
return service.template_context(tid, pid)
@router.get("/audit", response_model=AuditListResponse)
@@ -545,96 +116,39 @@ async def get_audit_log(
tenant_id: str = "default",
project_id: Optional[str] = Query(None),
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
service: CompanyProfileService = Depends(get_company_profile_service),
) -> AuditListResponse:
"""Get audit log for company profile changes."""
tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id)
db = SessionLocal()
try:
result = db.execute(
text("""SELECT id, action, changed_fields, changed_by, created_at
FROM compliance_company_profile_audit
WHERE tenant_id = :tid AND project_id IS NOT DISTINCT FROM :pid
ORDER BY created_at DESC
LIMIT 100"""),
{"tid": tid, "pid": pid},
)
rows = result.fetchall()
entries = [
AuditEntryResponse(
id=str(r[0]),
action=r[1],
changed_fields=r[2] if isinstance(r[2], dict) else None,
changed_by=r[3],
created_at=str(r[4]),
)
for r in rows
]
return AuditListResponse(entries=entries, total=len(entries))
finally:
db.close()
with translate_domain_errors():
return service.audit_log(tid, pid)
@router.patch("", response_model=CompanyProfileResponse)
async def patch_company_profile(
updates: dict,
updates: dict[str, Any],
tenant_id: str = "default",
project_id: Optional[str] = Query(None),
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
service: CompanyProfileService = Depends(get_company_profile_service),
) -> CompanyProfileResponse:
"""Partial update for company profile."""
tid, pid = _resolve_ids(tenant_id, x_tenant_id, project_id or updates.get("project_id"))
db = SessionLocal()
try:
existing = db.execute(
text(f"SELECT id FROM compliance_company_profiles WHERE {_where_clause()}"),
{"tid": tid, "pid": pid},
).fetchone()
with translate_domain_errors():
return service.patch(tid, pid, updates)
if not existing:
raise HTTPException(status_code=404, detail="Company profile not found")
# Build SET clause from provided fields
allowed_fields = set(_BASE_COLUMNS_LIST) - {"id", "tenant_id", "project_id", "created_at", "updated_at", "completed_at"}
set_parts = []
params = {"tid": tid, "pid": pid}
jsonb_fields = {"offerings", "offering_urls", "international_countries", "target_markets",
"ai_use_cases", "machine_builder", "repos", "document_sources",
"processing_systems", "ai_systems", "technical_contacts"}
# ----------------------------------------------------------------------------
# Legacy re-exports for tests that imported directly from this module.
# Do not add new imports to this list — import from the new home instead.
# ----------------------------------------------------------------------------
for key, value in updates.items():
if key in allowed_fields:
param_name = f"p_{key}"
if key in jsonb_fields:
set_parts.append(f"{key} = :{param_name}::jsonb")
params[param_name] = json.dumps(value) if value is not None else None
else:
set_parts.append(f"{key} = :{param_name}")
params[param_name] = value
if not set_parts:
raise HTTPException(status_code=400, detail="No valid fields to update")
set_parts.append("updated_at = NOW()")
db.execute(
text(f"UPDATE compliance_company_profiles SET {', '.join(set_parts)} WHERE {_where_clause()}"),
params,
)
log_audit(db, tid, "patch", updates, None, pid)
db.commit()
result = db.execute(
text(f"SELECT {_BASE_COLUMNS} FROM compliance_company_profiles WHERE {_where_clause()}"),
{"tid": tid, "pid": pid},
)
row = result.fetchone()
return row_to_response(row)
except HTTPException:
raise
except Exception as e:
db.rollback()
logger.error(f"Failed to patch company profile: {e}")
raise HTTPException(status_code=500, detail="Failed to patch company profile")
finally:
db.close()
__all__ = [
"router",
"CompanyProfileRequest",
"CompanyProfileResponse",
"AuditEntryResponse",
"AuditListResponse",
"row_to_response",
"log_audit",
]