# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return,attr-defined,index,call-overload,type-arg,var-annotated,misc,call-arg,return-value"
"""
Vendor compliance sub-entities — Contracts CRUD + row converters for
contracts, findings, and control instances.

Phase 1 Step 4: extracted from ``compliance.api.vendor_compliance_routes``.
Shares helpers with ``compliance.services.vendor_compliance_service``.
"""

import json
import uuid
from datetime import datetime, timezone
from typing import Any, Optional

from sqlalchemy import text
from sqlalchemy.orm import Session

from compliance.domain import NotFoundError
from compliance.services.vendor_compliance_service import (
    DEFAULT_TENANT_ID,
    _get,
    _ok,
    _parse_json,
    _to_camel,
    _to_snake,
    _ts,
)

# ============================================================================
# Row -> Response converters (shared with extra service)
# ============================================================================


def _contract_to_response(row: Any) -> dict:
    """Convert a ``vendor_contracts`` row into its API response dict.

    ``id``, ``tenant_id``, ``vendor_id``, ``created_at`` and ``updated_at``
    are read with ``row[...]`` and must be present; every other column is
    read through ``_get`` with the default shown below. The assembled dict is
    passed through ``_to_camel`` (presumably snake_case -> camelCase keys —
    confirm against the helper).
    """
    payload: dict = {
        "id": str(row["id"]),
        "tenant_id": row["tenant_id"],
        "vendor_id": str(row["vendor_id"]),
        "file_name": _get(row, "file_name", ""),
        "original_name": _get(row, "original_name", ""),
        "mime_type": _get(row, "mime_type", ""),
        "file_size": _get(row, "file_size", 0),
        "storage_path": _get(row, "storage_path", ""),
        "document_type": _get(row, "document_type", "AVV"),
        "version": _get(row, "version", 1),
        # Only stringify the predecessor id when one is actually set.
        "previous_version_id": (
            None
            if not _get(row, "previous_version_id")
            else str(_get(row, "previous_version_id"))
        ),
        "parties": _parse_json(_get(row, "parties"), []),
        "effective_date": _ts(_get(row, "effective_date")),
        "expiration_date": _ts(_get(row, "expiration_date")),
        "auto_renewal": _get(row, "auto_renewal", False),
        "renewal_notice_period": _get(row, "renewal_notice_period", ""),
        "termination_notice_period": _get(
            row, "termination_notice_period", "",
        ),
        "review_status": _get(row, "review_status", "PENDING"),
        "review_completed_at": _ts(_get(row, "review_completed_at")),
        "compliance_score": _get(row, "compliance_score"),
        "status": _get(row, "status", "DRAFT"),
        "extracted_text": _get(row, "extracted_text", ""),
        "page_count": _get(row, "page_count", 0),
        "created_at": _ts(row["created_at"]),
        "updated_at": _ts(row["updated_at"]),
        "created_by": _get(row, "created_by", "system"),
    }
    return _to_camel(payload)


def _finding_to_response(row: Any) -> dict:
    """Convert a finding row into its API response dict.

    Mandatory columns (``id``, ``tenant_id``, ``vendor_id``, ``created_at``,
    ``updated_at``) are indexed directly; the rest go through ``_get`` with a
    default. ``contract_id`` is stringified only when it is truthy, otherwise
    it is reported as ``None``. The result is passed through ``_to_camel``.
    """
    payload: dict = {
        "id": str(row["id"]),
        "tenant_id": row["tenant_id"],
        "vendor_id": str(row["vendor_id"]),
        "contract_id": (
            None
            if not _get(row, "contract_id")
            else str(_get(row, "contract_id"))
        ),
        "finding_type": _get(row, "finding_type", "UNKNOWN"),
        "category": _get(row, "category", ""),
        "severity": _get(row, "severity", "MEDIUM"),
        "title": _get(row, "title", ""),
        "description": _get(row, "description", ""),
        "recommendation": _get(row, "recommendation", ""),
        "citations": _parse_json(_get(row, "citations"), []),
        "status": _get(row, "status", "OPEN"),
        "assignee": _get(row, "assignee", ""),
        "due_date": _ts(_get(row, "due_date")),
        "resolution": _get(row, "resolution", ""),
        "resolved_at": _ts(_get(row, "resolved_at")),
        "resolved_by": _get(row, "resolved_by", ""),
        "created_at": _ts(row["created_at"]),
        "updated_at": _ts(row["updated_at"]),
        "created_by": _get(row, "created_by", "system"),
    }
    return _to_camel(payload)


