""" Compliance Extraction & Generation. Functions for extracting checkpoints from legal text chunks, generating controls, and creating remediation measures. """ import re import hashlib import logging from typing import Dict, List, Optional from compliance_models import Checkpoint, Control, Measure logger = logging.getLogger(__name__) def extract_checkpoints_from_chunk(chunk_text: str, payload: Dict) -> List[Checkpoint]: """ Extract checkpoints/requirements from a chunk of text. Uses pattern matching to find requirement-like statements. """ checkpoints = [] regulation_code = payload.get("regulation_code", "UNKNOWN") regulation_name = payload.get("regulation_name", "Unknown") source_url = payload.get("source_url", "") chunk_id = hashlib.md5(chunk_text[:100].encode()).hexdigest()[:8] # Patterns for different requirement types patterns = [ # BSI-TR patterns (r'([OT]\.[A-Za-z_]+\d*)[:\s]+(.+?)(?=\n[OT]\.|$)', 'bsi_requirement'), # Article patterns (GDPR, AI Act, etc.) (r'(?:Artikel|Art\.?)\s+(\d+)(?:\s+Abs(?:atz)?\.?\s*(\d+))?\s*[-\u2013:]\s*(.+?)(?=\n|$)', 'article'), # Numbered requirements (r'\((\d+)\)\s+(.+?)(?=\n\(\d+\)|$)', 'numbered'), # "Der Verantwortliche muss" patterns (r'(?:Der Verantwortliche|Die Aufsichtsbeh\u00f6rde|Der Auftragsverarbeiter)\s+(muss|hat|soll)\s+(.+?)(?=\.\s|$)', 'obligation'), # "Es ist erforderlich" patterns (r'(?:Es ist erforderlich|Es muss gew\u00e4hrleistet|Es sind geeignete)\s+(.+?)(?=\.\s|$)', 'requirement'), ] for pattern, pattern_type in patterns: matches = re.finditer(pattern, chunk_text, re.MULTILINE | re.DOTALL) for match in matches: if pattern_type == 'bsi_requirement': req_id = match.group(1) description = match.group(2).strip() title = req_id elif pattern_type == 'article': article_num = match.group(1) paragraph = match.group(2) or "" title_text = match.group(3).strip() req_id = f"{regulation_code}-Art{article_num}" if paragraph: req_id += f"-{paragraph}" title = f"Art. {article_num}" + (f" Abs. {paragraph}" if paragraph else "") description = title_text elif pattern_type == 'numbered': num = match.group(1) description = match.group(2).strip() req_id = f"{regulation_code}-{num}" title = f"Anforderung {num}" else: # Generic requirement description = match.group(0).strip() req_id = f"{regulation_code}-{chunk_id}-{len(checkpoints)}" title = description[:50] + "..." if len(description) > 50 else description # Skip very short matches if len(description) < 20: continue checkpoint = Checkpoint( id=req_id, regulation_code=regulation_code, regulation_name=regulation_name, article=title if 'Art' in title else None, title=title, description=description[:500], original_text=description, chunk_id=chunk_id, source_url=source_url ) checkpoints.append(checkpoint) return checkpoints def generate_control_for_checkpoints( checkpoints: List[Checkpoint], domain_counts: Dict[str, int], ) -> Optional[Control]: """ Generate a control that covers the given checkpoints. This is a simplified version - in production this would use the AI assistant. """ if not checkpoints: return None # Group by regulation regulation = checkpoints[0].regulation_code # Determine domain based on content all_text = " ".join([cp.description for cp in checkpoints]).lower() domain = "gov" # Default if any(kw in all_text for kw in ["verschl\u00fcssel", "krypto", "encrypt", "hash"]): domain = "crypto" elif any(kw in all_text for kw in ["zugang", "access", "authentif", "login", "benutzer"]): domain = "iam" elif any(kw in all_text for kw in ["datenschutz", "personenbezogen", "privacy", "einwilligung"]): domain = "priv" elif any(kw in all_text for kw in ["entwicklung", "test", "code", "software"]): domain = "sdlc" elif any(kw in all_text for kw in ["\u00fcberwach", "monitor", "log", "audit"]): domain = "aud" elif any(kw in all_text for kw in ["ki", "k\u00fcnstlich", "ai", "machine learning", "model"]): domain = "ai" elif any(kw in all_text for kw in ["betrieb", "operation", "verf\u00fcgbar", "backup"]): domain = "ops" elif any(kw in all_text for kw in ["cyber", "resilience", "sbom", "vulnerab"]): domain = "cra" # Generate control ID domain_count = domain_counts.get(domain, 0) + 1 control_id = f"{domain.upper()}-{domain_count:03d}" # Create title from first checkpoint title = checkpoints[0].title if len(title) > 100: title = title[:97] + "..." # Create description description = f"Control f\u00fcr {regulation}: " + checkpoints[0].description[:200] # Pass criteria pass_criteria = f"Alle {len(checkpoints)} zugeh\u00f6rigen Anforderungen sind erf\u00fcllt und dokumentiert." # Implementation guidance guidance = f"Implementiere Ma\u00dfnahmen zur Erf\u00fcllung der Anforderungen aus {regulation}. " guidance += f"Dokumentiere die Umsetzung und f\u00fchre regelm\u00e4\u00dfige Reviews durch." # Determine if automated is_automated = any(kw in all_text for kw in ["automat", "tool", "scan", "test"]) control = Control( id=control_id, domain=domain, title=title, description=description, checkpoints=[cp.id for cp in checkpoints], pass_criteria=pass_criteria, implementation_guidance=guidance, is_automated=is_automated, automation_tool="CI/CD Pipeline" if is_automated else None, priority="high" if "muss" in all_text or "erforderlich" in all_text else "medium" ) return control def generate_measure_for_control(control: Control) -> Measure: """Generate a remediation measure for a control.""" measure_id = f"M-{control.id}" # Determine deadline based on priority deadline_days = { "critical": 30, "high": 60, "medium": 90, "low": 180 }.get(control.priority, 90) # Determine responsible team responsible = { "priv": "Datenschutzbeauftragter", "iam": "IT-Security Team", "sdlc": "Entwicklungsteam", "crypto": "IT-Security Team", "ops": "Operations Team", "aud": "Compliance Team", "ai": "AI/ML Team", "cra": "IT-Security Team", "gov": "Management" }.get(control.domain, "Compliance Team") measure = Measure( id=measure_id, control_id=control.id, title=f"Umsetzung: {control.title[:50]}", description=f"Implementierung und Dokumentation von {control.id}: {control.description[:100]}", responsible=responsible, deadline_days=deadline_days, status="pending" ) return measure