"""
PDF Extractor for BSI-TR-03161 and EU Regulation Documents.

This module extracts Pruefaspekte (test aspects) from BSI Technical Guidelines
and Articles from EU regulations in PDF format.
"""

import logging
import re
from collections import Counter
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

# Module-level logger: calling logging.warning() directly would go through the
# root logger and implicitly run logging.basicConfig() at import time.
_logger = logging.getLogger(__name__)

try:
    import fitz  # PyMuPDF
except ImportError:
    fitz = None
    _logger.warning("PyMuPDF not installed. PDF extraction will not work.")


class RequirementLevel(str, Enum):
    """BSI requirement levels (German: Anforderungsstufen)."""

    MUSS = "MUSS"  # MUST - mandatory
    SOLL = "SOLL"  # SHOULD - recommended
    KANN = "KANN"  # MAY - optional
    DARF_NICHT = "DARF NICHT"  # MUST NOT - prohibited


class AspectCategory(str, Enum):
    """Categories for BSI-TR Pruefaspekte."""

    AUTHENTICATION = "authentication"
    SESSION_MANAGEMENT = "session_management"
    CRYPTOGRAPHY = "cryptography"
    INPUT_VALIDATION = "input_validation"
    SQL_INJECTION = "sql_injection"
    XSS_PREVENTION = "xss_prevention"
    CSRF_PROTECTION = "csrf_protection"
    LOGGING_AUDIT = "logging_audit"
    ERROR_HANDLING = "error_handling"
    NETWORK_SECURITY = "network_security"
    SECURE_STORAGE = "secure_storage"
    PRIVACY = "privacy"
    ACCESS_CONTROL = "access_control"
    DATA_PROTECTION = "data_protection"
    KEY_MANAGEMENT = "key_management"
    SECURE_COMMUNICATION = "secure_communication"
    UPDATE_MECHANISM = "update_mechanism"
    GENERAL = "general"
    TEST_ASPECT = "test_aspect"


@dataclass
class BSIAspect:
    """A single extracted BSI-TR Pruefaspekt (test aspect)."""

    aspect_id: str  # e.g., "O.Auth_1", "T.Sess_2" (the extractor stores it upper-cased)
    title: str  # Short title
    full_text: str  # Complete requirement text
    category: AspectCategory  # Categorization
    page_number: int  # PDF page where found (1-based)
    section: str  # Chapter/section number
    requirement_level: RequirementLevel  # MUSS/SOLL/KANN/DARF NICHT
    source_document: str  # e.g., "BSI-TR-03161-2"
    context_before: str = ""  # Text before the aspect
    context_after: str = ""  # Text after the aspect
    related_aspects: List[str] = field(default_factory=list)  # Related aspect IDs
    keywords: List[str] = field(default_factory=list)  # Extracted keywords


@dataclass
class EUArticle:
    """A single extracted EU regulation article."""

    article_number: str  # e.g., "Art. 32"
    title: str  # Article title
    full_text: str  # Complete article text
    paragraphs: List[str]  # Individual paragraphs
    page_number: int  # PDF page (1-based)
    regulation_name: str  # e.g., "DSGVO", "AI Act"
    recitals: List[str] = field(default_factory=list)  # Related recitals
    keywords: List[str] = field(default_factory=list)  # Extracted keywords


