refactor(backend/api): extract LegalDocumentConsentService (Step 4 — file 12 of 18)
Extract consent, audit log, cookie category, and consent stats endpoints from legal_document_routes into LegalDocumentConsentService. The route file is now a thin handler layer delegating to LegalDocumentService and LegalDocumentConsentService with translate_domain_errors(). Legacy helpers (_doc_to_response, _version_to_response, _transition, _log_approval) and schemas are re-exported for existing tests. Two transition tests updated to expect domain errors instead of HTTPException. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
File diff suppressed because it is too large
Load Diff
121
backend-compliance/compliance/schemas/legal_document.py
Normal file
121
backend-compliance/compliance/schemas/legal_document.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""
|
||||
Legal document schemas — Rechtliche Texte with versioning + approval.
|
||||
|
||||
Phase 1 Step 4: extracted from ``compliance.api.legal_document_routes``.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class DocumentCreate(BaseModel):
|
||||
type: str
|
||||
name: str
|
||||
description: Optional[str] = None
|
||||
mandatory: bool = False
|
||||
tenant_id: Optional[str] = None
|
||||
|
||||
|
||||
class DocumentResponse(BaseModel):
|
||||
id: str
|
||||
tenant_id: Optional[str]
|
||||
type: str
|
||||
name: str
|
||||
description: Optional[str]
|
||||
mandatory: bool
|
||||
created_at: datetime
|
||||
updated_at: Optional[datetime]
|
||||
|
||||
|
||||
class VersionCreate(BaseModel):
|
||||
document_id: str
|
||||
version: str
|
||||
language: str = "de"
|
||||
title: str
|
||||
content: str
|
||||
summary: Optional[str] = None
|
||||
created_by: Optional[str] = None
|
||||
|
||||
|
||||
class VersionUpdate(BaseModel):
|
||||
title: Optional[str] = None
|
||||
content: Optional[str] = None
|
||||
summary: Optional[str] = None
|
||||
version: Optional[str] = None
|
||||
language: Optional[str] = None
|
||||
|
||||
|
||||
class VersionResponse(BaseModel):
|
||||
id: str
|
||||
document_id: str
|
||||
version: str
|
||||
language: str
|
||||
title: str
|
||||
content: str
|
||||
summary: Optional[str]
|
||||
status: str
|
||||
created_by: Optional[str]
|
||||
approved_by: Optional[str]
|
||||
approved_at: Optional[datetime]
|
||||
rejection_reason: Optional[str]
|
||||
created_at: datetime
|
||||
updated_at: Optional[datetime]
|
||||
|
||||
|
||||
class ApprovalHistoryEntry(BaseModel):
|
||||
id: str
|
||||
version_id: str
|
||||
action: str
|
||||
approver: Optional[str]
|
||||
comment: Optional[str]
|
||||
created_at: datetime
|
||||
|
||||
|
||||
class ActionRequest(BaseModel):
|
||||
approver: Optional[str] = None
|
||||
comment: Optional[str] = None
|
||||
|
||||
|
||||
class UserConsentCreate(BaseModel):
|
||||
user_id: str
|
||||
document_id: str
|
||||
document_version_id: Optional[str] = None
|
||||
document_type: str
|
||||
consented: bool = True
|
||||
ip_address: Optional[str] = None
|
||||
user_agent: Optional[str] = None
|
||||
|
||||
|
||||
class CookieCategoryCreate(BaseModel):
|
||||
name_de: str
|
||||
name_en: Optional[str] = None
|
||||
description_de: Optional[str] = None
|
||||
description_en: Optional[str] = None
|
||||
is_required: bool = False
|
||||
sort_order: int = 0
|
||||
|
||||
|
||||
class CookieCategoryUpdate(BaseModel):
|
||||
name_de: Optional[str] = None
|
||||
name_en: Optional[str] = None
|
||||
description_de: Optional[str] = None
|
||||
description_en: Optional[str] = None
|
||||
is_required: Optional[bool] = None
|
||||
sort_order: Optional[int] = None
|
||||
is_active: Optional[bool] = None
|
||||
|
||||
|
||||
__all__ = [
|
||||
"DocumentCreate",
|
||||
"DocumentResponse",
|
||||
"VersionCreate",
|
||||
"VersionUpdate",
|
||||
"VersionResponse",
|
||||
"ApprovalHistoryEntry",
|
||||
"ActionRequest",
|
||||
"UserConsentCreate",
|
||||
"CookieCategoryCreate",
|
||||
"CookieCategoryUpdate",
|
||||
]
|
||||
@@ -0,0 +1,415 @@
|
||||
# mypy: disable-error-code="arg-type,assignment,union-attr"
|
||||
# SQLAlchemy 1.x Column() descriptors are Column[T] statically, T at runtime.
|
||||
"""
|
||||
Legal document consent service — user consents, audit log, cookie categories,
|
||||
and consent statistics.
|
||||
|
||||
Phase 1 Step 4: extracted from ``compliance.api.legal_document_routes``.
|
||||
Document/version/approval workflow lives in
|
||||
``compliance.services.legal_document_service.LegalDocumentService``.
|
||||
"""
|
||||
|
||||
import uuid as uuid_mod
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from compliance.db.legal_document_extend_models import (
|
||||
ConsentAuditLogDB,
|
||||
CookieCategoryDB,
|
||||
UserConsentDB,
|
||||
)
|
||||
from compliance.db.legal_document_models import LegalDocumentDB
|
||||
from compliance.domain import NotFoundError, ValidationError
|
||||
from compliance.schemas.legal_document import (
|
||||
CookieCategoryCreate,
|
||||
CookieCategoryUpdate,
|
||||
UserConsentCreate,
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Serialisation helpers
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def _log_consent_audit(
|
||||
db: Session,
|
||||
tenant_id: Any,
|
||||
action: str,
|
||||
entity_type: str,
|
||||
entity_id: Any = None,
|
||||
user_id: Optional[str] = None,
|
||||
details: Optional[dict[str, Any]] = None,
|
||||
ip_address: Optional[str] = None,
|
||||
) -> ConsentAuditLogDB:
|
||||
entry = ConsentAuditLogDB(
|
||||
tenant_id=tenant_id,
|
||||
action=action,
|
||||
entity_type=entity_type,
|
||||
entity_id=entity_id,
|
||||
user_id=user_id,
|
||||
details=details or {},
|
||||
ip_address=ip_address,
|
||||
)
|
||||
db.add(entry)
|
||||
return entry
|
||||
|
||||
|
||||
def _consent_to_dict(c: UserConsentDB) -> dict[str, Any]:
|
||||
return {
|
||||
"id": str(c.id),
|
||||
"tenant_id": str(c.tenant_id),
|
||||
"user_id": c.user_id,
|
||||
"document_id": str(c.document_id),
|
||||
"document_version_id": str(c.document_version_id) if c.document_version_id else None,
|
||||
"document_type": c.document_type,
|
||||
"consented": c.consented,
|
||||
"ip_address": c.ip_address,
|
||||
"user_agent": c.user_agent,
|
||||
"consented_at": c.consented_at.isoformat() if c.consented_at else None,
|
||||
"withdrawn_at": c.withdrawn_at.isoformat() if c.withdrawn_at else None,
|
||||
"created_at": c.created_at.isoformat() if c.created_at else None,
|
||||
}
|
||||
|
||||
|
||||
def _cookie_cat_to_dict(c: CookieCategoryDB) -> dict[str, Any]:
|
||||
return {
|
||||
"id": str(c.id),
|
||||
"tenant_id": str(c.tenant_id),
|
||||
"name_de": c.name_de,
|
||||
"name_en": c.name_en,
|
||||
"description_de": c.description_de,
|
||||
"description_en": c.description_en,
|
||||
"is_required": c.is_required,
|
||||
"sort_order": c.sort_order,
|
||||
"is_active": c.is_active,
|
||||
"created_at": c.created_at.isoformat() if c.created_at else None,
|
||||
"updated_at": c.updated_at.isoformat() if c.updated_at else None,
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Service
|
||||
# ============================================================================
|
||||
|
||||
|
||||
class LegalDocumentConsentService:
|
||||
"""Business logic for user consents, audit log, and cookie categories."""
|
||||
|
||||
def __init__(self, db: Session) -> None:
|
||||
self.db = db
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# User consents
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def record_consent(
|
||||
self, tenant_id: str, body: UserConsentCreate
|
||||
) -> dict[str, Any]:
|
||||
tid = uuid_mod.UUID(tenant_id)
|
||||
doc_id = uuid_mod.UUID(body.document_id)
|
||||
|
||||
doc = (
|
||||
self.db.query(LegalDocumentDB)
|
||||
.filter(LegalDocumentDB.id == doc_id)
|
||||
.first()
|
||||
)
|
||||
if not doc:
|
||||
raise NotFoundError("Document not found")
|
||||
|
||||
consent = UserConsentDB(
|
||||
tenant_id=tid,
|
||||
user_id=body.user_id,
|
||||
document_id=doc_id,
|
||||
document_version_id=(
|
||||
uuid_mod.UUID(body.document_version_id)
|
||||
if body.document_version_id
|
||||
else None
|
||||
),
|
||||
document_type=body.document_type,
|
||||
consented=body.consented,
|
||||
ip_address=body.ip_address,
|
||||
user_agent=body.user_agent,
|
||||
)
|
||||
self.db.add(consent)
|
||||
self.db.flush()
|
||||
|
||||
_log_consent_audit(
|
||||
self.db,
|
||||
tid,
|
||||
"consent_given",
|
||||
"user_consent",
|
||||
entity_id=consent.id,
|
||||
user_id=body.user_id,
|
||||
details={
|
||||
"document_type": body.document_type,
|
||||
"document_id": body.document_id,
|
||||
},
|
||||
ip_address=body.ip_address,
|
||||
)
|
||||
|
||||
self.db.commit()
|
||||
self.db.refresh(consent)
|
||||
return _consent_to_dict(consent)
|
||||
|
||||
def get_my_consents(
|
||||
self, tenant_id: str, user_id: str
|
||||
) -> list[dict[str, Any]]:
|
||||
tid = uuid_mod.UUID(tenant_id)
|
||||
consents = (
|
||||
self.db.query(UserConsentDB)
|
||||
.filter(
|
||||
UserConsentDB.tenant_id == tid,
|
||||
UserConsentDB.user_id == user_id,
|
||||
UserConsentDB.withdrawn_at.is_(None),
|
||||
)
|
||||
.order_by(UserConsentDB.consented_at.desc())
|
||||
.all()
|
||||
)
|
||||
return [_consent_to_dict(c) for c in consents]
|
||||
|
||||
def check_consent(
|
||||
self, tenant_id: str, document_type: str, user_id: str
|
||||
) -> dict[str, Any]:
|
||||
tid = uuid_mod.UUID(tenant_id)
|
||||
consent = (
|
||||
self.db.query(UserConsentDB)
|
||||
.filter(
|
||||
UserConsentDB.tenant_id == tid,
|
||||
UserConsentDB.user_id == user_id,
|
||||
UserConsentDB.document_type == document_type,
|
||||
UserConsentDB.consented.is_(True),
|
||||
UserConsentDB.withdrawn_at.is_(None),
|
||||
)
|
||||
.order_by(UserConsentDB.consented_at.desc())
|
||||
.first()
|
||||
)
|
||||
return {
|
||||
"has_consent": consent is not None,
|
||||
"consent": _consent_to_dict(consent) if consent else None,
|
||||
}
|
||||
|
||||
def withdraw_consent(
|
||||
self, tenant_id: str, consent_id: str
|
||||
) -> dict[str, Any]:
|
||||
tid = uuid_mod.UUID(tenant_id)
|
||||
try:
|
||||
cid = uuid_mod.UUID(consent_id)
|
||||
except ValueError:
|
||||
raise ValidationError("Invalid consent ID")
|
||||
|
||||
consent = (
|
||||
self.db.query(UserConsentDB)
|
||||
.filter(
|
||||
UserConsentDB.id == cid,
|
||||
UserConsentDB.tenant_id == tid,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
if not consent:
|
||||
raise NotFoundError("Consent not found")
|
||||
if consent.withdrawn_at:
|
||||
raise ValidationError("Consent already withdrawn")
|
||||
|
||||
consent.withdrawn_at = datetime.now(timezone.utc)
|
||||
consent.consented = False
|
||||
|
||||
_log_consent_audit(
|
||||
self.db,
|
||||
tid,
|
||||
"consent_withdrawn",
|
||||
"user_consent",
|
||||
entity_id=cid,
|
||||
user_id=consent.user_id,
|
||||
details={"document_type": consent.document_type},
|
||||
)
|
||||
|
||||
self.db.commit()
|
||||
self.db.refresh(consent)
|
||||
return _consent_to_dict(consent)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Consent statistics
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def get_consent_stats(self, tenant_id: str) -> dict[str, Any]:
|
||||
tid = uuid_mod.UUID(tenant_id)
|
||||
base = self.db.query(UserConsentDB).filter(
|
||||
UserConsentDB.tenant_id == tid
|
||||
)
|
||||
|
||||
total = base.count()
|
||||
active = base.filter(
|
||||
UserConsentDB.consented.is_(True),
|
||||
UserConsentDB.withdrawn_at.is_(None),
|
||||
).count()
|
||||
withdrawn = base.filter(
|
||||
UserConsentDB.withdrawn_at.isnot(None),
|
||||
).count()
|
||||
|
||||
by_type: dict[str, int] = {}
|
||||
type_counts = (
|
||||
self.db.query(UserConsentDB.document_type, func.count(UserConsentDB.id))
|
||||
.filter(UserConsentDB.tenant_id == tid)
|
||||
.group_by(UserConsentDB.document_type)
|
||||
.all()
|
||||
)
|
||||
for dtype, count in type_counts:
|
||||
by_type[dtype] = count
|
||||
|
||||
unique_users = (
|
||||
self.db.query(func.count(func.distinct(UserConsentDB.user_id)))
|
||||
.filter(UserConsentDB.tenant_id == tid)
|
||||
.scalar()
|
||||
) or 0
|
||||
|
||||
return {
|
||||
"total": total,
|
||||
"active": active,
|
||||
"withdrawn": withdrawn,
|
||||
"unique_users": unique_users,
|
||||
"by_type": by_type,
|
||||
}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Audit log
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def get_audit_log(
|
||||
self,
|
||||
tenant_id: str,
|
||||
limit: int = 50,
|
||||
offset: int = 0,
|
||||
action: Optional[str] = None,
|
||||
entity_type: Optional[str] = None,
|
||||
) -> dict[str, Any]:
|
||||
tid = uuid_mod.UUID(tenant_id)
|
||||
query = self.db.query(ConsentAuditLogDB).filter(
|
||||
ConsentAuditLogDB.tenant_id == tid
|
||||
)
|
||||
if action:
|
||||
query = query.filter(ConsentAuditLogDB.action == action)
|
||||
if entity_type:
|
||||
query = query.filter(ConsentAuditLogDB.entity_type == entity_type)
|
||||
|
||||
total = query.count()
|
||||
entries = (
|
||||
query.order_by(ConsentAuditLogDB.created_at.desc())
|
||||
.offset(offset)
|
||||
.limit(limit)
|
||||
.all()
|
||||
)
|
||||
|
||||
return {
|
||||
"entries": [
|
||||
{
|
||||
"id": str(e.id),
|
||||
"action": e.action,
|
||||
"entity_type": e.entity_type,
|
||||
"entity_id": str(e.entity_id) if e.entity_id else None,
|
||||
"user_id": e.user_id,
|
||||
"details": e.details or {},
|
||||
"ip_address": e.ip_address,
|
||||
"created_at": (
|
||||
e.created_at.isoformat() if e.created_at else None
|
||||
),
|
||||
}
|
||||
for e in entries
|
||||
],
|
||||
"total": total,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Cookie categories
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def list_cookie_categories(
|
||||
self, tenant_id: str
|
||||
) -> list[dict[str, Any]]:
|
||||
tid = uuid_mod.UUID(tenant_id)
|
||||
cats = (
|
||||
self.db.query(CookieCategoryDB)
|
||||
.filter(CookieCategoryDB.tenant_id == tid)
|
||||
.order_by(CookieCategoryDB.sort_order)
|
||||
.all()
|
||||
)
|
||||
return [_cookie_cat_to_dict(c) for c in cats]
|
||||
|
||||
def create_cookie_category(
|
||||
self, tenant_id: str, body: CookieCategoryCreate
|
||||
) -> dict[str, Any]:
|
||||
tid = uuid_mod.UUID(tenant_id)
|
||||
cat = CookieCategoryDB(
|
||||
tenant_id=tid,
|
||||
name_de=body.name_de,
|
||||
name_en=body.name_en,
|
||||
description_de=body.description_de,
|
||||
description_en=body.description_en,
|
||||
is_required=body.is_required,
|
||||
sort_order=body.sort_order,
|
||||
)
|
||||
self.db.add(cat)
|
||||
self.db.commit()
|
||||
self.db.refresh(cat)
|
||||
return _cookie_cat_to_dict(cat)
|
||||
|
||||
def update_cookie_category(
|
||||
self, tenant_id: str, category_id: str, body: CookieCategoryUpdate
|
||||
) -> dict[str, Any]:
|
||||
tid = uuid_mod.UUID(tenant_id)
|
||||
try:
|
||||
cid = uuid_mod.UUID(category_id)
|
||||
except ValueError:
|
||||
raise ValidationError("Invalid category ID")
|
||||
|
||||
cat = (
|
||||
self.db.query(CookieCategoryDB)
|
||||
.filter(
|
||||
CookieCategoryDB.id == cid,
|
||||
CookieCategoryDB.tenant_id == tid,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
if not cat:
|
||||
raise NotFoundError("Cookie category not found")
|
||||
|
||||
for field in [
|
||||
"name_de", "name_en", "description_de", "description_en",
|
||||
"is_required", "sort_order", "is_active",
|
||||
]:
|
||||
val = getattr(body, field, None)
|
||||
if val is not None:
|
||||
setattr(cat, field, val)
|
||||
|
||||
cat.updated_at = datetime.now(timezone.utc)
|
||||
self.db.commit()
|
||||
self.db.refresh(cat)
|
||||
return _cookie_cat_to_dict(cat)
|
||||
|
||||
def delete_cookie_category(
|
||||
self, tenant_id: str, category_id: str
|
||||
) -> None:
|
||||
tid = uuid_mod.UUID(tenant_id)
|
||||
try:
|
||||
cid = uuid_mod.UUID(category_id)
|
||||
except ValueError:
|
||||
raise ValidationError("Invalid category ID")
|
||||
|
||||
cat = (
|
||||
self.db.query(CookieCategoryDB)
|
||||
.filter(
|
||||
CookieCategoryDB.id == cid,
|
||||
CookieCategoryDB.tenant_id == tid,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
if not cat:
|
||||
raise NotFoundError("Cookie category not found")
|
||||
|
||||
self.db.delete(cat)
|
||||
self.db.commit()
|
||||
395
backend-compliance/compliance/services/legal_document_service.py
Normal file
395
backend-compliance/compliance/services/legal_document_service.py
Normal file
@@ -0,0 +1,395 @@
|
||||
# mypy: disable-error-code="arg-type,assignment,union-attr"
|
||||
"""
|
||||
Legal Document service — documents + versions + approval workflow + public endpoints.
|
||||
|
||||
Phase 1 Step 4: extracted from ``compliance.api.legal_document_routes``.
|
||||
Consents, audit log, and cookie categories live in
|
||||
``compliance.services.legal_document_consent_service``.
|
||||
|
||||
Module-level helpers (_doc_to_response, _version_to_response, _transition,
|
||||
_log_approval) are re-exported from the route module for legacy tests.
|
||||
"""
|
||||
|
||||
import io
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from compliance.db.legal_document_models import (
|
||||
LegalDocumentApprovalDB,
|
||||
LegalDocumentDB,
|
||||
LegalDocumentVersionDB,
|
||||
)
|
||||
from compliance.domain import NotFoundError, ValidationError
|
||||
from compliance.schemas.legal_document import (
|
||||
ActionRequest,
|
||||
ApprovalHistoryEntry,
|
||||
DocumentCreate,
|
||||
DocumentResponse,
|
||||
VersionCreate,
|
||||
VersionResponse,
|
||||
VersionUpdate,
|
||||
)
|
||||
|
||||
|
||||
def _doc_to_response(doc: LegalDocumentDB) -> DocumentResponse:
|
||||
return DocumentResponse(
|
||||
id=str(doc.id),
|
||||
tenant_id=doc.tenant_id,
|
||||
type=doc.type,
|
||||
name=doc.name,
|
||||
description=doc.description,
|
||||
mandatory=doc.mandatory or False,
|
||||
created_at=doc.created_at,
|
||||
updated_at=doc.updated_at,
|
||||
)
|
||||
|
||||
|
||||
def _version_to_response(v: LegalDocumentVersionDB) -> VersionResponse:
|
||||
return VersionResponse(
|
||||
id=str(v.id),
|
||||
document_id=str(v.document_id),
|
||||
version=v.version,
|
||||
language=v.language or "de",
|
||||
title=v.title,
|
||||
content=v.content,
|
||||
summary=v.summary,
|
||||
status=v.status or "draft",
|
||||
created_by=v.created_by,
|
||||
approved_by=v.approved_by,
|
||||
approved_at=v.approved_at,
|
||||
rejection_reason=v.rejection_reason,
|
||||
created_at=v.created_at,
|
||||
updated_at=v.updated_at,
|
||||
)
|
||||
|
||||
|
||||
def _log_approval(
|
||||
db: Session,
|
||||
version_id: Any,
|
||||
action: str,
|
||||
approver: Optional[str] = None,
|
||||
comment: Optional[str] = None,
|
||||
) -> LegalDocumentApprovalDB:
|
||||
entry = LegalDocumentApprovalDB(
|
||||
version_id=version_id,
|
||||
action=action,
|
||||
approver=approver,
|
||||
comment=comment,
|
||||
)
|
||||
db.add(entry)
|
||||
return entry
|
||||
|
||||
|
||||
def _transition(
|
||||
db: Session,
|
||||
version_id: str,
|
||||
from_statuses: list[str],
|
||||
to_status: str,
|
||||
action: str,
|
||||
approver: Optional[str],
|
||||
comment: Optional[str],
|
||||
extra_updates: Optional[dict[str, Any]] = None,
|
||||
) -> VersionResponse:
|
||||
version = (
|
||||
db.query(LegalDocumentVersionDB)
|
||||
.filter(LegalDocumentVersionDB.id == version_id)
|
||||
.first()
|
||||
)
|
||||
if not version:
|
||||
raise NotFoundError(f"Version {version_id} not found")
|
||||
if version.status not in from_statuses:
|
||||
raise ValidationError(
|
||||
f"Cannot perform '{action}' on version with status "
|
||||
f"'{version.status}' (expected: {from_statuses})"
|
||||
)
|
||||
|
||||
version.status = to_status
|
||||
version.updated_at = datetime.now(timezone.utc)
|
||||
if extra_updates:
|
||||
for k, val in extra_updates.items():
|
||||
setattr(version, k, val)
|
||||
|
||||
_log_approval(db, version.id, action=action, approver=approver, comment=comment)
|
||||
db.commit()
|
||||
db.refresh(version)
|
||||
return _version_to_response(version)
|
||||
|
||||
|
||||
class LegalDocumentService:
|
||||
"""Business logic for legal documents, versions, and approval workflow."""
|
||||
|
||||
def __init__(self, db: Session) -> None:
|
||||
self.db = db
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Documents
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def list_documents(
|
||||
self, tenant_id: Optional[str], type_filter: Optional[str]
|
||||
) -> dict[str, Any]:
|
||||
q = self.db.query(LegalDocumentDB)
|
||||
if tenant_id:
|
||||
q = q.filter(LegalDocumentDB.tenant_id == tenant_id)
|
||||
if type_filter:
|
||||
q = q.filter(LegalDocumentDB.type == type_filter)
|
||||
docs = q.order_by(LegalDocumentDB.created_at.desc()).all()
|
||||
return {"documents": [_doc_to_response(d).dict() for d in docs]}
|
||||
|
||||
def create_document(self, request: DocumentCreate) -> DocumentResponse:
|
||||
doc = LegalDocumentDB(
|
||||
tenant_id=request.tenant_id,
|
||||
type=request.type,
|
||||
name=request.name,
|
||||
description=request.description,
|
||||
mandatory=request.mandatory,
|
||||
)
|
||||
self.db.add(doc)
|
||||
self.db.commit()
|
||||
self.db.refresh(doc)
|
||||
return _doc_to_response(doc)
|
||||
|
||||
def _doc_or_raise(self, document_id: str) -> LegalDocumentDB:
|
||||
doc = (
|
||||
self.db.query(LegalDocumentDB)
|
||||
.filter(LegalDocumentDB.id == document_id)
|
||||
.first()
|
||||
)
|
||||
if not doc:
|
||||
raise NotFoundError(f"Document {document_id} not found")
|
||||
return doc
|
||||
|
||||
def get_document(self, document_id: str) -> DocumentResponse:
|
||||
return _doc_to_response(self._doc_or_raise(document_id))
|
||||
|
||||
def delete_document(self, document_id: str) -> None:
|
||||
doc = self._doc_or_raise(document_id)
|
||||
self.db.delete(doc)
|
||||
self.db.commit()
|
||||
|
||||
def list_versions_for(self, document_id: str) -> list[VersionResponse]:
|
||||
self._doc_or_raise(document_id)
|
||||
versions = (
|
||||
self.db.query(LegalDocumentVersionDB)
|
||||
.filter(LegalDocumentVersionDB.document_id == document_id)
|
||||
.order_by(LegalDocumentVersionDB.created_at.desc())
|
||||
.all()
|
||||
)
|
||||
return [_version_to_response(v) for v in versions]
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Versions
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def create_version(self, request: VersionCreate) -> VersionResponse:
|
||||
doc = (
|
||||
self.db.query(LegalDocumentDB)
|
||||
.filter(LegalDocumentDB.id == request.document_id)
|
||||
.first()
|
||||
)
|
||||
if not doc:
|
||||
raise NotFoundError(f"Document {request.document_id} not found")
|
||||
|
||||
version = LegalDocumentVersionDB(
|
||||
document_id=request.document_id,
|
||||
version=request.version,
|
||||
language=request.language,
|
||||
title=request.title,
|
||||
content=request.content,
|
||||
summary=request.summary,
|
||||
created_by=request.created_by,
|
||||
status="draft",
|
||||
)
|
||||
self.db.add(version)
|
||||
self.db.flush()
|
||||
|
||||
_log_approval(self.db, version.id, action="created", approver=request.created_by)
|
||||
self.db.commit()
|
||||
self.db.refresh(version)
|
||||
return _version_to_response(version)
|
||||
|
||||
def update_version(
|
||||
self, version_id: str, request: VersionUpdate
|
||||
) -> VersionResponse:
|
||||
version = (
|
||||
self.db.query(LegalDocumentVersionDB)
|
||||
.filter(LegalDocumentVersionDB.id == version_id)
|
||||
.first()
|
||||
)
|
||||
if not version:
|
||||
raise NotFoundError(f"Version {version_id} not found")
|
||||
if version.status not in ("draft", "rejected"):
|
||||
raise ValidationError(
|
||||
f"Only draft/rejected versions can be edited (current: {version.status})"
|
||||
)
|
||||
|
||||
for field, value in request.dict(exclude_none=True).items():
|
||||
setattr(version, field, value)
|
||||
version.updated_at = datetime.now(timezone.utc)
|
||||
self.db.commit()
|
||||
self.db.refresh(version)
|
||||
return _version_to_response(version)
|
||||
|
||||
def get_version(self, version_id: str) -> VersionResponse:
|
||||
v = (
|
||||
self.db.query(LegalDocumentVersionDB)
|
||||
.filter(LegalDocumentVersionDB.id == version_id)
|
||||
.first()
|
||||
)
|
||||
if not v:
|
||||
raise NotFoundError(f"Version {version_id} not found")
|
||||
return _version_to_response(v)
|
||||
|
||||
async def upload_word(self, filename: Optional[str], content_bytes: bytes) -> dict[str, Any]:
|
||||
if not filename or not filename.lower().endswith(".docx"):
|
||||
raise ValidationError("Only .docx files are supported")
|
||||
|
||||
html_content = ""
|
||||
try:
|
||||
import mammoth # type: ignore
|
||||
result = mammoth.convert_to_html(io.BytesIO(content_bytes))
|
||||
html_content = result.value
|
||||
except ImportError:
|
||||
html_content = (
|
||||
f"<p>[DOCX-Import: {filename}]</p>"
|
||||
f"<p>Bitte installieren Sie 'mammoth' fuer DOCX-Konvertierung.</p>"
|
||||
)
|
||||
return {"html": html_content, "filename": filename}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Workflow transitions
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def submit_review(self, version_id: str, request: ActionRequest) -> VersionResponse:
|
||||
return _transition(
|
||||
self.db, version_id, ["draft", "rejected"], "review", "submitted",
|
||||
request.approver, request.comment,
|
||||
)
|
||||
|
||||
def approve(self, version_id: str, request: ActionRequest) -> VersionResponse:
|
||||
return _transition(
|
||||
self.db, version_id, ["review"], "approved", "approved",
|
||||
request.approver, request.comment,
|
||||
extra_updates={
|
||||
"approved_by": request.approver,
|
||||
"approved_at": datetime.now(timezone.utc),
|
||||
},
|
||||
)
|
||||
|
||||
def reject(self, version_id: str, request: ActionRequest) -> VersionResponse:
|
||||
return _transition(
|
||||
self.db, version_id, ["review"], "rejected", "rejected",
|
||||
request.approver, request.comment,
|
||||
extra_updates={"rejection_reason": request.comment},
|
||||
)
|
||||
|
||||
def publish(self, version_id: str, request: ActionRequest) -> VersionResponse:
|
||||
return _transition(
|
||||
self.db, version_id, ["approved"], "published", "published",
|
||||
request.approver, request.comment,
|
||||
)
|
||||
|
||||
def approval_history(self, version_id: str) -> list[ApprovalHistoryEntry]:
|
||||
version = (
|
||||
self.db.query(LegalDocumentVersionDB)
|
||||
.filter(LegalDocumentVersionDB.id == version_id)
|
||||
.first()
|
||||
)
|
||||
if not version:
|
||||
raise NotFoundError(f"Version {version_id} not found")
|
||||
entries = (
|
||||
self.db.query(LegalDocumentApprovalDB)
|
||||
.filter(LegalDocumentApprovalDB.version_id == version_id)
|
||||
.order_by(LegalDocumentApprovalDB.created_at.asc())
|
||||
.all()
|
||||
)
|
||||
return [
|
||||
ApprovalHistoryEntry(
|
||||
id=str(e.id),
|
||||
version_id=str(e.version_id),
|
||||
action=e.action,
|
||||
approver=e.approver,
|
||||
comment=e.comment,
|
||||
created_at=e.created_at,
|
||||
)
|
||||
for e in entries
|
||||
]
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public endpoints (end-user facing)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def list_public(self, tenant_id: str) -> list[dict[str, Any]]:
|
||||
docs = (
|
||||
self.db.query(LegalDocumentDB)
|
||||
.filter(LegalDocumentDB.tenant_id == tenant_id)
|
||||
.order_by(LegalDocumentDB.created_at.desc())
|
||||
.all()
|
||||
)
|
||||
result: list[dict[str, Any]] = []
|
||||
for doc in docs:
|
||||
published = (
|
||||
self.db.query(LegalDocumentVersionDB)
|
||||
.filter(
|
||||
LegalDocumentVersionDB.document_id == doc.id,
|
||||
LegalDocumentVersionDB.status == "published",
|
||||
)
|
||||
.order_by(LegalDocumentVersionDB.created_at.desc())
|
||||
.first()
|
||||
)
|
||||
if published:
|
||||
result.append({
|
||||
"id": str(doc.id),
|
||||
"type": doc.type,
|
||||
"name": doc.name,
|
||||
"version": published.version,
|
||||
"title": published.title,
|
||||
"content": published.content,
|
||||
"language": published.language,
|
||||
"published_at": (
|
||||
published.approved_at.isoformat() if published.approved_at else None
|
||||
),
|
||||
})
|
||||
return result
|
||||
|
||||
def get_latest_published(
|
||||
self, tenant_id: str, document_type: str, language: str
|
||||
) -> dict[str, Any]:
|
||||
doc = (
|
||||
self.db.query(LegalDocumentDB)
|
||||
.filter(
|
||||
LegalDocumentDB.tenant_id == tenant_id,
|
||||
LegalDocumentDB.type == document_type,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
if not doc:
|
||||
raise NotFoundError(f"No document of type '{document_type}' found")
|
||||
|
||||
version = (
|
||||
self.db.query(LegalDocumentVersionDB)
|
||||
.filter(
|
||||
LegalDocumentVersionDB.document_id == doc.id,
|
||||
LegalDocumentVersionDB.status == "published",
|
||||
LegalDocumentVersionDB.language == language,
|
||||
)
|
||||
.order_by(LegalDocumentVersionDB.created_at.desc())
|
||||
.first()
|
||||
)
|
||||
if not version:
|
||||
raise NotFoundError(
|
||||
f"No published version for type '{document_type}' in language '{language}'"
|
||||
)
|
||||
|
||||
return {
|
||||
"document_id": str(doc.id),
|
||||
"type": doc.type,
|
||||
"name": doc.name,
|
||||
"version_id": str(version.id),
|
||||
"version": version.version,
|
||||
"title": version.title,
|
||||
"content": version.content,
|
||||
"language": version.language,
|
||||
}
|
||||
@@ -93,5 +93,7 @@ ignore_errors = False
|
||||
ignore_errors = False
|
||||
[mypy-compliance.api.incident_routes]
|
||||
ignore_errors = False
|
||||
[mypy-compliance.api.legal_document_routes]
|
||||
ignore_errors = False
|
||||
[mypy-compliance.api._http_errors]
|
||||
ignore_errors = False
|
||||
|
||||
@@ -199,33 +199,30 @@ class TestVersionToResponse:
|
||||
|
||||
class TestApprovalWorkflow:
|
||||
def test_transition_raises_on_wrong_status(self):
|
||||
"""_transition should raise HTTPException if version is in wrong status."""
|
||||
"""_transition should raise ValidationError if version is in wrong status."""
|
||||
from compliance.api.legal_document_routes import _transition
|
||||
from fastapi import HTTPException
|
||||
from compliance.domain import ValidationError as DomainValidationError
|
||||
|
||||
mock_db = MagicMock()
|
||||
v = make_version(status='draft')
|
||||
mock_db.query.return_value.filter.return_value.first.return_value = v
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
with pytest.raises(DomainValidationError) as exc_info:
|
||||
_transition(mock_db, str(v.id), ['review'], 'approved', 'approved', None, None)
|
||||
|
||||
assert exc_info.value.status_code == 400
|
||||
assert 'draft' in exc_info.value.detail
|
||||
assert 'draft' in str(exc_info.value)
|
||||
|
||||
def test_transition_raises_on_not_found(self):
|
||||
"""_transition should raise 404 if version not found."""
|
||||
"""_transition should raise NotFoundError if version not found."""
|
||||
from compliance.api.legal_document_routes import _transition
|
||||
from fastapi import HTTPException
|
||||
from compliance.domain import NotFoundError
|
||||
|
||||
mock_db = MagicMock()
|
||||
mock_db.query.return_value.filter.return_value.first.return_value = None
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
with pytest.raises(NotFoundError):
|
||||
_transition(mock_db, make_uuid(), ['draft'], 'review', 'submitted', None, None)
|
||||
|
||||
assert exc_info.value.status_code == 404
|
||||
|
||||
def test_transition_success(self):
|
||||
"""_transition should change status and log approval."""
|
||||
from compliance.api.legal_document_routes import _transition
|
||||
|
||||
Reference in New Issue
Block a user