""" Compliance Extraction & Generation. Functions for extracting checkpoints from legal text chunks, generating controls, and creating remediation measures. """ import re import hashlib import logging from typing import Dict, List, Optional from .models import Checkpoint, Control, Measure logger = logging.getLogger(__name__) def extract_checkpoints_from_chunk(chunk_text: str, payload: Dict) -> List[Checkpoint]: """ Extract checkpoints/requirements from a chunk of text. Uses pattern matching to find requirement-like statements. """ checkpoints = [] regulation_code = payload.get("regulation_code", "UNKNOWN") regulation_name = payload.get("regulation_name", "Unknown") source_url = payload.get("source_url", "") chunk_id = hashlib.md5(chunk_text[:100].encode()).hexdigest()[:8] # Patterns for different requirement types patterns = [ # BSI-TR patterns (r'([OT]\.[A-Za-z_]+\d*)[:\s]+(.+?)(?=\n[OT]\.|$)', 'bsi_requirement'), # Article patterns (GDPR, AI Act, etc.) (r'(?:Artikel|Art\.?)\s+(\d+)(?:\s+Abs(?:atz)?\.?\s*(\d+))?\s*[-\u2013:]\s*(.+?)(?=\n|$)', 'article'), # Numbered requirements (r'\((\d+)\)\s+(.+?)(?=\n\(\d+\)|$)', 'numbered'), # "Der Verantwortliche muss" patterns (r'(?:Der Verantwortliche|Die Aufsichtsbeh\u00f6rde|Der Auftragsverarbeiter)\s+(muss|hat|soll)\s+(.+?)(?=\.\s|$)', 'obligation'), # "Es ist erforderlich" patterns (r'(?:Es ist erforderlich|Es muss gew\u00e4hrleistet|Es sind geeignete)\s+(.+?)(?=\.\s|$)', 'requirement'), ] for pattern, pattern_type in patterns: matches = re.finditer(pattern, chunk_text, re.MULTILINE | re.DOTALL) for match in matches: if pattern_type == 'bsi_requirement': req_id = match.group(1) description = match.group(2).strip() title = req_id elif pattern_type == 'article': article_num = match.group(1) paragraph = match.group(2) or "" title_text = match.group(3).strip() req_id = f"{regulation_code}-Art{article_num}" if paragraph: req_id += f"-{paragraph}" title = f"Art. {article_num}" + (f" Abs. {paragraph}" if paragraph else "") description = title_text elif pattern_type == 'numbered': num = match.group(1) description = match.group(2).strip() req_id = f"{regulation_code}-{num}" title = f"Anforderung {num}" else: # Generic requirement description = match.group(0).strip() req_id = f"{regulation_code}-{chunk_id}-{len(checkpoints)}" title = description[:50] + "..." if len(description) > 50 else description # Skip very short matches if len(description) < 20: continue checkpoint = Checkpoint( id=req_id, regulation_code=regulation_code, regulation_name=regulation_name, article=title if 'Art' in title else None, title=title, description=description[:500], original_text=description, chunk_id=chunk_id, source_url=source_url ) checkpoints.append(checkpoint) return checkpoints def generate_control_for_checkpoints( checkpoints: List[Checkpoint], domain_counts: Dict[str, int], ) -> Optional[Control]: """ Generate a control that covers the given checkpoints. This is a simplified version - in production this would use the AI assistant. """ if not checkpoints: return None # Group by regulation regulation = checkpoints[0].regulation_code # Determine domain based on content all_text = " ".join([cp.description for cp in checkpoints]).lower() domain = "gov" # Default if any(kw in all_text for kw in ["verschl\u00fcssel", "krypto", "encrypt", "hash"]): domain = "crypto" elif any(kw in all_text for kw in ["zugang", "access", "authentif", "login", "benutzer"]): domain = "iam" elif any(kw in all_text for kw in ["datenschutz", "personenbezogen", "privacy", "einwilligung"]): domain = "priv" elif any(kw in all_text for kw in ["entwicklung", "test", "code", "software"]): domain = "sdlc" elif any(kw in all_text for kw in ["\u00fcberwach", "monitor", "log", "audit"]): domain = "aud" elif any(kw in all_text for kw in ["ki", "k\u00fcnstlich", "ai", "machine learning", "model"]): domain = "ai" elif any(kw in all_text for kw in ["betrieb", "operation", "verf\u00fcgbar", "backup"]): domain = "ops" elif any(kw in all_text for kw in ["cyber", "resilience", "sbom", "vulnerab"]): domain = "cra" # Generate control ID domain_count = domain_counts.get(domain, 0) + 1 control_id = f"{domain.upper()}-{domain_count:03d}" # Create title from first checkpoint title = checkpoints[0].title if len(title) > 100: title = title[:97] + "..." # Create description description = f"Control f\u00fcr {regulation}: " + checkpoints[0].description[:200] # Pass criteria pass_criteria = f"Alle {len(checkpoints)} zugeh\u00f6rigen Anforderungen sind erf\u00fcllt und dokumentiert." # Implementation guidance guidance = f"Implementiere Ma\u00dfnahmen zur Erf\u00fcllung der Anforderungen aus {regulation}. " guidance += f"Dokumentiere die Umsetzung und f\u00fchre regelm\u00e4\u00dfige Reviews durch." # Determine if automated is_automated = any(kw in all_text for kw in ["automat", "tool", "scan", "test"]) control = Control( id=control_id, domain=domain, title=title, description=description, checkpoints=[cp.id for cp in checkpoints], pass_criteria=pass_criteria, implementation_guidance=guidance, is_automated=is_automated, automation_tool="CI/CD Pipeline" if is_automated else None, priority="high" if "muss" in all_text or "erforderlich" in all_text else "medium" ) return control def generate_measure_for_control(control: Control) -> Measure: """Generate a remediation measure for a control.""" measure_id = f"M-{control.id}" # Determine deadline based on priority deadline_days = { "critical": 30, "high": 60, "medium": 90, "low": 180 }.get(control.priority, 90) # Determine responsible team responsible = { "priv": "Datenschutzbeauftragter", "iam": "IT-Security Team", "sdlc": "Entwicklungsteam", "crypto": "IT-Security Team", "ops": "Operations Team", "aud": "Compliance Team", "ai": "AI/ML Team", "cra": "IT-Security Team", "gov": "Management" }.get(control.domain, "Compliance Team") measure = Measure( id=measure_id, control_id=control.id, title=f"Umsetzung: {control.title[:50]}", description=f"Implementierung und Dokumentation von {control.id}: {control.description[:100]}", responsible=responsible, deadline_days=deadline_days, status="pending" ) return measure