feat: Add Document Crawler & Auto-Onboarding service (Phase 1.4)
New standalone Python/FastAPI service for automatic compliance document scanning, LLM-based classification, IPFS archival, and gap analysis. Includes extractors (PDF, DOCX, XLSX, PPTX), keyword fallback classifier, compliance matrix, and full REST API on port 8098. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
249
document-crawler/api/jobs.py
Normal file
249
document-crawler/api/jobs.py
Normal file
@@ -0,0 +1,249 @@
|
||||
"""Crawl job management + trigger endpoints."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from fastapi import APIRouter, HTTPException, Header, BackgroundTasks
|
||||
from pydantic import BaseModel
|
||||
|
||||
from db import get_pool
|
||||
from config import settings
|
||||
from crawlers.filesystem_crawler import FilesystemCrawler
|
||||
from extractors.dispatcher import extract_text
|
||||
from classifiers.llm_classifier import classify_document
|
||||
|
||||
# All endpoints in this module are grouped under the "jobs" tag in OpenAPI.
router = APIRouter(tags=["jobs"])
|
||||
|
||||
|
||||
class JobCreate(BaseModel):
    """Request body for ``POST /jobs``."""

    # UUID (as a string) of the crawler_sources row to crawl.
    source_id: str
    # Crawl mode: "full" re-scans every file; "delta" skips unchanged hashes.
    job_type: str = "full"  # full or delta
|
||||
|
||||
|
||||
async def _run_crawl_job(job_id: str, source_id: str, tenant_id: str, job_type: str) -> None:
    """Background task that executes a crawl job end to end.

    Loads the source configuration, walks the filesystem, and for each
    discovered file performs delta detection (by content hash), text
    extraction, and document classification, upserting one row per file in
    ``crawler_documents``.  Aggregate counters are written back to the
    ``crawler_jobs`` row, which is finally marked ``completed`` unless it was
    cancelled while running.

    Args:
        job_id: UUID string of the ``crawler_jobs`` row being executed.
        source_id: UUID string of the ``crawler_sources`` row to crawl.
        tenant_id: UUID string used to scope all document rows.
        job_type: ``"full"`` or ``"delta"``.  A delta job skips files whose
            hash is unchanged since they were last seen.
    """
    pool = await get_pool()

    async with pool.acquire() as conn:
        source = await conn.fetchrow(
            "SELECT * FROM crawler_sources WHERE id = $1", uuid.UUID(source_id)
        )
        if not source:
            await conn.execute(
                "UPDATE crawler_jobs SET status = 'failed', error_message = 'Source not found', completed_at = NOW() WHERE id = $1",
                uuid.UUID(job_id),
            )
            return

        # Mark job as running before the (potentially long) scan starts.
        await conn.execute(
            "UPDATE crawler_jobs SET status = 'running', started_at = NOW() WHERE id = $1",
            uuid.UUID(job_id),
        )

        # Relative source paths are resolved against the configured crawl root.
        source_path = source["path"]
        if not os.path.isabs(source_path):
            source_path = os.path.join(settings.CRAWL_BASE_PATH, source_path)

        # These columns may come back as JSON strings or already-decoded
        # lists depending on the driver's codec setup — handle both.
        exts = json.loads(source["file_extensions"]) if isinstance(source["file_extensions"], str) else source["file_extensions"]
        excludes = json.loads(source["exclude_patterns"]) if isinstance(source["exclude_patterns"], str) else source["exclude_patterns"]

        crawler = FilesystemCrawler(
            base_path=source_path,
            file_extensions=exts,
            max_depth=source["max_depth"],
            exclude_patterns=excludes,
        )

    # NOTE(review): crawl() looks synchronous, so the event loop blocks while
    # the tree is walked — confirm against FilesystemCrawler.
    files = crawler.crawl()

    stats = {
        "files_found": len(files),
        "files_processed": 0,
        "files_new": 0,
        "files_changed": 0,
        "files_skipped": 0,
        "files_error": 0,
    }

    for crawled in files:
        try:
            async with pool.acquire() as conn:
                # Delta detection: look for a prior row for this exact path.
                existing = await conn.fetchrow(
                    "SELECT id, file_hash FROM crawler_documents WHERE tenant_id = $1 AND source_id = $2 AND file_path = $3",
                    uuid.UUID(tenant_id), uuid.UUID(source_id), crawled.file_path,
                )

                if existing:
                    if job_type == "delta" and existing["file_hash"] == crawled.file_hash:
                        # Unchanged in a delta crawl — just refresh last_seen_at.
                        await conn.execute(
                            "UPDATE crawler_documents SET last_seen_at = NOW() WHERE id = $1",
                            existing["id"],
                        )
                        stats["files_skipped"] += 1
                        stats["files_processed"] += 1
                        continue
                    elif existing["file_hash"] != crawled.file_hash:
                        stats["files_changed"] += 1
                    else:
                        # Full crawl but content is identical — nothing to redo.
                        stats["files_skipped"] += 1
                        stats["files_processed"] += 1
                        continue
                else:
                    stats["files_new"] += 1

                # Extract text; a failed extraction is recorded, not fatal.
                extraction_status = "completed"
                extracted_text = ""
                try:
                    extracted_text = extract_text(crawled.file_path, crawled.file_extension)
                except Exception:
                    extraction_status = "failed"

                # Classify only when extraction produced any text.
                classification_result = {"classification": None, "confidence": None, "reasoning": None}
                if extracted_text:
                    classification_result = await classify_document(
                        extracted_text, crawled.file_name, tenant_id
                    )

                if existing:
                    # Changed file: overwrite the stored copy, bump the version
                    # counter, and clear any manual correction flag.
                    await conn.execute(
                        """UPDATE crawler_documents SET
                            job_id = $1, file_size_bytes = $2, file_hash = $3,
                            extracted_text = $4, extraction_status = $5,
                            classification = $6, classification_confidence = $7,
                            classification_reasoning = $8, classification_corrected = false,
                            last_seen_at = NOW(), version_count = version_count + 1,
                            updated_at = NOW()
                        WHERE id = $9""",
                        uuid.UUID(job_id), crawled.file_size_bytes, crawled.file_hash,
                        extracted_text, extraction_status,
                        classification_result["classification"],
                        classification_result["confidence"],
                        classification_result["reasoning"],
                        existing["id"],
                    )
                else:
                    # First sighting: insert a fresh document row.
                    await conn.execute(
                        """INSERT INTO crawler_documents
                            (tenant_id, source_id, job_id, file_path, file_name, file_extension,
                             file_size_bytes, file_hash, extracted_text, extraction_status,
                             classification, classification_confidence, classification_reasoning)
                        VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)""",
                        uuid.UUID(tenant_id), uuid.UUID(source_id), uuid.UUID(job_id),
                        crawled.file_path, crawled.file_name, crawled.file_extension,
                        crawled.file_size_bytes, crawled.file_hash,
                        extracted_text, extraction_status,
                        classification_result["classification"],
                        classification_result["confidence"],
                        classification_result["reasoning"],
                    )

                stats["files_processed"] += 1

        except Exception:
            # Best-effort per file: one bad file must not abort the whole job.
            # NOTE(review): the exception is swallowed without logging — worth
            # wiring a logger here so per-file failures are diagnosable.
            stats["files_error"] += 1
            stats["files_processed"] += 1

    # Persist the final counters on the job row.
    async with pool.acquire() as conn:
        await conn.execute(
            """UPDATE crawler_jobs SET
                files_found=$2, files_processed=$3, files_new=$4,
                files_changed=$5, files_skipped=$6, files_error=$7
            WHERE id = $1""",
            uuid.UUID(job_id),
            stats["files_found"], stats["files_processed"],
            stats["files_new"], stats["files_changed"],
            stats["files_skipped"], stats["files_error"],
        )

    # Mark completed — but only if still 'running', so a job cancelled via
    # POST /jobs/{id}/cancel while we were crawling is not clobbered back to
    # 'completed' here (the previous version overwrote it unconditionally).
    async with pool.acquire() as conn:
        await conn.execute(
            "UPDATE crawler_jobs SET status = 'completed', completed_at = NOW() WHERE id = $1 AND status = 'running'",
            uuid.UUID(job_id),
        )
|
||||
|
||||
|
||||
@router.post("/jobs", status_code=201)
async def create_job(
    body: JobCreate,
    background_tasks: BackgroundTasks,
    x_tenant_id: str = Header(...),
):
    """Create a crawl job for a source and kick it off in the background.

    Raises:
        HTTPException: 404 when the source does not exist for this tenant,
            409 when a job is already running for the source.
    """
    tenant_uuid = uuid.UUID(x_tenant_id)
    source_uuid = uuid.UUID(body.source_id)

    pool = await get_pool()
    async with pool.acquire() as conn:
        # The source must exist and belong to the calling tenant.
        if not await conn.fetchrow(
            "SELECT id FROM crawler_sources WHERE id = $1 AND tenant_id = $2",
            source_uuid, tenant_uuid,
        ):
            raise HTTPException(404, "Source not found")

        # Refuse concurrent crawls of the same source.
        # NOTE(review): check-then-insert is racy under concurrent requests; a
        # partial unique index on (source_id) WHERE status = 'running' would
        # close the window.
        if await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM crawler_jobs WHERE source_id = $1 AND status = 'running')",
            source_uuid,
        ):
            raise HTTPException(409, "A job is already running for this source")

        job_row = await conn.fetchrow(
            """INSERT INTO crawler_jobs (tenant_id, source_id, job_type)
            VALUES ($1, $2, $3) RETURNING *""",
            tenant_uuid, source_uuid, body.job_type,
        )

    # The crawl itself runs after the 201 response has been sent.
    background_tasks.add_task(
        _run_crawl_job, str(job_row["id"]), body.source_id, x_tenant_id, body.job_type
    )

    return dict(job_row)
|
||||
|
||||
|
||||
@router.get("/jobs")
async def list_jobs(x_tenant_id: str = Header(...)):
    """Return the 50 most recent crawl jobs for the tenant, newest first.

    Each job is joined with its source so the response carries a readable
    ``source_name`` alongside the raw job columns.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(
            """SELECT j.*, s.name as source_name
            FROM crawler_jobs j JOIN crawler_sources s ON j.source_id = s.id
            WHERE j.tenant_id = $1 ORDER BY j.created_at DESC LIMIT 50""",
            uuid.UUID(x_tenant_id),
        )
    return [dict(record) for record in records]