class BSIPDFExtractor:
    """
    Extracts Pruefaspekte from BSI-TR-03161 PDF documents.

    The BSI-TR-03161 series contains security requirements for mobile applications:
    - Part 1: General security requirements
    - Part 2: Web application security (OAuth, Sessions, Input validation, etc.)
    - Part 3: Backend/server security

    Each document contains hundreds of Pruefaspekte (test aspects) that need to
    be extracted, categorized, and stored for compliance tracking.
    """

    # Regex patterns for BSI-TR aspect identification
    PATTERNS = {
        # Primary aspect ID patterns (e.g., O.Auth_1, T.Network_2)
        'aspect_id': r'(O\.[A-Za-z]+_\d+|T\.[A-Za-z]+_\d+)',
        # Alternative section-based pattern (e.g., "Pruefaspekt 4.2.1")
        'section_aspect': r'(?:Prüfaspekt|Pruefaspekt|Anforderung)\s+(\d+\.\d+(?:\.\d+)?)',
        # Section number pattern
        'section': r'(\d+\.\d+(?:\.\d+)?)',
        # Requirement level pattern
        'requirement': r'\b(MUSS|SOLL|KANN|DARF\s+NICHT|muss|soll|kann|darf\s+nicht)\b',
        # Table header pattern for Pruefaspekte tables
        'table_header': r'(?:Prüfaspekt|Bezeichnung|ID|Anforderung)',
    }

    # Category mapping based on aspect ID prefix
    CATEGORY_MAP = {
        'O.Auth': AspectCategory.AUTHENTICATION,
        'O.Sess': AspectCategory.SESSION_MANAGEMENT,
        'O.Cryp': AspectCategory.CRYPTOGRAPHY,
        'O.Crypto': AspectCategory.CRYPTOGRAPHY,
        'O.Input': AspectCategory.INPUT_VALIDATION,
        'O.SQL': AspectCategory.SQL_INJECTION,
        'O.XSS': AspectCategory.XSS_PREVENTION,
        'O.CSRF': AspectCategory.CSRF_PROTECTION,
        'O.Log': AspectCategory.LOGGING_AUDIT,
        'O.Audit': AspectCategory.LOGGING_AUDIT,
        'O.Err': AspectCategory.ERROR_HANDLING,
        'O.Error': AspectCategory.ERROR_HANDLING,
        'O.Net': AspectCategory.NETWORK_SECURITY,
        'O.Network': AspectCategory.NETWORK_SECURITY,
        'O.Store': AspectCategory.SECURE_STORAGE,
        'O.Storage': AspectCategory.SECURE_STORAGE,
        'O.Priv': AspectCategory.PRIVACY,
        'O.Privacy': AspectCategory.PRIVACY,
        'O.Data': AspectCategory.DATA_PROTECTION,
        'O.Access': AspectCategory.ACCESS_CONTROL,
        'O.Key': AspectCategory.KEY_MANAGEMENT,
        'O.Comm': AspectCategory.SECURE_COMMUNICATION,
        'O.TLS': AspectCategory.SECURE_COMMUNICATION,
        'O.Update': AspectCategory.UPDATE_MECHANISM,
        'T.': AspectCategory.TEST_ASPECT,
    }

    # Keywords for category detection when aspect ID is ambiguous.
    # NOTE(review): keywords are matched as plain substrings, so short entries
    # such as 'orm' or 'log' also hit words like "information" or "login".
    CATEGORY_KEYWORDS = {
        AspectCategory.AUTHENTICATION: [
            'authentifizierung', 'authentication', 'login', 'anmeldung',
            'passwort', 'password', 'credential', 'oauth', 'oidc', 'token',
            'bearer', 'jwt', 'session', 'multi-faktor', 'mfa', '2fa'
        ],
        AspectCategory.SESSION_MANAGEMENT: [
            'session', 'sitzung', 'cookie', 'timeout', 'ablauf',
            'session-id', 'sessionid', 'logout', 'abmeldung'
        ],
        AspectCategory.CRYPTOGRAPHY: [
            'verschlüsselung', 'encryption', 'kryptograph', 'cryptograph',
            'aes', 'rsa', 'hash', 'signatur', 'signature', 'zertifikat',
            'certificate', 'tls', 'ssl', 'hmac', 'pbkdf', 'argon'
        ],
        AspectCategory.INPUT_VALIDATION: [
            'eingabevalidierung', 'input validation', 'validierung',
            'eingabeprüfung', 'sanitiz', 'whitelist', 'blacklist',
            'filter', 'escape', 'encoding'
        ],
        AspectCategory.SQL_INJECTION: [
            'sql injection', 'sql-injection', 'prepared statement',
            'parameterisiert', 'parameterized', 'orm', 'database'
        ],
        AspectCategory.XSS_PREVENTION: [
            'xss', 'cross-site scripting', 'script injection',
            'html encoding', 'output encoding', 'csp', 'content-security'
        ],
        AspectCategory.CSRF_PROTECTION: [
            'csrf', 'cross-site request', 'token', 'anti-csrf',
            'state parameter', 'same-site', 'samesite'
        ],
        AspectCategory.LOGGING_AUDIT: [
            'logging', 'protokollierung', 'audit', 'nachvollziehbar',
            'traceability', 'log', 'event', 'monitoring'
        ],
        AspectCategory.ERROR_HANDLING: [
            'fehlerbehandlung', 'error handling', 'exception',
            'fehlermeldung', 'error message', 'stack trace'
        ],
    }

    def __init__(self, logger: Optional[logging.Logger] = None):
        """
        Initialize the PDF extractor.

        Args:
            logger: Logger to use; defaults to this module's logger.

        Raises:
            ImportError: If PyMuPDF could not be imported.
        """
        self.logger = logger or logging.getLogger(__name__)

        if fitz is None:
            raise ImportError(
                "PyMuPDF is required for PDF extraction. "
                "Install it with: pip install PyMuPDF"
            )

    def extract_from_file(self, pdf_path: str, source_name: Optional[str] = None) -> List[BSIAspect]:
        """
        Extract all Pruefaspekte from a BSI-TR PDF file.

        Args:
            pdf_path: Path to the PDF file
            source_name: Optional source document name (the file stem is used
                if not provided)

        Returns:
            List of extracted BSIAspect objects, deduplicated by aspect ID and
            enriched with related aspects and keywords

        Raises:
            FileNotFoundError: If ``pdf_path`` does not exist.
        """
        path = Path(pdf_path)
        if not path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        source = source_name or path.stem
        self.logger.info("Extracting aspects from: %s", source)

        aspects: List[BSIAspect] = []
        # The context manager closes the document even if a page fails to parse.
        with fitz.open(pdf_path) as doc:
            for page_num, page in enumerate(doc, start=1):
                aspects.extend(self._extract_aspects_from_text(
                    text=page.get_text(),
                    page_num=page_num,
                    source_document=source,
                ))

        # Post-process: deduplicate and enrich
        aspects = self._deduplicate_aspects(aspects)
        aspects = self._enrich_aspects(aspects)

        self.logger.info("Extracted %d unique aspects from %s", len(aspects), source)
        return aspects

    def extract_all_documents(self, docs_dir: str) -> Dict[str, List[BSIAspect]]:
        """
        Extract aspects from all BSI-TR PDFs in a directory.

        Files that fail to parse are logged and skipped so that one broken PDF
        does not abort the whole batch.

        Args:
            docs_dir: Directory containing BSI-TR PDF files

        Returns:
            Dictionary mapping document names (file stems) to their extracted aspects
        """
        docs_path = Path(docs_dir)
        patterns = ["BSI-TR-03161*.pdf", "bsi-tr-03161*.pdf"]

        # Where glob matching is case-insensitive both patterns return the same
        # files; collapse them so each PDF is parsed once, in a stable order.
        pdf_files = sorted({
            pdf_file
            for pattern in patterns
            for pdf_file in docs_path.glob(pattern)
        })

        results: Dict[str, List[BSIAspect]] = {}
        for pdf_file in pdf_files:
            try:
                results[pdf_file.stem] = self.extract_from_file(str(pdf_file))
            except Exception as e:  # batch boundary: log and continue
                self.logger.error("Failed to extract from %s: %s", pdf_file, e)

        return results

    def _extract_aspects_from_text(
        self,
        text: str,
        page_num: int,
        source_document: str
    ) -> List[BSIAspect]:
        """
        Extract all Pruefaspekte from a page's text.

        Two passes are made: one for explicit aspect IDs (``O.*`` / ``T.*``)
        and one for section-based headings ("Prüfaspekt 4.2.1"). The result may
        contain the same ID several times; deduplication happens later.

        Args:
            text: Plain text of one page
            page_num: 1-based page number stored on each aspect
            source_document: Document name stored on each aspect

        Returns:
            Aspects in order of appearance (ID-based first, then section-based)
        """
        aspects: List[BSIAspect] = []

        # Find all aspect IDs on this page
        for match in re.finditer(self.PATTERNS['aspect_id'], text, re.IGNORECASE):
            # IDs are normalised to upper case so deduplication is case-insensitive.
            aspect_id = match.group(1).upper()

            start = max(0, match.start() - 200)
            end = min(len(text), match.end() + 1000)
            # Surrounding window: used for category keywords and section lookup.
            context = text[start:end]
            # Text beginning at this ID. Title, requirement text and level are
            # taken from here so that the preceding aspect's wording (which
            # lives in the 200 characters before the ID) cannot leak in.
            own_text = text[match.start():end]

            full_text = self._extract_full_text(own_text, aspect_id)

            aspects.append(BSIAspect(
                aspect_id=aspect_id,
                title=self._extract_title(own_text, aspect_id),
                full_text=full_text,
                category=self._determine_category(aspect_id, context),
                page_number=page_num,
                section=self._extract_section(context),
                requirement_level=self._extract_requirement_level(full_text),
                source_document=source_document,
                context_before=text[start:match.start()].strip()[-100:],
                context_after=text[match.end():end].strip()[:200],
            ))

        # Also look for section-based aspects
        known_sections = {aspect.section for aspect in aspects}
        for match in re.finditer(self.PATTERNS['section_aspect'], text, re.IGNORECASE):
            section_id = match.group(1)

            # Skip sections already covered by an aspect found on this page
            if section_id in known_sections:
                continue
            known_sections.add(section_id)

            start = max(0, match.start() - 100)
            end = min(len(text), match.end() + 800)
            context = text[start:end]

            aspects.append(BSIAspect(
                aspect_id=f"SEC_{section_id.replace('.', '_')}",
                title=f"Prüfaspekt {section_id}",
                full_text=context.strip(),
                category=self._determine_category_from_keywords(context),
                page_number=page_num,
                section=section_id,
                # Only look at text from the heading onwards, for the same
                # reason as in the ID-based pass above.
                requirement_level=self._extract_requirement_level(text[match.start():end]),
                source_document=source_document,
            ))

        return aspects

    def _determine_category(self, aspect_id: str, context: str) -> AspectCategory:
        """
        Determine the category of an aspect based on its ID and context.

        The ID prefix is compared case-insensitively against CATEGORY_MAP;
        if no prefix matches, keyword scoring on ``context`` decides.
        """
        normalized_id = aspect_id.upper()
        for prefix, category in self.CATEGORY_MAP.items():
            if normalized_id.startswith(prefix.upper()):
                return category

        # Fall back to keyword-based detection
        return self._determine_category_from_keywords(context)

    def _determine_category_from_keywords(self, text: str) -> AspectCategory:
        """
        Determine category based on keywords in the text.

        Each category scores one point per keyword found (substring match on
        the lower-cased text). The highest score wins; ties go to the category
        declared first in CATEGORY_KEYWORDS. Returns GENERAL if nothing matches.
        """
        text_lower = text.lower()

        best_category = AspectCategory.GENERAL
        best_score = 0
        for category, keywords in self.CATEGORY_KEYWORDS.items():
            score = sum(1 for kw in keywords if kw in text_lower)
            # Strict comparison keeps the earliest category on ties.
            if score > best_score:
                best_category, best_score = category, score

        return best_category

    def _extract_requirement_level(self, text: str) -> RequirementLevel:
        """
        Extract the requirement level from text.

        The first modal verb found (case-insensitive) decides the level.
        Returns SOLL when the text contains none.
        """
        match = re.search(self.PATTERNS['requirement'], text, re.IGNORECASE)
        if match is None:
            return RequirementLevel.SOLL  # Default

        # "DARF\nNICHT" and similar are collapsed to the enum value "DARF NICHT".
        normalized = " ".join(match.group(1).upper().split())
        return RequirementLevel(normalized)

    def _extract_title(self, context: str, aspect_id: str) -> str:
        """
        Extract the title/short description of an aspect.

        The title is the remainder of the line following the aspect ID (an
        optional ':' / '-' / '–' separator is skipped; the title may start on
        the next line). Titles longer than 200 characters are truncated with
        "...". Returns ``aspect_id`` itself when no title text follows.
        """
        pattern = rf'{re.escape(aspect_id)}\s*[:\-–]?\s*([^\n]+)'
        match = re.search(pattern, context, re.IGNORECASE)
        if match is None:
            return aspect_id

        title = re.sub(r'\s+', ' ', match.group(1).strip())
        if len(title) > 200:
            title = title[:197] + "..."
        return title

    def _extract_section(self, context: str) -> str:
        """
        Extract the section number from context.

        Returns the first dotted number (e.g. "4.2" or "4.2.1") in ``context``,
        or "" if there is none.

        NOTE(review): this takes the first dotted number anywhere in the
        window, which may be a version number rather than a heading.
        """
        match = re.search(self.PATTERNS['section'], context)
        return match.group(1) if match else ""

    def _extract_full_text(self, context: str, aspect_id: str) -> str:
        """
        Extract the complete requirement text.

        The text runs from the aspect ID up to (not including) the next line
        that starts with another aspect ID or a section number, or to the end
        of ``context``. Whitespace is collapsed to single spaces. If the ID
        is not present in ``context`` the whole context is returned.
        """
        # The end-of-text alternative sits outside the "\n\s*" prefix so that
        # text without a trailing newline still terminates the match, and the
        # newline after the ID line is not consumed up front so a following
        # aspect on the very next line is not swallowed.
        pattern = (
            rf'{re.escape(aspect_id)}[^\n]*(.*?)'
            r'(?=\n\s*(?:O\.[A-Z]|T\.[A-Z]|\d+\.\d+\s)|\s*\Z)'
        )
        match = re.search(pattern, context, re.IGNORECASE | re.DOTALL)

        full_text = match.group(0).strip() if match else context.strip()

        # Clean up
        return re.sub(r'\s+', ' ', full_text)

    def _deduplicate_aspects(self, aspects: List[BSIAspect]) -> List[BSIAspect]:
        """
        Remove duplicate aspects, keeping the one with more context.

        For each aspect ID the entry with the longest ``full_text`` survives
        (the first one on ties). Order follows the first appearance of each ID.
        """
        best_by_id: Dict[str, BSIAspect] = {}

        for aspect in aspects:
            current = best_by_id.get(aspect.aspect_id)
            if current is None or len(aspect.full_text) > len(current.full_text):
                best_by_id[aspect.aspect_id] = aspect

        return list(best_by_id.values())

    def _enrich_aspects(self, aspects: List[BSIAspect]) -> List[BSIAspect]:
        """
        Enrich aspects with additional metadata.

        Sets ``related_aspects`` (IDs of other known aspects mentioned in the
        aspect's full text) and ``keywords`` on each aspect in place.

        Returns:
            The same list that was passed in.
        """
        known_ids = {a.aspect_id for a in aspects}

        for aspect in aspects:
            # Tokenise IDs with the extraction pattern instead of substring
            # tests: stored IDs are upper-case while the text keeps its
            # original case, and "O.AUTH_1" must not match inside "O.AUTH_10".
            mentioned = {
                token.upper()
                for token in re.findall(
                    self.PATTERNS['aspect_id'], aspect.full_text, re.IGNORECASE
                )
            }
            found = sorted((mentioned & known_ids) - {aspect.aspect_id})

            # Keep anything already recorded and avoid duplicates on re-runs.
            already = set(aspect.related_aspects)
            aspect.related_aspects.extend(i for i in found if i not in already)

            # Extract keywords based on category
            aspect.keywords = self._extract_keywords(aspect)

        return aspects

    def _extract_keywords(self, aspect: BSIAspect) -> List[str]:
        """
        Extract relevant keywords from an aspect.

        Returns up to 10 of the aspect's category keywords that occur in its
        full text, in the order they are declared in CATEGORY_KEYWORDS.
        Categories without a keyword list yield an empty list.
        """
        text_lower = aspect.full_text.lower()
        candidates = self.CATEGORY_KEYWORDS.get(aspect.category, [])

        # dict.fromkeys de-duplicates while preserving declaration order.
        hits = dict.fromkeys(kw for kw in candidates if kw in text_lower)
        return list(hits)[:10]  # Limit to 10 keywords

    def get_statistics(self, aspects: List[BSIAspect]) -> Dict[str, Any]:
        """
        Get statistics about extracted aspects.

        Returns:
            Dictionary with ``total_aspects``, per-value counts in
            ``by_category``, ``by_requirement_level`` and ``by_source``, and
            ``unique_sections`` (number of distinct non-empty sections).
        """
        by_category = Counter(a.category.value for a in aspects)
        by_level = Counter(a.requirement_level.value for a in aspects)
        by_source = Counter(a.source_document for a in aspects)
        sections = {a.section for a in aspects if a.section}

        return {
            "total_aspects": len(aspects),
            "by_category": dict(by_category),
            "by_requirement_level": dict(by_level),
            "by_source": dict(by_source),
            "unique_sections": len(sections),
        }