def _control_instance_to_response(row: Any) -> dict:
    """Convert a control-instance row into its API response dict.

    Mandatory columns (``id``, ``tenant_id``, ``vendor_id``, ``created_at``,
    ``updated_at``) are indexed directly; the rest go through ``_get`` with a
    default. ``evidence_ids`` is decoded with ``_parse_json`` and falls back
    to an empty list. The result is passed through ``_to_camel``.
    """
    payload: dict = {
        "id": str(row["id"]),
        "tenant_id": row["tenant_id"],
        "vendor_id": str(row["vendor_id"]),
        "control_id": _get(row, "control_id", ""),
        "control_domain": _get(row, "control_domain", ""),
        "status": _get(row, "status", "PLANNED"),
        "evidence_ids": _parse_json(_get(row, "evidence_ids"), []),
        "notes": _get(row, "notes", ""),
        "last_assessed_at": _ts(_get(row, "last_assessed_at")),
        "last_assessed_by": _get(row, "last_assessed_by", ""),
        "next_assessment_date": _ts(_get(row, "next_assessment_date")),
        "created_at": _ts(row["created_at"]),
        "updated_at": _ts(row["updated_at"]),
        "created_by": _get(row, "created_by", "system"),
    }
    return _to_camel(payload)
# ============================================================================
# ContractService
# ============================================================================


