Benjamin Boenisch 364d2c69ff feat: Add Document Crawler & Auto-Onboarding service (Phase 1.4)
New standalone Python/FastAPI service for automatic compliance document
scanning, LLM-based classification, IPFS archival, and gap analysis.
Includes extractors (PDF, DOCX, XLSX, PPTX), keyword fallback classifier,
compliance matrix, and full REST API on port 8098.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 20:35:15 +01:00
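
A quick sketch of how the documents API below might be called from Python. The port (8098) and the X-Tenant-Id header come from this commit; the host, the absence of a router prefix, and the classification value used here are assumptions for illustration only.

import requests

BASE = "http://localhost:8098"  # port from the commit description; host and prefix are assumed
HEADERS = {"X-Tenant-Id": "00000000-0000-0000-0000-000000000000"}  # placeholder tenant UUID

# List up to 20 unarchived documents for the tenant.
resp = requests.get(
    f"{BASE}/documents",
    headers=HEADERS,
    params={"archived": "false", "limit": 20},
)
resp.raise_for_status()
docs = resp.json()["documents"]
print(resp.json()["total"], "documents match")

# Correct a classification by hand, then archive that document to IPFS.
if docs:
    doc_id = docs[0]["id"]
    requests.put(f"{BASE}/documents/{doc_id}/classify",
                 headers=HEADERS, json={"classification": "certificate"})  # value is illustrative
    requests.post(f"{BASE}/documents/{doc_id}/archive", headers=HEADERS)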


"""Document list, reclassify, archive endpoints."""
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, HTTPException, Header, Query
from pydantic import BaseModel
from db import get_pool
from archiver.dsms_client import archive_document
router = APIRouter(tags=["documents"])
class ClassifyUpdate(BaseModel):
classification: str
class ArchiveBatch(BaseModel):
document_ids: list[str]
@router.get("/documents")
async def list_documents(
x_tenant_id: str = Header(...),
classification: str | None = Query(None),
extraction_status: str | None = Query(None),
archived: bool | None = Query(None),
limit: int = Query(100, le=500),
offset: int = Query(0),
):
pool = await get_pool()
conditions = ["d.tenant_id = $1"]
params: list = [uuid.UUID(x_tenant_id)]
idx = 2
if classification:
conditions.append(f"d.classification = ${idx}")
params.append(classification)
idx += 1
if extraction_status:
conditions.append(f"d.extraction_status = ${idx}")
params.append(extraction_status)
idx += 1
if archived is not None:
conditions.append(f"d.archived = ${idx}")
params.append(archived)
idx += 1
where = " AND ".join(conditions)
async with pool.acquire() as conn:
total = await conn.fetchval(
f"SELECT COUNT(*) FROM crawler_documents d WHERE {where}", *params
)
rows = await conn.fetch(
f"""SELECT d.id, d.file_name, d.file_extension, d.file_size_bytes,
d.classification, d.classification_confidence,
d.classification_corrected, d.extraction_status,
d.archived, d.ipfs_cid, d.first_seen_at, d.last_seen_at,
d.version_count, s.name as source_name
FROM crawler_documents d
JOIN crawler_sources s ON d.source_id = s.id
WHERE {where}
ORDER BY d.created_at DESC
LIMIT ${idx} OFFSET ${idx+1}""",
*params, limit, offset,
)
return {"total": total, "documents": [dict(r) for r in rows]}
@router.get("/documents/{doc_id}")
async def get_document(doc_id: str, x_tenant_id: str = Header(...)):
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""SELECT d.*, s.name as source_name
FROM crawler_documents d
JOIN crawler_sources s ON d.source_id = s.id
WHERE d.id = $1 AND d.tenant_id = $2""",
uuid.UUID(doc_id), uuid.UUID(x_tenant_id),
)
if not row:
raise HTTPException(404, "Document not found")
result = dict(row)
# Include a text preview (first 500 chars)
if result.get("extracted_text"):
result["text_preview"] = result["extracted_text"][:500]
return result
@router.put("/documents/{doc_id}/classify")
async def classify_document_manually(
doc_id: str, body: ClassifyUpdate, x_tenant_id: str = Header(...)
):
pool = await get_pool()
async with pool.acquire() as conn:
result = await conn.execute(
"""UPDATE crawler_documents SET
classification = $3, classification_corrected = true, updated_at = NOW()
WHERE id = $1 AND tenant_id = $2""",
uuid.UUID(doc_id), uuid.UUID(x_tenant_id), body.classification,
)
if result == "UPDATE 0":
raise HTTPException(404, "Document not found")
return {"status": "updated", "classification": body.classification, "corrected": True}
@router.post("/documents/{doc_id}/archive")
async def archive_single_document(doc_id: str, x_tenant_id: str = Header(...)):
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM crawler_documents WHERE id = $1 AND tenant_id = $2",
uuid.UUID(doc_id), uuid.UUID(x_tenant_id),
)
if not row:
raise HTTPException(404, "Document not found")
if row["archived"]:
return {"status": "already_archived", "ipfs_cid": row["ipfs_cid"]}
try:
result = await archive_document(
file_path=row["file_path"],
file_name=row["file_name"],
document_type=row["classification"] or "unknown",
document_id=str(row["id"]),
)
cid = result.get("cid")
except Exception as e:
raise HTTPException(502, f"Archival failed: {e}")
async with pool.acquire() as conn:
await conn.execute(
"UPDATE crawler_documents SET archived = true, ipfs_cid = $2, archived_at = NOW(), updated_at = NOW() WHERE id = $1",
uuid.UUID(doc_id), cid,
)
return {"status": "archived", "ipfs_cid": cid}
@router.post("/documents/archive-batch")
async def archive_batch(body: ArchiveBatch, x_tenant_id: str = Header(...)):
results = []
for did in body.document_ids:
try:
r = await archive_single_document(did, x_tenant_id)
results.append({"id": did, **r})
except HTTPException as e:
results.append({"id": did, "status": "error", "error": e.detail})
return {"results": results}