New standalone Python/FastAPI service for automatic compliance document scanning, LLM-based classification, IPFS archival, and gap analysis. Includes extractors (PDF, DOCX, XLSX, PPTX), keyword fallback classifier, compliance matrix, and full REST API on port 8098. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
60 lines
1.9 KiB
Python
60 lines
1.9 KiB
Python
"""Gap detection logic — compares found documents against compliance matrix."""
|
|
|
|
import uuid
|
|
from .compliance_matrix import COMPLIANCE_MATRIX, RequiredDocument
|
|
|
|
|
|
def generate_gap_analysis(
    classification_counts: dict[str, int],
    company_profiles: list[str] | None = None,
) -> dict:
    """Compare found documents against the compliance matrix and report gaps.

    A requirement from ``COMPLIANCE_MATRIX`` applies when its ``applies_to``
    value is one of ``company_profiles``. An applicable requirement counts as
    covered when ``classification_counts`` holds a positive count for its
    category; otherwise a gap entry is emitted for it.

    Args:
        classification_counts: Number of documents found per category,
            e.g. ``{"VVT": 2, "TOM": 1, "DSE": 0}``. A missing key, ``None``,
            zero, or a negative value all mean "no document found".
        company_profiles: Profiles that apply to the company. ``None``
            selects the default ``["universal", "data_processor", "ai_user"]``.
            An empty list is used as given, so no requirement applies.

    Returns:
        A dict with the keys:

        * ``compliance_score``: covered requirements as a percentage of the
          applicable ones, rounded to one decimal place; ``0.0`` when no
          requirement applies.
        * ``total_required``: number of applicable requirements.
        * ``covered``: number of applicable requirements with at least one
          document.
        * ``gaps``: one dict per uncovered requirement with the keys ``id``
          (a fresh UUID4 string), ``category``, ``description``,
          ``severity``, ``regulation``, ``requiredAction`` and
          ``relatedStepId`` (always ``None`` here).
        * ``gap_summary``: number of gaps with severity ``"CRITICAL"``,
          ``"HIGH"`` and ``"MEDIUM"``. Gaps with any other severity appear
          in ``gaps`` but are not counted in this summary.
    """
    if company_profiles is None:
        company_profiles = ["universal", "data_processor", "ai_user"]

    applicable = [
        req for req in COMPLIANCE_MATRIX if req.applies_to in company_profiles
    ]

    gaps = []
    for req in applicable:
        count = classification_counts.get(req.category)
        # Only a positive count proves a document exists; None (e.g. a JSON
        # null) or a negative value must not be mistaken for coverage.
        if count is not None and count > 0:
            continue
        gaps.append({
            "id": str(uuid.uuid4()),
            "category": req.category,
            "description": req.description,
            "severity": req.severity,
            "regulation": req.regulation,
            "requiredAction": f"{req.category} erstellen und dokumentieren",
            "relatedStepId": None,
        })

    total_required = len(applicable)
    # Every applicable requirement is either a gap or covered.
    covered = total_required - len(gaps)
    # Use a float in the empty case too, so the score has one type on all paths.
    compliance_score = covered / total_required * 100 if total_required else 0.0

    def _count_severity(level: str) -> int:
        """Return how many gaps carry the given severity label."""
        return sum(1 for gap in gaps if gap["severity"] == level)

    return {
        "compliance_score": round(compliance_score, 1),
        "total_required": total_required,
        "covered": covered,
        "gaps": gaps,
        "gap_summary": {
            "critical": _count_severity("CRITICAL"),
            "high": _count_severity("HIGH"),
            "medium": _count_severity("MEDIUM"),
        },
    }
|