class ContractService:
    """Vendor contracts CRUD on the ``vendor_contracts`` table.

    Every method issues raw SQL through the injected SQLAlchemy session and
    returns the result of ``_ok(...)`` applied to the converted row(s).

    NOTE(review): ``get_contract``, ``update_contract`` and
    ``delete_contract`` match on ``id`` only and are not scoped to a tenant.
    Confirm that tenant isolation is enforced by the caller.
    """

    # Scalar columns a client may change via ``update_contract``. These names
    # are interpolated into the UPDATE statement, so the tuple must only ever
    # contain trusted, hard-coded identifiers.
    _UPDATABLE_COLUMNS = (
        "vendor_id",
        "file_name",
        "original_name",
        "mime_type",
        "file_size",
        "storage_path",
        "document_type",
        "version",
        "previous_version_id",
        "effective_date",
        "expiration_date",
        "auto_renewal",
        "renewal_notice_period",
        "termination_notice_period",
        "review_status",
        "review_completed_at",
        "compliance_score",
        "status",
        "extracted_text",
        "page_count",
    )
    # Columns that are JSON-encoded and cast to jsonb on write.
    _JSONB_COLUMNS = ("parties",)

    def __init__(self, db: Session) -> None:
        self._db = db

    def _fetch_row(self, contract_id: str) -> Any:
        """Return the full contract row for *contract_id*, or ``None``."""
        return self._db.execute(
            text("SELECT * FROM vendor_contracts WHERE id = :id"),
            {"id": contract_id},
        ).fetchone()

    def list_contracts(
        self,
        tenant_id: Optional[str] = None,
        vendor_id: Optional[str] = None,
        status: Optional[str] = None,
        skip: int = 0,
        limit: int = 100,
    ) -> dict:
        """List contracts of one tenant, newest first.

        Args:
            tenant_id: Tenant to list; a falsy value selects
                ``DEFAULT_TENANT_ID``.
            vendor_id: Optional exact-match filter on ``vendor_id``.
            status: Optional exact-match filter on ``status``.
            skip: Number of rows to skip (negative values are treated as 0).
            limit: Maximum number of rows (negative values are treated as 0).

        Returns:
            ``_ok`` applied to the list of converted contract rows.
        """
        # PostgreSQL rejects negative LIMIT/OFFSET with an error, so clamp
        # instead of surfacing a database failure for a bad page request.
        params: dict = {
            "tid": tenant_id or DEFAULT_TENANT_ID,
            "lim": max(limit, 0),
            "off": max(skip, 0),
        }
        # Only fixed fragments are added here; all values are bound params.
        where = ["tenant_id = :tid"]
        if vendor_id:
            where.append("vendor_id = :vendor_id")
            params["vendor_id"] = vendor_id
        if status:
            where.append("status = :status")
            params["status"] = status
        where_clause = " AND ".join(where)

        rows = self._db.execute(text(f"""
            SELECT * FROM vendor_contracts
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT :lim OFFSET :off
        """), params).fetchall()
        return _ok([_contract_to_response(r) for r in rows])

    def get_contract(self, contract_id: str) -> dict:
        """Return a single contract by id.

        Raises:
            NotFoundError: If no row has this id.
        """
        row = self._fetch_row(contract_id)
        if not row:
            raise NotFoundError("Contract not found")
        return _ok(_contract_to_response(row))

    def create_contract(self, body: dict) -> dict:
        """Insert a new contract and return the stored row.

        *body* is normalised with ``_to_snake`` first; missing fields get the
        defaults listed in the parameter dict below. A fresh UUID4 is used as
        the id and ``created_at``/``updated_at`` are set to the current UTC
        time. The insert is committed before the row is re-read.
        """
        data = _to_snake(body)
        cid = str(uuid.uuid4())
        # Use the same fallback rule as ``list_contracts`` so that an explicit
        # null/empty tenant does not create a row that is never listed.
        tid = data.get("tenant_id") or DEFAULT_TENANT_ID
        now = datetime.now(timezone.utc).isoformat()

        self._db.execute(text("""
            INSERT INTO vendor_contracts (
                id, tenant_id, vendor_id, file_name, original_name,
                mime_type, file_size, storage_path, document_type,
                version, previous_version_id, parties,
                effective_date, expiration_date, auto_renewal,
                renewal_notice_period, termination_notice_period,
                review_status, status, compliance_score,
                extracted_text, page_count,
                created_at, updated_at, created_by
            ) VALUES (
                :id, :tenant_id, :vendor_id, :file_name, :original_name,
                :mime_type, :file_size, :storage_path, :document_type,
                :version, :previous_version_id, CAST(:parties AS jsonb),
                :effective_date, :expiration_date, :auto_renewal,
                :renewal_notice_period, :termination_notice_period,
                :review_status, :status, :compliance_score,
                :extracted_text, :page_count,
                :created_at, :updated_at, :created_by
            )
        """), {
            "id": cid,
            "tenant_id": tid,
            "vendor_id": data.get("vendor_id", ""),
            "file_name": data.get("file_name", ""),
            "original_name": data.get("original_name", ""),
            "mime_type": data.get("mime_type", ""),
            "file_size": data.get("file_size", 0),
            "storage_path": data.get("storage_path", ""),
            "document_type": data.get("document_type", "AVV"),
            "version": data.get("version", 1),
            "previous_version_id": data.get("previous_version_id"),
            "parties": json.dumps(data.get("parties", [])),
            "effective_date": data.get("effective_date"),
            "expiration_date": data.get("expiration_date"),
            "auto_renewal": data.get("auto_renewal", False),
            "renewal_notice_period": data.get("renewal_notice_period", ""),
            "termination_notice_period": data.get(
                "termination_notice_period", "",
            ),
            "review_status": data.get("review_status", "PENDING"),
            "status": data.get("status", "DRAFT"),
            "compliance_score": data.get("compliance_score"),
            "extracted_text": data.get("extracted_text", ""),
            "page_count": data.get("page_count", 0),
            "created_at": now,
            "updated_at": now,
            "created_by": data.get("created_by", "system"),
        })
        self._db.commit()

        return _ok(_contract_to_response(self._fetch_row(cid)))

    def update_contract(self, contract_id: str, body: dict) -> dict:
        """Apply a partial update to a contract and return the stored row.

        Only keys listed in ``_UPDATABLE_COLUMNS`` / ``_JSONB_COLUMNS`` are
        written; any other key in *body* is ignored. ``updated_at`` is always
        refreshed, even when no other column changes.

        Raises:
            NotFoundError: If no row has this id.
        """
        # Select only the id here: the row may carry a large extracted_text.
        existing = self._db.execute(
            text("SELECT id FROM vendor_contracts WHERE id = :id"),
            {"id": contract_id},
        ).fetchone()
        if not existing:
            raise NotFoundError("Contract not found")

        data = _to_snake(body)
        now = datetime.now(timezone.utc).isoformat()

        sets = ["updated_at = :updated_at"]
        params: dict = {"id": contract_id, "updated_at": now}
        for col in self._UPDATABLE_COLUMNS:
            if col in data:
                sets.append(f"{col} = :{col}")
                params[col] = data[col]
        for col in self._JSONB_COLUMNS:
            if col in data:
                sets.append(f"{col} = CAST(:{col} AS jsonb)")
                params[col] = json.dumps(data[col])

        self._db.execute(
            text(
                f"UPDATE vendor_contracts SET {', '.join(sets)} WHERE id = :id",
            ),
            params,
        )
        self._db.commit()

        return _ok(_contract_to_response(self._fetch_row(contract_id)))

    def delete_contract(self, contract_id: str) -> dict:
        """Delete a contract by id.

        Returns:
            ``_ok({"deleted": True})`` when a row was removed.

        Raises:
            NotFoundError: If the DELETE affected no row.
        """
        result = self._db.execute(
            text("DELETE FROM vendor_contracts WHERE id = :id"),
            {"id": contract_id},
        )
        # Commit first so the transaction is closed even on the not-found path.
        self._db.commit()
        if result.rowcount == 0:
            raise NotFoundError("Contract not found")
        return _ok({"deleted": True})