diff --git a/docker-compose.yml b/docker-compose.yml index 75a1ae7..73f224f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -206,6 +206,39 @@ services: networks: - breakpilot-network + # ========================================================= + # DOCUMENT CRAWLER & AUTO-ONBOARDING + # ========================================================= + document-crawler: + build: + context: ./document-crawler + dockerfile: Dockerfile + container_name: bp-compliance-document-crawler + platform: linux/arm64 + ports: + - "8098:8098" + environment: + PORT: 8098 + DATABASE_URL: postgresql://${POSTGRES_USER:-breakpilot}:${POSTGRES_PASSWORD:-breakpilot123}@bp-core-postgres:5432/${POSTGRES_DB:-breakpilot_db} + LLM_GATEWAY_URL: http://ai-compliance-sdk:8090 + DSMS_GATEWAY_URL: http://dsms-gateway:8082 + CRAWL_BASE_PATH: /data/crawl + MAX_FILE_SIZE_MB: 50 + volumes: + - /tmp/breakpilot-crawl-data:/data/crawl:ro + depends_on: + core-health-check: + condition: service_completed_successfully + healthcheck: + test: ["CMD", "curl", "-f", "http://127.0.0.1:8098/health"] + interval: 30s + timeout: 10s + start_period: 15s + retries: 3 + restart: unless-stopped + networks: + - breakpilot-network + # ========================================================= # DOCUMENTATION # ========================================================= diff --git a/document-crawler/Dockerfile b/document-crawler/Dockerfile new file mode 100644 index 0000000..7a25e57 --- /dev/null +++ b/document-crawler/Dockerfile @@ -0,0 +1,37 @@ +# Document Crawler - Auto-Onboarding Service +FROM python:3.11-slim + +LABEL maintainer="BreakPilot " +LABEL description="Document Crawler & Auto-Onboarding Service" + +WORKDIR /app + +# Install curl for healthcheck and system dependencies for document extraction +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Install Python dependencies +COPY requirements.txt . 
+RUN pip install --no-cache-dir -r requirements.txt + +# Copy application +COPY . . + +# Environment variables +ENV PORT=8098 +ENV DATABASE_URL=postgresql://breakpilot:breakpilot123@bp-core-postgres:5432/breakpilot_db +ENV LLM_GATEWAY_URL=http://ai-compliance-sdk:8090 +ENV DSMS_GATEWAY_URL=http://dsms-gateway:8082 +ENV CRAWL_BASE_PATH=/data/crawl +ENV MAX_FILE_SIZE_MB=50 + +# Expose port +EXPOSE 8098 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=15s --retries=3 \ + CMD curl -f http://localhost:8098/health || exit 1 + +# Run application +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8098"] diff --git a/document-crawler/api/__init__.py b/document-crawler/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/document-crawler/api/documents.py b/document-crawler/api/documents.py new file mode 100644 index 0000000..bab4b7a --- /dev/null +++ b/document-crawler/api/documents.py @@ -0,0 +1,152 @@ +"""Document list, reclassify, archive endpoints.""" + +import uuid +from datetime import datetime, timezone +from fastapi import APIRouter, HTTPException, Header, Query +from pydantic import BaseModel + +from db import get_pool +from archiver.dsms_client import archive_document + +router = APIRouter(tags=["documents"]) + + +class ClassifyUpdate(BaseModel): + classification: str + + +class ArchiveBatch(BaseModel): + document_ids: list[str] + + +@router.get("/documents") +async def list_documents( + x_tenant_id: str = Header(...), + classification: str | None = Query(None), + extraction_status: str | None = Query(None), + archived: bool | None = Query(None), + limit: int = Query(100, le=500), + offset: int = Query(0), +): + pool = await get_pool() + conditions = ["d.tenant_id = $1"] + params: list = [uuid.UUID(x_tenant_id)] + idx = 2 + + if classification: + conditions.append(f"d.classification = ${idx}") + params.append(classification) + idx += 1 + if extraction_status: + conditions.append(f"d.extraction_status = 
${idx}") + params.append(extraction_status) + idx += 1 + if archived is not None: + conditions.append(f"d.archived = ${idx}") + params.append(archived) + idx += 1 + + where = " AND ".join(conditions) + + async with pool.acquire() as conn: + total = await conn.fetchval( + f"SELECT COUNT(*) FROM crawler_documents d WHERE {where}", *params + ) + rows = await conn.fetch( + f"""SELECT d.id, d.file_name, d.file_extension, d.file_size_bytes, + d.classification, d.classification_confidence, + d.classification_corrected, d.extraction_status, + d.archived, d.ipfs_cid, d.first_seen_at, d.last_seen_at, + d.version_count, s.name as source_name + FROM crawler_documents d + JOIN crawler_sources s ON d.source_id = s.id + WHERE {where} + ORDER BY d.created_at DESC + LIMIT ${idx} OFFSET ${idx+1}""", + *params, limit, offset, + ) + + return {"total": total, "documents": [dict(r) for r in rows]} + + +@router.get("/documents/{doc_id}") +async def get_document(doc_id: str, x_tenant_id: str = Header(...)): + pool = await get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + """SELECT d.*, s.name as source_name + FROM crawler_documents d + JOIN crawler_sources s ON d.source_id = s.id + WHERE d.id = $1 AND d.tenant_id = $2""", + uuid.UUID(doc_id), uuid.UUID(x_tenant_id), + ) + if not row: + raise HTTPException(404, "Document not found") + + result = dict(row) + # Include a text preview (first 500 chars) + if result.get("extracted_text"): + result["text_preview"] = result["extracted_text"][:500] + return result + + +@router.put("/documents/{doc_id}/classify") +async def classify_document_manually( + doc_id: str, body: ClassifyUpdate, x_tenant_id: str = Header(...) 
+): + pool = await get_pool() + async with pool.acquire() as conn: + result = await conn.execute( + """UPDATE crawler_documents SET + classification = $3, classification_corrected = true, updated_at = NOW() + WHERE id = $1 AND tenant_id = $2""", + uuid.UUID(doc_id), uuid.UUID(x_tenant_id), body.classification, + ) + if result == "UPDATE 0": + raise HTTPException(404, "Document not found") + return {"status": "updated", "classification": body.classification, "corrected": True} + + +@router.post("/documents/{doc_id}/archive") +async def archive_single_document(doc_id: str, x_tenant_id: str = Header(...)): + pool = await get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT * FROM crawler_documents WHERE id = $1 AND tenant_id = $2", + uuid.UUID(doc_id), uuid.UUID(x_tenant_id), + ) + if not row: + raise HTTPException(404, "Document not found") + if row["archived"]: + return {"status": "already_archived", "ipfs_cid": row["ipfs_cid"]} + + try: + result = await archive_document( + file_path=row["file_path"], + file_name=row["file_name"], + document_type=row["classification"] or "unknown", + document_id=str(row["id"]), + ) + cid = result.get("cid") + except Exception as e: + raise HTTPException(502, f"Archival failed: {e}") + + async with pool.acquire() as conn: + await conn.execute( + "UPDATE crawler_documents SET archived = true, ipfs_cid = $2, archived_at = NOW(), updated_at = NOW() WHERE id = $1", + uuid.UUID(doc_id), cid, + ) + + return {"status": "archived", "ipfs_cid": cid} + + +@router.post("/documents/archive-batch") +async def archive_batch(body: ArchiveBatch, x_tenant_id: str = Header(...)): + results = [] + for did in body.document_ids: + try: + r = await archive_single_document(did, x_tenant_id) + results.append({"id": did, **r}) + except HTTPException as e: + results.append({"id": did, "status": "error", "error": e.detail}) + return {"results": results} diff --git a/document-crawler/api/jobs.py b/document-crawler/api/jobs.py 
new file mode 100644 index 0000000..9584845 --- /dev/null +++ b/document-crawler/api/jobs.py @@ -0,0 +1,249 @@ +"""Crawl job management + trigger endpoints.""" + +import asyncio +import json +import os +import uuid +from datetime import datetime, timezone +from fastapi import APIRouter, HTTPException, Header, BackgroundTasks +from pydantic import BaseModel + +from db import get_pool +from config import settings +from crawlers.filesystem_crawler import FilesystemCrawler +from extractors.dispatcher import extract_text +from classifiers.llm_classifier import classify_document + +router = APIRouter(tags=["jobs"]) + + +class JobCreate(BaseModel): + source_id: str + job_type: str = "full" # full or delta + + +async def _run_crawl_job(job_id: str, source_id: str, tenant_id: str, job_type: str): + """Background task that executes a crawl job.""" + pool = await get_pool() + + async with pool.acquire() as conn: + source = await conn.fetchrow( + "SELECT * FROM crawler_sources WHERE id = $1", uuid.UUID(source_id) + ) + if not source: + await conn.execute( + "UPDATE crawler_jobs SET status = 'failed', error_message = 'Source not found', completed_at = NOW() WHERE id = $1", + uuid.UUID(job_id), + ) + return + + # Mark job as running + await conn.execute( + "UPDATE crawler_jobs SET status = 'running', started_at = NOW() WHERE id = $1", + uuid.UUID(job_id), + ) + + # Resolve path + source_path = source["path"] + if not os.path.isabs(source_path): + source_path = os.path.join(settings.CRAWL_BASE_PATH, source_path) + + exts = json.loads(source["file_extensions"]) if isinstance(source["file_extensions"], str) else source["file_extensions"] + excludes = json.loads(source["exclude_patterns"]) if isinstance(source["exclude_patterns"], str) else source["exclude_patterns"] + + crawler = FilesystemCrawler( + base_path=source_path, + file_extensions=exts, + max_depth=source["max_depth"], + exclude_patterns=excludes, + ) + + files = crawler.crawl() + + stats = { + "files_found": len(files), 
+ "files_processed": 0, + "files_new": 0, + "files_changed": 0, + "files_skipped": 0, + "files_error": 0, + } + + for crawled in files: + try: + async with pool.acquire() as conn: + # Check for existing document (delta detection) + existing = await conn.fetchrow( + "SELECT id, file_hash FROM crawler_documents WHERE tenant_id = $1 AND source_id = $2 AND file_path = $3", + uuid.UUID(tenant_id), uuid.UUID(source_id), crawled.file_path, + ) + + if existing: + if job_type == "delta" and existing["file_hash"] == crawled.file_hash: + # Unchanged — skip + await conn.execute( + "UPDATE crawler_documents SET last_seen_at = NOW() WHERE id = $1", + existing["id"], + ) + stats["files_skipped"] += 1 + stats["files_processed"] += 1 + continue + elif existing["file_hash"] != crawled.file_hash: + stats["files_changed"] += 1 + else: + stats["files_skipped"] += 1 + stats["files_processed"] += 1 + continue + else: + stats["files_new"] += 1 + + # Extract text + extraction_status = "completed" + extracted_text = "" + try: + extracted_text = extract_text(crawled.file_path, crawled.file_extension) + except Exception: + extraction_status = "failed" + + # Classify + classification_result = {"classification": None, "confidence": None, "reasoning": None} + if extracted_text: + classification_result = await classify_document( + extracted_text, crawled.file_name, tenant_id + ) + + if existing: + # Update existing + await conn.execute( + """UPDATE crawler_documents SET + job_id = $1, file_size_bytes = $2, file_hash = $3, + extracted_text = $4, extraction_status = $5, + classification = $6, classification_confidence = $7, + classification_reasoning = $8, classification_corrected = false, + last_seen_at = NOW(), version_count = version_count + 1, + updated_at = NOW() + WHERE id = $9""", + uuid.UUID(job_id), crawled.file_size_bytes, crawled.file_hash, + extracted_text, extraction_status, + classification_result["classification"], + classification_result["confidence"], + 
classification_result["reasoning"], + existing["id"], + ) + else: + # Insert new + await conn.execute( + """INSERT INTO crawler_documents + (tenant_id, source_id, job_id, file_path, file_name, file_extension, + file_size_bytes, file_hash, extracted_text, extraction_status, + classification, classification_confidence, classification_reasoning) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)""", + uuid.UUID(tenant_id), uuid.UUID(source_id), uuid.UUID(job_id), + crawled.file_path, crawled.file_name, crawled.file_extension, + crawled.file_size_bytes, crawled.file_hash, + extracted_text, extraction_status, + classification_result["classification"], + classification_result["confidence"], + classification_result["reasoning"], + ) + + stats["files_processed"] += 1 + + except Exception: + stats["files_error"] += 1 + stats["files_processed"] += 1 + + # Update job progress + async with pool.acquire() as conn: + await conn.execute( + """UPDATE crawler_jobs SET + files_found=$2, files_processed=$3, files_new=$4, + files_changed=$5, files_skipped=$6, files_error=$7 + WHERE id = $1""", + uuid.UUID(job_id), + stats["files_found"], stats["files_processed"], + stats["files_new"], stats["files_changed"], + stats["files_skipped"], stats["files_error"], + ) + + # Mark completed + async with pool.acquire() as conn: + await conn.execute( + "UPDATE crawler_jobs SET status = 'completed', completed_at = NOW() WHERE id = $1", + uuid.UUID(job_id), + ) + + +@router.post("/jobs", status_code=201) +async def create_job( + body: JobCreate, + background_tasks: BackgroundTasks, + x_tenant_id: str = Header(...), +): + pool = await get_pool() + async with pool.acquire() as conn: + # Verify source exists + source = await conn.fetchrow( + "SELECT id FROM crawler_sources WHERE id = $1 AND tenant_id = $2", + uuid.UUID(body.source_id), uuid.UUID(x_tenant_id), + ) + if not source: + raise HTTPException(404, "Source not found") + + # Check no job already running for this source + running = await 
conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM crawler_jobs WHERE source_id = $1 AND status = 'running')", + uuid.UUID(body.source_id), + ) + if running: + raise HTTPException(409, "A job is already running for this source") + + row = await conn.fetchrow( + """INSERT INTO crawler_jobs (tenant_id, source_id, job_type) + VALUES ($1, $2, $3) RETURNING *""", + uuid.UUID(x_tenant_id), uuid.UUID(body.source_id), body.job_type, + ) + + job_id = str(row["id"]) + background_tasks.add_task(_run_crawl_job, job_id, body.source_id, x_tenant_id, body.job_type) + + return dict(row) + + +@router.get("/jobs") +async def list_jobs(x_tenant_id: str = Header(...)): + pool = await get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + """SELECT j.*, s.name as source_name + FROM crawler_jobs j JOIN crawler_sources s ON j.source_id = s.id + WHERE j.tenant_id = $1 ORDER BY j.created_at DESC LIMIT 50""", + uuid.UUID(x_tenant_id), + ) + return [dict(r) for r in rows] + + +@router.get("/jobs/{job_id}") +async def get_job(job_id: str, x_tenant_id: str = Header(...)): + pool = await get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT * FROM crawler_jobs WHERE id = $1 AND tenant_id = $2", + uuid.UUID(job_id), uuid.UUID(x_tenant_id), + ) + if not row: + raise HTTPException(404, "Job not found") + return dict(row) + + +@router.post("/jobs/{job_id}/cancel") +async def cancel_job(job_id: str, x_tenant_id: str = Header(...)): + pool = await get_pool() + async with pool.acquire() as conn: + result = await conn.execute( + "UPDATE crawler_jobs SET status = 'cancelled', completed_at = NOW() WHERE id = $1 AND tenant_id = $2 AND status IN ('pending', 'running')", + uuid.UUID(job_id), uuid.UUID(x_tenant_id), + ) + if result == "UPDATE 0": + raise HTTPException(404, "Job not found or not cancellable") + return {"status": "cancelled"} diff --git a/document-crawler/api/reports.py b/document-crawler/api/reports.py new file mode 100644 index 
0000000..c481f38 --- /dev/null +++ b/document-crawler/api/reports.py @@ -0,0 +1,99 @@ +"""Onboarding report + gap analysis endpoints.""" + +import json +import uuid +from fastapi import APIRouter, HTTPException, Header +from pydantic import BaseModel + +from db import get_pool +from gap_analysis.analyzer import generate_gap_analysis + +router = APIRouter(tags=["reports"]) + + +class ReportGenerate(BaseModel): + job_id: str | None = None + company_profiles: list[str] = ["universal", "data_processor", "ai_user"] + + +@router.post("/reports/generate", status_code=201) +async def generate_report(body: ReportGenerate, x_tenant_id: str = Header(...)): + pool = await get_pool() + tid = uuid.UUID(x_tenant_id) + + async with pool.acquire() as conn: + # Count documents by classification for this tenant + rows = await conn.fetch( + """SELECT classification, COUNT(*) as cnt + FROM crawler_documents + WHERE tenant_id = $1 AND classification IS NOT NULL + GROUP BY classification""", + tid, + ) + classification_counts = {r["classification"]: r["cnt"] for r in rows} + + total_docs = await conn.fetchval( + "SELECT COUNT(*) FROM crawler_documents WHERE tenant_id = $1", tid + ) + + # Run gap analysis + analysis = generate_gap_analysis(classification_counts, body.company_profiles) + + # Store report + async with pool.acquire() as conn: + jid = uuid.UUID(body.job_id) if body.job_id else None + row = await conn.fetchrow( + """INSERT INTO crawler_onboarding_reports + (tenant_id, job_id, total_documents_found, classification_breakdown, gaps, compliance_score) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING *""", + tid, jid, total_docs, + json.dumps(classification_counts), + json.dumps(analysis["gaps"]), + analysis["compliance_score"], + ) + + result = dict(row) + result["gap_summary"] = analysis["gap_summary"] + result["covered"] = analysis["covered"] + result["total_required"] = analysis["total_required"] + return result + + +@router.get("/reports") +async def list_reports(x_tenant_id: str 
= Header(...)): + pool = await get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + "SELECT * FROM crawler_onboarding_reports WHERE tenant_id = $1 ORDER BY created_at DESC LIMIT 20", + uuid.UUID(x_tenant_id), + ) + return [dict(r) for r in rows] + + +@router.get("/reports/{report_id}") +async def get_report(report_id: str, x_tenant_id: str = Header(...)): + pool = await get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT * FROM crawler_onboarding_reports WHERE id = $1 AND tenant_id = $2", + uuid.UUID(report_id), uuid.UUID(x_tenant_id), + ) + if not row: + raise HTTPException(404, "Report not found") + + result = dict(row) + # Parse stored JSON + if isinstance(result.get("gaps"), str): + result["gaps"] = json.loads(result["gaps"]) + if isinstance(result.get("classification_breakdown"), str): + result["classification_breakdown"] = json.loads(result["classification_breakdown"]) + + # Add computed summary + gaps = result.get("gaps", []) + result["gap_summary"] = { + "critical": sum(1 for g in gaps if g.get("severity") == "CRITICAL"), + "high": sum(1 for g in gaps if g.get("severity") == "HIGH"), + "medium": sum(1 for g in gaps if g.get("severity") == "MEDIUM"), + } + return result diff --git a/document-crawler/api/sources.py b/document-crawler/api/sources.py new file mode 100644 index 0000000..d316d5b --- /dev/null +++ b/document-crawler/api/sources.py @@ -0,0 +1,148 @@ +"""Crawl source CRUD endpoints.""" + +import json +import os +import uuid +from fastapi import APIRouter, HTTPException, Header +from pydantic import BaseModel + +from db import get_pool +from config import settings + +router = APIRouter(tags=["sources"]) + + +class SourceCreate(BaseModel): + name: str + source_type: str = "local" + path: str + file_extensions: list[str] = [".pdf", ".docx", ".xlsx", ".pptx"] + max_depth: int = 5 + exclude_patterns: list[str] = [] + enabled: bool = True + + +class SourceUpdate(BaseModel): + name: str | None = 
None + path: str | None = None + file_extensions: list[str] | None = None + max_depth: int | None = None + exclude_patterns: list[str] | None = None + enabled: bool | None = None + + +@router.get("/sources") +async def list_sources(x_tenant_id: str = Header(...)): + pool = await get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + "SELECT * FROM crawler_sources WHERE tenant_id = $1 ORDER BY created_at DESC", + uuid.UUID(x_tenant_id), + ) + return [dict(r) for r in rows] + + +@router.post("/sources", status_code=201) +async def create_source(body: SourceCreate, x_tenant_id: str = Header(...)): + pool = await get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + """INSERT INTO crawler_sources + (tenant_id, name, source_type, path, file_extensions, max_depth, exclude_patterns, enabled) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING *""", + uuid.UUID(x_tenant_id), + body.name, + body.source_type, + body.path, + json.dumps(body.file_extensions), + body.max_depth, + json.dumps(body.exclude_patterns), + body.enabled, + ) + return dict(row) + + +@router.put("/sources/{source_id}") +async def update_source(source_id: str, body: SourceUpdate, x_tenant_id: str = Header(...)): + pool = await get_pool() + async with pool.acquire() as conn: + existing = await conn.fetchrow( + "SELECT * FROM crawler_sources WHERE id = $1 AND tenant_id = $2", + uuid.UUID(source_id), uuid.UUID(x_tenant_id), + ) + if not existing: + raise HTTPException(404, "Source not found") + + updates = {} + if body.name is not None: + updates["name"] = body.name + if body.path is not None: + updates["path"] = body.path + if body.file_extensions is not None: + updates["file_extensions"] = json.dumps(body.file_extensions) + if body.max_depth is not None: + updates["max_depth"] = body.max_depth + if body.exclude_patterns is not None: + updates["exclude_patterns"] = json.dumps(body.exclude_patterns) + if body.enabled is not None: + updates["enabled"] = 
body.enabled + + if updates: + set_clause = ", ".join(f"{k} = ${i+3}" for i, k in enumerate(updates)) + sql = f"UPDATE crawler_sources SET {set_clause}, updated_at = NOW() WHERE id = $1 AND tenant_id = $2 RETURNING *" + row = await conn.fetchrow( + sql, uuid.UUID(source_id), uuid.UUID(x_tenant_id), *updates.values() + ) + return dict(row) + + return dict(existing) + + +@router.delete("/sources/{source_id}", status_code=204) +async def delete_source(source_id: str, x_tenant_id: str = Header(...)): + pool = await get_pool() + async with pool.acquire() as conn: + result = await conn.execute( + "DELETE FROM crawler_sources WHERE id = $1 AND tenant_id = $2", + uuid.UUID(source_id), uuid.UUID(x_tenant_id), + ) + if result == "DELETE 0": + raise HTTPException(404, "Source not found") + + +@router.post("/sources/{source_id}/test") +async def test_source(source_id: str, x_tenant_id: str = Header(...)): + """Test connectivity to a crawl source.""" + pool = await get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT * FROM crawler_sources WHERE id = $1 AND tenant_id = $2", + uuid.UUID(source_id), uuid.UUID(x_tenant_id), + ) + if not row: + raise HTTPException(404, "Source not found") + + # For local sources, check if the path exists inside the container + source_path = row["path"] + # Resolve relative to CRAWL_BASE_PATH + if not os.path.isabs(source_path): + source_path = os.path.join(settings.CRAWL_BASE_PATH, source_path) + + exists = os.path.isdir(source_path) + file_count = 0 + if exists: + exts = json.loads(row["file_extensions"]) if isinstance(row["file_extensions"], str) else row["file_extensions"] + for root, dirs, files in os.walk(source_path): + for f in files: + _, ext = os.path.splitext(f) + if ext.lower() in exts: + file_count += 1 + break # only top-level for test + + return { + "reachable": exists, + "path_resolved": source_path, + "sample_file_count": file_count, + "message": "Pfad erreichbar" if exists else "Pfad nicht 
gefunden", + } diff --git a/document-crawler/archiver/__init__.py b/document-crawler/archiver/__init__.py new file mode 100644 index 0000000..65d1087 --- /dev/null +++ b/document-crawler/archiver/__init__.py @@ -0,0 +1 @@ +from .dsms_client import archive_document diff --git a/document-crawler/archiver/dsms_client.py b/document-crawler/archiver/dsms_client.py new file mode 100644 index 0000000..8f5cd21 --- /dev/null +++ b/document-crawler/archiver/dsms_client.py @@ -0,0 +1,37 @@ +"""Client for dsms-gateway (IPFS) document archival.""" + +import httpx + +from config import settings + + +async def archive_document( + file_path: str, + file_name: str, + document_type: str, + document_id: str, + auth_token: str = "Bearer system-crawler", +) -> dict: + """Archive a document to IPFS via the DSMS gateway. + + Returns dict with cid, size, gateway_url on success. + Raises on failure. + """ + async with httpx.AsyncClient(timeout=120.0) as client: + with open(file_path, "rb") as f: + resp = await client.post( + f"{settings.DSMS_GATEWAY_URL}/api/v1/documents", + files={"file": (file_name, f)}, + data={ + "document_type": "compliance_document", + "document_id": document_id, + "version": "1", + "language": "de", + }, + headers={"Authorization": auth_token}, + ) + + if resp.status_code != 200: + raise RuntimeError(f"DSMS archive failed ({resp.status_code}): {resp.text}") + + return resp.json() diff --git a/document-crawler/classifiers/__init__.py b/document-crawler/classifiers/__init__.py new file mode 100644 index 0000000..66ebd26 --- /dev/null +++ b/document-crawler/classifiers/__init__.py @@ -0,0 +1,2 @@ +from .llm_classifier import classify_document +from .keyword_fallback import keyword_classify diff --git a/document-crawler/classifiers/keyword_fallback.py b/document-crawler/classifiers/keyword_fallback.py new file mode 100644 index 0000000..d0c1fe9 --- /dev/null +++ b/document-crawler/classifiers/keyword_fallback.py @@ -0,0 +1,80 @@ +"""Heuristic keyword-based 
classification fallback.""" + +# Keyword patterns per category — order matters (first match wins on tie) +KEYWORD_MAP: list[tuple[str, list[str]]] = [ + ("VVT", [ + "verarbeitungsverzeichnis", "verzeichnis von verarbeitungstaetigkeiten", + "verarbeitungstaetigkeit", "art. 30", "art 30", "zweck der verarbeitung", + "kategorie betroffener personen", "datenkategorien", + ]), + ("TOM", [ + "technisch-organisatorische massnahmen", "technische und organisatorische", + "art. 32", "art 32", "zutrittskontrolle", "zugangskontrolle", + "zugriffskontrolle", "verschluesselungskonzept", "pseudonymisierung", + ]), + ("DSE", [ + "datenschutzerklaerung", "datenschutzhinweise", "privacy policy", + "informationspflichten", "art. 13", "art. 14", "art 13", "art 14", + "betroffenenrechte", "verantwortlicher im sinne", + ]), + ("AVV", [ + "auftragsverarbeitung", "auftragsverarbeitungsvertrag", + "art. 28", "art 28", "weisungsgebundenheit", "unterauftragnehmer", + "subunternehmer", + ]), + ("DSFA", [ + "datenschutz-folgenabschaetzung", "folgenabschaetzung", + "art. 35", "art 35", "risikoanalyse", "hohes risiko", + "systematische beschreibung", + ]), + ("Loeschkonzept", [ + "loeschkonzept", "loeschfristen", "aufbewahrungsfrist", + "loeschung personenbezogener", "speicherdauer", "vorhaltefrist", + ]), + ("Einwilligung", [ + "einwilligung", "einwilligungserklaerung", "consent", + "freiwillige zustimmung", "widerruf der einwilligung", + ]), + ("Vertrag", [ + "vertrag", "vereinbarung", "vertragspartner", + "leistungsbeschreibung", "vertragsgegenstand", + ]), + ("Richtlinie", [ + "richtlinie", "policy", "datenschutzrichtlinie", "leitlinie", + "verhaltensregeln", "organisationsanweisung", + ]), + ("Schulungsnachweis", [ + "schulungsnachweis", "schulung", "training", "datenschutzschulung", + "teilnahmebestaetigung", "fortbildung datenschutz", + ]), +] + +MAX_KEYWORD_CONFIDENCE = 0.3 + + +def keyword_classify(text: str, filename: str) -> dict: + """Classify document by keyword matching. 
Confidence capped at 0.3.""" + combined = (filename + " " + text).lower() + + best_category = "Sonstiges" + best_score = 0 + + for category, keywords in KEYWORD_MAP: + score = sum(1 for kw in keywords if kw in combined) + if score > best_score: + best_score = score + best_category = category + + if best_score == 0: + return { + "classification": "Sonstiges", + "confidence": 0.1, + "reasoning": "Keine Schluesselwoerter gefunden (Keyword-Fallback)", + } + + confidence = min(best_score * 0.1, MAX_KEYWORD_CONFIDENCE) + return { + "classification": best_category, + "confidence": confidence, + "reasoning": f"Keyword-Fallback: {best_score} Treffer fuer {best_category}", + } diff --git a/document-crawler/classifiers/llm_classifier.py b/document-crawler/classifiers/llm_classifier.py new file mode 100644 index 0000000..7849fd5 --- /dev/null +++ b/document-crawler/classifiers/llm_classifier.py @@ -0,0 +1,73 @@ +"""LLM-based document classification via ai-compliance-sdk.""" + +import json +import httpx + +from config import settings +from .prompts import ( + CLASSIFICATION_SYSTEM_PROMPT, + CLASSIFICATION_USER_PROMPT, + VALID_CLASSIFICATIONS, +) +from .keyword_fallback import keyword_classify + + +async def classify_document( + text: str, + filename: str, + tenant_id: str, + user_id: str = "system", +) -> dict: + """Classify a document using the LLM gateway. + + Returns dict with keys: classification, confidence, reasoning. + Falls back to keyword heuristic if LLM is unavailable. 
+ """ + truncated = text[: settings.LLM_TEXT_LIMIT] + user_prompt = CLASSIFICATION_USER_PROMPT.format( + filename=filename, text=truncated + ) + + try: + async with httpx.AsyncClient(timeout=60.0) as client: + resp = await client.post( + f"{settings.LLM_GATEWAY_URL}/sdk/v1/llm/chat", + json={ + "messages": [ + {"role": "system", "content": CLASSIFICATION_SYSTEM_PROMPT}, + {"role": "user", "content": user_prompt}, + ], + "temperature": 0.1, + "max_tokens": 300, + }, + headers={ + "X-Tenant-ID": tenant_id, + "X-User-ID": user_id, + "Content-Type": "application/json", + }, + ) + + if resp.status_code != 200: + return keyword_classify(text, filename) + + data = resp.json() + # The SDK returns the assistant message content + content = ( + data.get("content") + or data.get("message", {}).get("content") + or data.get("choices", [{}])[0].get("message", {}).get("content", "") + ) + + result = json.loads(content) + classification = result.get("classification", "Sonstiges") + if classification not in VALID_CLASSIFICATIONS: + classification = "Sonstiges" + + return { + "classification": classification, + "confidence": min(max(float(result.get("confidence", 0.5)), 0.0), 1.0), + "reasoning": result.get("reasoning", ""), + } + + except (httpx.RequestError, json.JSONDecodeError, KeyError, IndexError): + return keyword_classify(text, filename) diff --git a/document-crawler/classifiers/prompts.py b/document-crawler/classifiers/prompts.py new file mode 100644 index 0000000..2e3dcb0 --- /dev/null +++ b/document-crawler/classifiers/prompts.py @@ -0,0 +1,33 @@ +"""Classification prompt templates for LLM-based document classification.""" + +CLASSIFICATION_SYSTEM_PROMPT = """Du bist ein Experte fuer DSGVO-Compliance-Dokumentation. +Deine Aufgabe ist es, Dokumente anhand ihres Inhalts in eine der folgenden Kategorien einzuordnen. + +Kategorien: +- VVT: Verzeichnis von Verarbeitungstaetigkeiten (Art. 30 DSGVO) +- TOM: Technisch-organisatorische Massnahmen (Art. 
32 DSGVO) +- DSE: Datenschutzerklaerung (Art. 13/14 DSGVO) +- AVV: Auftragsverarbeitungsvertrag (Art. 28 DSGVO) +- DSFA: Datenschutz-Folgenabschaetzung (Art. 35 DSGVO) +- Loeschkonzept: Loeschfristen und Loeschregeln +- Einwilligung: Einwilligungserklaerungen und Consent-Formulare +- Vertrag: Vertraege mit Datenschutzbezug +- Richtlinie: Interne Datenschutz-Richtlinien und Policies +- Schulungsnachweis: Datenschutz-Schulungen und Nachweise +- Sonstiges: Dokument mit anderem oder unklarem Inhalt + +Antworte AUSSCHLIESSLICH im folgenden JSON-Format: +{"classification": "<Kategorie>", "confidence": <0.0-1.0>, "reasoning": "<Begruendung>"}""" + +CLASSIFICATION_USER_PROMPT = """Klassifiziere das folgende Dokument: + +Dateiname: {filename} + +Textinhalt (Auszug): +{text}""" + +VALID_CLASSIFICATIONS = [ + "VVT", "TOM", "DSE", "AVV", "DSFA", + "Loeschkonzept", "Einwilligung", "Vertrag", + "Richtlinie", "Schulungsnachweis", "Sonstiges", +] diff --git a/document-crawler/config.py b/document-crawler/config.py new file mode 100644 index 0000000..d02dba8 --- /dev/null +++ b/document-crawler/config.py @@ -0,0 +1,21 @@ +"""Environment-based settings for Document Crawler service.""" + +import os + + +class Settings: + PORT: int = int(os.getenv("PORT", "8098")) + DATABASE_URL: str = os.getenv( + "DATABASE_URL", + "postgresql://breakpilot:breakpilot123@bp-core-postgres:5432/breakpilot_db" + ) + LLM_GATEWAY_URL: str = os.getenv("LLM_GATEWAY_URL", "http://ai-compliance-sdk:8090") + DSMS_GATEWAY_URL: str = os.getenv("DSMS_GATEWAY_URL", "http://dsms-gateway:8082") + CRAWL_BASE_PATH: str = os.getenv("CRAWL_BASE_PATH", "/data/crawl") + MAX_FILE_SIZE_MB: int = int(os.getenv("MAX_FILE_SIZE_MB", "50")) + MAX_FILE_SIZE_BYTES: int = MAX_FILE_SIZE_MB * 1024 * 1024 + LLM_TEXT_LIMIT: int = int(os.getenv("LLM_TEXT_LIMIT", "3000")) + SUPPORTED_EXTENSIONS: list[str] = [".pdf", ".docx", ".xlsx", ".pptx"] + + +settings = Settings() diff --git a/document-crawler/crawlers/__init__.py b/document-crawler/crawlers/__init__.py new
"""Local/NAS directory scanning with SHA-256 delta detection."""

import hashlib
import os
from dataclasses import dataclass

from config import settings


@dataclass
class CrawledFile:
    """A single document discovered during a crawl."""

    file_path: str        # absolute path on disk
    file_name: str        # basename only
    file_extension: str   # lower-cased, including the leading dot
    file_size_bytes: int
    file_hash: str        # SHA-256 hex digest of the file contents
    relative_path: str    # path relative to the crawl base directory


class FilesystemCrawler:
    """Walks a directory tree and discovers document files.

    Files are filtered by extension, size (settings.MAX_FILE_SIZE_BYTES),
    depth, and substring exclude patterns. Each surviving file is hashed with
    SHA-256 so callers can delta-detect changes between crawls.
    """

    def __init__(
        self,
        base_path: str,
        file_extensions: list[str] | None = None,
        max_depth: int = 5,
        exclude_patterns: list[str] | None = None,
    ):
        self.base_path = base_path
        self.file_extensions = file_extensions or settings.SUPPORTED_EXTENSIONS
        self.max_depth = max_depth
        self.exclude_patterns = exclude_patterns or []

    def _should_exclude(self, path: str) -> bool:
        """Return True if *path* contains any exclude pattern (substring match)."""
        return any(pattern in path for pattern in self.exclude_patterns)

    def _hash_file(self, path: str) -> str:
        """Stream the file in 8 KiB chunks and return its SHA-256 hex digest."""
        sha256 = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                sha256.update(chunk)
        return sha256.hexdigest()

    def crawl(self) -> list[CrawledFile]:
        """Walk the directory tree and return discovered files.

        Returns an empty list when base_path does not exist. Files that
        disappear or become unreadable mid-crawl are skipped instead of
        aborting the whole run.
        """
        results: list[CrawledFile] = []

        if not os.path.isdir(self.base_path):
            return results

        for root, dirs, files in os.walk(self.base_path):
            # Depth of the current directory relative to base (base itself = 0).
            rel = os.path.relpath(root, self.base_path)
            depth = 0 if rel == "." else rel.count(os.sep) + 1
            if depth >= self.max_depth:
                dirs.clear()  # prune: stop os.walk from descending further
                continue

            if self._should_exclude(root):
                dirs.clear()  # excluded subtree: skip it entirely
                continue

            for fname in files:
                full_path = os.path.join(root, fname)
                _, ext = os.path.splitext(fname)
                ext = ext.lower()

                if ext not in self.file_extensions:
                    continue

                if self._should_exclude(full_path):
                    continue

                try:
                    stat = os.stat(full_path)
                except OSError:
                    continue  # broken symlink / permission problem: skip

                if stat.st_size > settings.MAX_FILE_SIZE_BYTES:
                    continue

                # BUGFIX: the file can vanish or become unreadable between
                # os.stat() and open(); previously that OSError propagated and
                # aborted the entire crawl. Skip just the affected file.
                try:
                    file_hash = self._hash_file(full_path)
                except OSError:
                    continue

                results.append(CrawledFile(
                    file_path=full_path,
                    file_name=fname,
                    file_extension=ext,
                    file_size_bytes=stat.st_size,
                    file_hash=file_hash,
                    relative_path=os.path.relpath(full_path, self.base_path),
                ))

        return results
"""Routes files to the appropriate extractor by extension."""

from .pdf_extractor import extract_pdf
from .docx_extractor import extract_docx
from .xlsx_extractor import extract_xlsx
from .pptx_extractor import extract_pptx

# Registry mapping a lower-cased file extension to its extractor callable.
EXTRACTORS = {
    ".pdf": extract_pdf,
    ".docx": extract_docx,
    ".xlsx": extract_xlsx,
    ".pptx": extract_pptx,
}


def extract_text(file_path: str, extension: str) -> str:
    """Extract text from *file_path* using the extractor registered for *extension*.

    The extension match is case-insensitive. Raises ValueError when no
    extractor is registered for the extension.
    """
    ext = extension.lower()
    try:
        extractor = EXTRACTORS[ext]
    except KeyError:
        raise ValueError(f"Unsupported file extension: {ext}") from None
    return extractor(file_path)
"""XLSX text extraction using openpyxl."""

from openpyxl import load_workbook


def extract_xlsx(file_path: str) -> str:
    """Extract text from an XLSX file.

    Non-empty cell values (computed values, not formulas) are joined with
    " | " per row; each non-empty sheet is prefixed with its name in square
    brackets, and sheets are separated by blank lines.
    """
    workbook = load_workbook(file_path, read_only=True, data_only=True)
    sections = []

    for name in workbook.sheetnames:
        lines = [
            " | ".join(str(cell) for cell in row if cell is not None)
            for row in workbook[name].iter_rows(values_only=True)
            if any(cell is not None for cell in row)
        ]
        if lines:
            sections.append(f"[{name}]\n" + "\n".join(lines))

    workbook.close()
    return "\n\n".join(sections)
"""Required documents per regulation and company type."""

from dataclasses import dataclass


@dataclass
class RequiredDocument:
    """One document type a company must be able to produce."""

    category: str     # classification label, e.g. "VVT"
    description: str  # gap description shown to the user (German)
    regulation: str   # legal basis, e.g. "Art. 30 DSGVO"
    severity: str     # CRITICAL, HIGH, MEDIUM
    applies_to: str   # universal, data_processor, ai_user, large_company


# Rows: (category, description, regulation, severity, applies_to).
# First the universal requirements (every company), then profile-specific ones.
_MATRIX_ROWS = [
    ("VVT", "Verzeichnis von Verarbeitungstaetigkeiten fehlt",
     "Art. 30 DSGVO", "CRITICAL", "universal"),
    ("TOM", "Technisch-organisatorische Massnahmen nicht dokumentiert",
     "Art. 32 DSGVO", "CRITICAL", "universal"),
    ("DSE", "Datenschutzerklaerung fehlt oder unvollstaendig",
     "Art. 13/14 DSGVO", "CRITICAL", "universal"),
    ("Loeschkonzept", "Kein Loeschkonzept / keine Loeschfristen definiert",
     "Art. 17 DSGVO / Art. 5 Abs. 1e DSGVO", "HIGH", "universal"),
    ("Richtlinie", "Interne Datenschutzrichtlinie fehlt",
     "Art. 24 DSGVO", "MEDIUM", "universal"),
    ("Schulungsnachweis", "Keine Datenschutz-Schulungsnachweise vorhanden",
     "Art. 39 Abs. 1b DSGVO", "MEDIUM", "universal"),
    # Data processors
    ("AVV", "Auftragsverarbeitungsvertrag fehlt",
     "Art. 28 DSGVO", "CRITICAL", "data_processor"),
    # AI users
    ("DSFA", "Datenschutz-Folgenabschaetzung fuer KI-Systeme fehlt",
     "Art. 35 DSGVO / EU AI Act", "HIGH", "ai_user"),
]

COMPLIANCE_MATRIX: list[RequiredDocument] = [
    RequiredDocument(*row) for row in _MATRIX_ROWS
]
+""" + +from contextlib import asynccontextmanager +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from db import get_pool, close_pool, run_migration +from api.sources import router as sources_router +from api.jobs import router as jobs_router +from api.documents import router as documents_router +from api.reports import router as reports_router + + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + await get_pool() + await run_migration() + yield + # Shutdown + await close_pool() + + +app = FastAPI( + title="Document Crawler", + description="Auto-Onboarding: Filesystem scanning, LLM classification, IPFS archival, gap analysis", + version="1.0.0", + lifespan=lifespan, +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Register routers +app.include_router(sources_router, prefix="/api/v1/crawler") +app.include_router(jobs_router, prefix="/api/v1/crawler") +app.include_router(documents_router, prefix="/api/v1/crawler") +app.include_router(reports_router, prefix="/api/v1/crawler") + + +@app.get("/health") +async def health(): + try: + pool = await get_pool() + async with pool.acquire() as conn: + await conn.fetchval("SELECT 1") + return {"status": "healthy", "service": "document-crawler"} + except Exception as e: + return {"status": "degraded", "error": str(e)} + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8098) diff --git a/document-crawler/migrations/014_crawler_tables.sql b/document-crawler/migrations/014_crawler_tables.sql new file mode 100644 index 0000000..e7c5e6d --- /dev/null +++ b/document-crawler/migrations/014_crawler_tables.sql @@ -0,0 +1,93 @@ +-- Migration 014: Document Crawler & Auto-Onboarding tables +-- 4 tables for crawl source management, job tracking, document storage, and reports + +BEGIN; + +-- 1. 
-- Crawl sources: the configurable directories an admin registers for scanning.
CREATE TABLE IF NOT EXISTS crawler_sources (
    id               UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id        UUID NOT NULL,
    name             VARCHAR(255) NOT NULL,
    source_type      VARCHAR(50) NOT NULL DEFAULT 'local',   -- local, smb
    path             TEXT NOT NULL,
    file_extensions  JSONB NOT NULL DEFAULT '[".pdf", ".docx", ".xlsx", ".pptx"]',
    max_depth        INT NOT NULL DEFAULT 5,
    exclude_patterns JSONB NOT NULL DEFAULT '[]',
    enabled          BOOLEAN NOT NULL DEFAULT true,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_crawler_sources_tenant ON crawler_sources(tenant_id);

-- 2. Crawl jobs: one row per crawl execution, with progress counters.
CREATE TABLE IF NOT EXISTS crawler_jobs (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       UUID NOT NULL,
    source_id       UUID NOT NULL REFERENCES crawler_sources(id) ON DELETE CASCADE,
    status          VARCHAR(50) NOT NULL DEFAULT 'pending',  -- pending, running, completed, failed, cancelled
    job_type        VARCHAR(50) NOT NULL DEFAULT 'full',     -- full, delta
    files_found     INT NOT NULL DEFAULT 0,
    files_processed INT NOT NULL DEFAULT 0,
    files_new       INT NOT NULL DEFAULT 0,
    files_changed   INT NOT NULL DEFAULT 0,
    files_skipped   INT NOT NULL DEFAULT 0,
    files_error     INT NOT NULL DEFAULT 0,
    error_message   TEXT,
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_crawler_jobs_tenant ON crawler_jobs(tenant_id);
CREATE INDEX IF NOT EXISTS idx_crawler_jobs_source ON crawler_jobs(source_id);
CREATE INDEX IF NOT EXISTS idx_crawler_jobs_status ON crawler_jobs(status);

-- 3. Crawled documents: every file discovered, plus extraction/classification/
-- archival state. (tenant_id, source_id, file_path) is unique so re-crawls
-- update the existing row instead of duplicating it.
CREATE TABLE IF NOT EXISTS crawler_documents (
    id                        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id                 UUID NOT NULL,
    source_id                 UUID NOT NULL REFERENCES crawler_sources(id) ON DELETE CASCADE,
    job_id                    UUID NOT NULL REFERENCES crawler_jobs(id) ON DELETE CASCADE,
    file_path                 TEXT NOT NULL,
    file_name                 VARCHAR(500) NOT NULL,
    file_extension            VARCHAR(20) NOT NULL,
    file_size_bytes           BIGINT NOT NULL DEFAULT 0,
    file_hash                 VARCHAR(64),                   -- SHA-256 hex digest
    extracted_text            TEXT,
    extraction_status         VARCHAR(50) NOT NULL DEFAULT 'pending',  -- pending, completed, failed
    classification            VARCHAR(100),
    classification_confidence FLOAT,
    classification_reasoning  TEXT,
    classification_corrected  BOOLEAN NOT NULL DEFAULT false,
    archived                  BOOLEAN NOT NULL DEFAULT false,
    ipfs_cid                  VARCHAR(200),
    archived_at               TIMESTAMPTZ,
    first_seen_at             TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_seen_at              TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    version_count             INT NOT NULL DEFAULT 1,
    created_at                TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at                TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(tenant_id, source_id, file_path)
);

CREATE INDEX IF NOT EXISTS idx_crawler_documents_tenant ON crawler_documents(tenant_id);
CREATE INDEX IF NOT EXISTS idx_crawler_documents_source ON crawler_documents(source_id);
CREATE INDEX IF NOT EXISTS idx_crawler_documents_classification ON crawler_documents(classification);
CREATE INDEX IF NOT EXISTS idx_crawler_documents_hash ON crawler_documents(file_hash);

-- 4.
-- Onboarding reports: one summary per completed crawl, with the gap analysis
-- result persisted as JSONB. The job link is nullable so deleting a job keeps
-- its historical report.
CREATE TABLE IF NOT EXISTS crawler_onboarding_reports (
    id                       UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id                UUID NOT NULL,
    job_id                   UUID REFERENCES crawler_jobs(id) ON DELETE SET NULL,
    total_documents_found    INT NOT NULL DEFAULT 0,
    classification_breakdown JSONB NOT NULL DEFAULT '{}',
    gaps                     JSONB NOT NULL DEFAULT '[]',
    compliance_score         FLOAT NOT NULL DEFAULT 0.0,
    created_at               TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_crawler_reports_tenant ON crawler_onboarding_reports(tenant_id);

COMMIT;
30 DSGVO" + result = keyword_classify(text, "vvt.pdf") + assert result["classification"] == "VVT" + assert result["confidence"] <= 0.3 + + +def test_tom_detection(): + text = "Technisch-organisatorische Massnahmen: Zutrittskontrolle, Zugangskontrolle, Verschluesselungskonzept" + result = keyword_classify(text, "toms.docx") + assert result["classification"] == "TOM" + + +def test_dse_detection(): + text = "Datenschutzerklaerung: Informationspflichten nach Art. 13 DSGVO" + result = keyword_classify(text, "datenschutz.pdf") + assert result["classification"] == "DSE" + + +def test_unknown_document(): + text = "Lorem ipsum dolor sit amet" + result = keyword_classify(text, "random.pdf") + assert result["classification"] == "Sonstiges" + assert result["confidence"] == 0.1 + + +def test_confidence_capped(): + text = "Verarbeitungsverzeichnis Art. 30 Kategorie betroffener Personen Datenkategorien Zweck der Verarbeitung" + result = keyword_classify(text, "vvt_complete.pdf") + assert result["confidence"] <= 0.3 diff --git a/document-crawler/tests/test_extractors.py b/document-crawler/tests/test_extractors.py new file mode 100644 index 0000000..83cc7cb --- /dev/null +++ b/document-crawler/tests/test_extractors.py @@ -0,0 +1,16 @@ +"""Tests for document text extractors.""" + +import pytest +from extractors.dispatcher import extract_text, EXTRACTORS + + +def test_supported_extensions(): + assert ".pdf" in EXTRACTORS + assert ".docx" in EXTRACTORS + assert ".xlsx" in EXTRACTORS + assert ".pptx" in EXTRACTORS + + +def test_unsupported_extension(): + with pytest.raises(ValueError, match="Unsupported"): + extract_text("/tmp/test.txt", ".txt") diff --git a/document-crawler/tests/test_gap_analysis.py b/document-crawler/tests/test_gap_analysis.py new file mode 100644 index 0000000..70c0aaf --- /dev/null +++ b/document-crawler/tests/test_gap_analysis.py @@ -0,0 +1,46 @@ +"""Tests for gap analysis.""" + +import pytest +from gap_analysis.analyzer import generate_gap_analysis + + +def 
"""Tests for gap analysis."""

from gap_analysis.analyzer import generate_gap_analysis

# Every category required across the default profiles
# (universal + data_processor + ai_user).
ALL_CATEGORIES = [
    "VVT", "TOM", "DSE", "Loeschkonzept",
    "Richtlinie", "Schulungsnachweis", "AVV", "DSFA",
]


def test_full_compliance():
    """All eight required categories present -> perfect score, no gaps."""
    result = generate_gap_analysis({cat: 1 for cat in ALL_CATEGORIES})
    assert result["compliance_score"] == 100.0
    assert result["gaps"] == []


def test_no_documents():
    """Nothing found -> zero score and at least one critical gap."""
    result = generate_gap_analysis({})
    assert result["compliance_score"] == 0.0
    assert result["gaps"]
    assert result["gap_summary"]["critical"] > 0


def test_partial_compliance():
    """Only VVT and TOM covered -> score strictly between 0 and 100."""
    result = generate_gap_analysis({"VVT": 1, "TOM": 1})
    assert 0 < result["compliance_score"] < 100
    # DSE, Loeschkonzept, Richtlinie, Schulungsnachweis, AVV, DSFA remain gaps.
    missing = {gap["category"] for gap in result["gaps"]}
    assert "DSE" in missing
    assert "Loeschkonzept" in missing


def test_universal_only():
    """Universal profile requires six documents; four are covered here."""
    counts = {"VVT": 1, "TOM": 1, "DSE": 1, "Loeschkonzept": 1}
    result = generate_gap_analysis(counts, company_profiles=["universal"])
    assert result["covered"] == 4
    assert result["total_required"] == 6


def test_gap_severity():
    """An empty inventory against the universal profile yields CRITICAL gaps."""
    result = generate_gap_analysis({}, company_profiles=["universal"])
    assert "CRITICAL" in {gap["severity"] for gap in result["gaps"]}