class EURegulationExtractor:
    """
    Extracts Articles from EU Regulation PDF documents.

    Handles documents like GDPR, AI Act, CRA, etc. in their official formats.
    """

    PATTERNS = {
        'article_de': r'Artikel\s+(\d+)',
        'article_en': r'Article\s+(\d+)',
        'paragraph': r'\((\d+)\)',
        'recital': r'Erwägungsgrund\s+(\d+)|Recital\s+(\d+)',
    }

    def __init__(self, logger: Optional[logging.Logger] = None):
        """Initialize the extractor with an optional logger."""
        self.logger = logger or logging.getLogger(__name__)

    def extract_from_file(
        self,
        pdf_path: str,
        regulation_name: str,
        language: str = "de"
    ) -> List[EUArticle]:
        """
        Extract all articles from an EU regulation PDF.

        Args:
            pdf_path: Path to the PDF file
            regulation_name: Name stored on every article (e.g. "DSGVO")
            language: "de" searches for "Artikel N"; any other value searches
                for "Article N"

        Returns:
            Articles deduplicated by article number (longest text wins)

        Raises:
            ImportError: If PyMuPDF could not be imported.
            FileNotFoundError: If ``pdf_path`` does not exist.

        NOTE(review): every "Artikel N" occurrence is treated as an article
        start, including in-text references, and an article's text ends at
        the page end. Deduplication only partly compensates for this.
        """
        if fitz is None:
            raise ImportError("PyMuPDF is required for PDF extraction.")

        path = Path(pdf_path)
        if not path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        article_pattern = (
            self.PATTERNS['article_de'] if language == "de"
            else self.PATTERNS['article_en']
        )

        articles: List[EUArticle] = []
        # The context manager closes the document even if a page fails to parse.
        with fitz.open(pdf_path) as doc:
            for page_num, page in enumerate(doc, start=1):
                text = page.get_text()
                matches = list(re.finditer(article_pattern, text))

                for index, match in enumerate(matches):
                    article_num = match.group(1)

                    # The article runs to the next article match or page end.
                    if index + 1 < len(matches):
                        end = matches[index + 1].start()
                    else:
                        end = len(text)
                    article_text = text[match.start():end].strip()

                    articles.append(EUArticle(
                        article_number=f"Art. {article_num}",
                        title=self._extract_article_title(article_text, article_num),
                        full_text=article_text,
                        paragraphs=self._extract_paragraphs(article_text),
                        page_number=page_num,
                        regulation_name=regulation_name,
                    ))

        return self._deduplicate_articles(articles)

    def _extract_paragraphs(self, text: str) -> List[str]:
        """
        Extract numbered paragraphs from article text.

        Each paragraph starts at a "(N)" marker and runs to the next marker
        or the end of the text. Text before the first marker is not included.
        """
        markers = list(re.finditer(self.PATTERNS['paragraph'], text))
        # Each paragraph ends where the next marker starts; the last one ends
        # at the end of the text.
        boundaries = [m.start() for m in markers] + [len(text)]

        paragraphs = []
        for begin, finish in zip(boundaries, boundaries[1:]):
            para_text = text[begin:finish].strip()
            if para_text:
                paragraphs.append(para_text)

        return paragraphs

    def _extract_article_title(self, text: str, article_num: str) -> str:
        """
        Extract the title of an article.

        The title is the first non-empty line after the "Artikel N" /
        "Article N" heading. If there is none, the heading itself is returned,
        using the heading word the text starts with ("Artikel" by default).
        """
        # Accept both heading words so English documents get real titles too.
        pattern = rf'(?:Artikel|Article)\s+{re.escape(article_num)}\s*\n\s*([^\n]+)'
        match = re.search(pattern, text)
        if match:
            return match.group(1).strip()

        heading = re.match(r'\s*(Artikel|Article)\b', text)
        label = heading.group(1) if heading else "Artikel"
        return f"{label} {article_num}"

    def _deduplicate_articles(self, articles: List[EUArticle]) -> List[EUArticle]:
        """
        Remove duplicate articles.

        For each article number the entry with the longest ``full_text``
        survives (the first one on ties). Order follows first appearance.
        """
        best_by_number: Dict[str, EUArticle] = {}

        for article in articles:
            current = best_by_number.get(article.article_number)
            if current is None or len(article.full_text) > len(current.full_text):
                best_by_number[article.article_number] = article

        return list(best_by_number.values())