|
||||
|
||||
|
||||
@router.get("/jobs/{job_id}")
async def get_job(job_id: str, x_tenant_id: str = Header(...)):
    """Fetch a single crawl job, scoped to the calling tenant."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        record = await conn.fetchrow(
            "SELECT * FROM crawler_jobs WHERE id = $1 AND tenant_id = $2",
            uuid.UUID(job_id), uuid.UUID(x_tenant_id),
        )
    if record is None:
        raise HTTPException(404, "Job not found")
    return dict(record)
|
||||
|
||||
|
||||
@router.post("/jobs/{job_id}/cancel")
async def cancel_job(job_id: str, x_tenant_id: str = Header(...)):
    """Mark a pending or running crawl job as cancelled.

    Only jobs in status 'pending' or 'running' are cancellable; anything
    else (or a job belonging to another tenant) is reported as 404.
    NOTE(review): this only flips the DB status — the background task in
    _run_crawl_job does not poll for cancellation and will keep crawling.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        command_tag = await conn.execute(
            "UPDATE crawler_jobs SET status = 'cancelled', completed_at = NOW() WHERE id = $1 AND tenant_id = $2 AND status IN ('pending', 'running')",
            uuid.UUID(job_id), uuid.UUID(x_tenant_id),
        )
    # asyncpg returns the affected-row count in the command tag string.
    if command_tag == "UPDATE 0":
        raise HTTPException(404, "Job not found or not cancellable")
    return {"status": "cancelled"}
|
||||
Reference in New Issue
Block a user