Files
breakpilot-compliance/backend-compliance/compliance/services/pdf_extractor.py
Benjamin Boenisch 4435e7ea0a Initial commit: breakpilot-compliance - Compliance SDK Platform
Services: Admin-Compliance, Backend-Compliance,
AI-Compliance-SDK, Consent-SDK, Developer-Portal,
PCA-Platform, DSMS

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:28 +01:00

603 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
PDF Extractor for BSI-TR-03161 and EU Regulation Documents.
This module extracts Pruefaspekte (test aspects) from BSI Technical Guidelines
and Articles from EU regulations in PDF format.
"""
import re
import logging
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from pathlib import Path
from enum import Enum
# PyMuPDF ("fitz") is treated as an optional dependency: importing this module
# must never fail when it is absent. The extractor classes below check
# `fitz is None` and raise a clear ImportError only when extraction is used.
try:
    import fitz  # PyMuPDF
except ImportError:
    fitz = None
    logging.warning("PyMuPDF not installed. PDF extraction will not work.")
class RequirementLevel(str, Enum):
    """BSI requirement levels (German: Anforderungsstufen).

    RFC-2119-style keywords as used in BSI Technical Guidelines. Subclasses
    str so the values can be stored/serialized directly.
    """
    MUSS = "MUSS"              # MUST - mandatory
    SOLL = "SOLL"              # SHOULD - recommended
    KANN = "KANN"              # MAY - optional
    DARF_NICHT = "DARF NICHT"  # MUST NOT - prohibited
class AspectCategory(str, Enum):
    """Categories for BSI-TR Pruefaspekte.

    Values are stable snake_case strings (the enum subclasses str) so they
    can be persisted and compared directly.
    """
    AUTHENTICATION = "authentication"
    SESSION_MANAGEMENT = "session_management"
    CRYPTOGRAPHY = "cryptography"
    INPUT_VALIDATION = "input_validation"
    SQL_INJECTION = "sql_injection"
    XSS_PREVENTION = "xss_prevention"
    CSRF_PROTECTION = "csrf_protection"
    LOGGING_AUDIT = "logging_audit"
    ERROR_HANDLING = "error_handling"
    NETWORK_SECURITY = "network_security"
    SECURE_STORAGE = "secure_storage"
    PRIVACY = "privacy"
    ACCESS_CONTROL = "access_control"
    DATA_PROTECTION = "data_protection"
    KEY_MANAGEMENT = "key_management"
    SECURE_COMMUNICATION = "secure_communication"
    UPDATE_MECHANISM = "update_mechanism"
    GENERAL = "general"          # Fallback when no category can be determined
    TEST_ASPECT = "test_aspect"  # Generic category for T.* test aspects
@dataclass
class BSIAspect:
    """A single extracted BSI-TR Pruefaspekt (test aspect).

    One record per requirement found in a BSI-TR-03161 PDF, including
    surrounding context so extractions can be reviewed manually.
    """
    aspect_id: str                       # e.g., "O.Auth_1", "T.Sess_2"
    title: str                           # Short title
    full_text: str                       # Complete requirement text
    category: AspectCategory             # Categorization
    page_number: int                     # PDF page where found (1-based)
    section: str                         # Chapter/section number
    requirement_level: RequirementLevel  # MUSS/SOLL/KANN
    source_document: str                 # e.g., "BSI-TR-03161-2"
    context_before: str = ""             # Text immediately before the aspect
    context_after: str = ""              # Text immediately after the aspect
    related_aspects: List[str] = field(default_factory=list)  # Related aspect IDs
    keywords: List[str] = field(default_factory=list)         # Extracted keywords
@dataclass
class EUArticle:
    """A single extracted EU regulation article."""
    article_number: str  # e.g., "Art. 32", "Artikel 5"
    title: str           # Article title
    full_text: str       # Complete article text
    paragraphs: List[str]  # Individual numbered paragraphs
    page_number: int     # PDF page (1-based)
    regulation_name: str  # e.g., "DSGVO", "AI Act"
    recitals: List[str] = field(default_factory=list)  # Related recitals
    keywords: List[str] = field(default_factory=list)  # Extracted keywords
class BSIPDFExtractor:
    """
    Extracts Pruefaspekte from BSI-TR-03161 PDF documents.

    The BSI-TR-03161 series contains security requirements for mobile applications:
    - Part 1: General security requirements
    - Part 2: Web application security (OAuth, Sessions, Input validation, etc.)
    - Part 3: Backend/server security

    Each document contains hundreds of Pruefaspekte (test aspects) that need to
    be extracted, categorized, and stored for compliance tracking.
    """

    # Regex patterns for BSI-TR aspect identification
    PATTERNS = {
        # Primary aspect ID patterns (e.g., O.Auth_1, T.Network_2)
        'aspect_id': r'(O\.[A-Za-z]+_\d+|T\.[A-Za-z]+_\d+)',
        # Alternative section-based pattern (e.g., "Pruefaspekt 4.2.1")
        'section_aspect': r'(?:Prüfaspekt|Pruefaspekt|Anforderung)\s+(\d+\.\d+(?:\.\d+)?)',
        # Section number pattern
        'section': r'(\d+\.\d+(?:\.\d+)?)',
        # Requirement level pattern
        'requirement': r'\b(MUSS|SOLL|KANN|DARF\s+NICHT|muss|soll|kann|darf\s+nicht)\b',
        # Table header pattern for Pruefaspekte tables
        'table_header': r'(?:Prüfaspekt|Bezeichnung|ID|Anforderung)',
    }

    # Category mapping based on aspect ID prefix (checked before keyword scoring)
    CATEGORY_MAP = {
        'O.Auth': AspectCategory.AUTHENTICATION,
        'O.Sess': AspectCategory.SESSION_MANAGEMENT,
        'O.Cryp': AspectCategory.CRYPTOGRAPHY,
        'O.Crypto': AspectCategory.CRYPTOGRAPHY,
        'O.Input': AspectCategory.INPUT_VALIDATION,
        'O.SQL': AspectCategory.SQL_INJECTION,
        'O.XSS': AspectCategory.XSS_PREVENTION,
        'O.CSRF': AspectCategory.CSRF_PROTECTION,
        'O.Log': AspectCategory.LOGGING_AUDIT,
        'O.Audit': AspectCategory.LOGGING_AUDIT,
        'O.Err': AspectCategory.ERROR_HANDLING,
        'O.Error': AspectCategory.ERROR_HANDLING,
        'O.Net': AspectCategory.NETWORK_SECURITY,
        'O.Network': AspectCategory.NETWORK_SECURITY,
        'O.Store': AspectCategory.SECURE_STORAGE,
        'O.Storage': AspectCategory.SECURE_STORAGE,
        'O.Priv': AspectCategory.PRIVACY,
        'O.Privacy': AspectCategory.PRIVACY,
        'O.Data': AspectCategory.DATA_PROTECTION,
        'O.Access': AspectCategory.ACCESS_CONTROL,
        'O.Key': AspectCategory.KEY_MANAGEMENT,
        'O.Comm': AspectCategory.SECURE_COMMUNICATION,
        'O.TLS': AspectCategory.SECURE_COMMUNICATION,
        'O.Update': AspectCategory.UPDATE_MECHANISM,
        'T.': AspectCategory.TEST_ASPECT,
    }

    # Keywords for category detection when the aspect ID prefix is ambiguous
    CATEGORY_KEYWORDS = {
        AspectCategory.AUTHENTICATION: [
            'authentifizierung', 'authentication', 'login', 'anmeldung',
            'passwort', 'password', 'credential', 'oauth', 'oidc', 'token',
            'bearer', 'jwt', 'session', 'multi-faktor', 'mfa', '2fa'
        ],
        AspectCategory.SESSION_MANAGEMENT: [
            'session', 'sitzung', 'cookie', 'timeout', 'ablauf',
            'session-id', 'sessionid', 'logout', 'abmeldung'
        ],
        AspectCategory.CRYPTOGRAPHY: [
            'verschlüsselung', 'encryption', 'kryptograph', 'cryptograph',
            'aes', 'rsa', 'hash', 'signatur', 'signature', 'zertifikat',
            'certificate', 'tls', 'ssl', 'hmac', 'pbkdf', 'argon'
        ],
        AspectCategory.INPUT_VALIDATION: [
            'eingabevalidierung', 'input validation', 'validierung',
            'eingabeprüfung', 'sanitiz', 'whitelist', 'blacklist',
            'filter', 'escape', 'encoding'
        ],
        AspectCategory.SQL_INJECTION: [
            'sql injection', 'sql-injection', 'prepared statement',
            'parameterisiert', 'parameterized', 'orm', 'database'
        ],
        AspectCategory.XSS_PREVENTION: [
            'xss', 'cross-site scripting', 'script injection',
            'html encoding', 'output encoding', 'csp', 'content-security'
        ],
        AspectCategory.CSRF_PROTECTION: [
            'csrf', 'cross-site request', 'token', 'anti-csrf',
            'state parameter', 'same-site', 'samesite'
        ],
        AspectCategory.LOGGING_AUDIT: [
            'logging', 'protokollierung', 'audit', 'nachvollziehbar',
            'traceability', 'log', 'event', 'monitoring'
        ],
        AspectCategory.ERROR_HANDLING: [
            'fehlerbehandlung', 'error handling', 'exception',
            'fehlermeldung', 'error message', 'stack trace'
        ],
    }

    def __init__(self, logger: Optional[logging.Logger] = None):
        """Initialize the PDF extractor.

        Args:
            logger: Optional logger; defaults to a module-level logger.

        Raises:
            ImportError: If PyMuPDF (fitz) is not installed.
        """
        self.logger = logger or logging.getLogger(__name__)
        if fitz is None:
            raise ImportError(
                "PyMuPDF is required for PDF extraction. "
                "Install it with: pip install PyMuPDF"
            )

    def extract_from_file(self, pdf_path: str, source_name: Optional[str] = None) -> List[BSIAspect]:
        """
        Extract all Pruefaspekte from a BSI-TR PDF file.

        Args:
            pdf_path: Path to the PDF file
            source_name: Optional source document name (auto-detected from the
                file stem if not provided)

        Returns:
            List of extracted, deduplicated and enriched BSIAspect objects

        Raises:
            FileNotFoundError: If pdf_path does not exist.
        """
        path = Path(pdf_path)
        if not path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
        source = source_name or path.stem
        self.logger.info(f"Extracting aspects from: {source}")
        doc = fitz.open(pdf_path)
        aspects = []
        try:
            for page_num in range(len(doc)):
                page = doc[page_num]
                text = page.get_text()
                # Extract aspects from this page (pages reported 1-based).
                page_aspects = self._extract_aspects_from_text(
                    text=text,
                    page_num=page_num + 1,
                    source_document=source
                )
                aspects.extend(page_aspects)
        finally:
            # Release the document handle even if a page fails to parse;
            # otherwise the file descriptor leaks on exceptions.
            doc.close()
        # Post-process: deduplicate and enrich
        aspects = self._deduplicate_aspects(aspects)
        aspects = self._enrich_aspects(aspects)
        self.logger.info(f"Extracted {len(aspects)} unique aspects from {source}")
        return aspects

    def extract_all_documents(self, docs_dir: str) -> Dict[str, List[BSIAspect]]:
        """
        Extract aspects from all BSI-TR PDFs in a directory.

        Args:
            docs_dir: Directory containing BSI-TR PDF files

        Returns:
            Dictionary mapping document names (file stems) to their extracted aspects
        """
        docs_path = Path(docs_dir)
        results = {}
        # Look for BSI-TR PDFs under either filename case variant.
        patterns = ["BSI-TR-03161*.pdf", "bsi-tr-03161*.pdf"]
        seen_files = set()
        for pattern in patterns:
            for pdf_file in docs_path.glob(pattern):
                # On case-insensitive filesystems both patterns can match the
                # same physical file; process each file only once.
                resolved = pdf_file.resolve()
                if resolved in seen_files:
                    continue
                seen_files.add(resolved)
                try:
                    aspects = self.extract_from_file(str(pdf_file))
                    results[pdf_file.stem] = aspects
                except Exception as e:
                    # Best-effort batch mode: one broken PDF must not abort the run.
                    self.logger.error(f"Failed to extract from {pdf_file}: {e}")
        return results

    def _extract_aspects_from_text(
        self,
        text: str,
        page_num: int,
        source_document: str
    ) -> List[BSIAspect]:
        """Extract all Pruefaspekte from a single page's text.

        Two passes: first explicit O.*/T.* aspect IDs, then section-based
        aspects ("Pruefaspekt 4.2.1") that lack an explicit ID.
        """
        aspects = []
        # Pass 1: explicit aspect IDs on this page.
        for match in re.finditer(self.PATTERNS['aspect_id'], text, re.IGNORECASE):
            aspect_id = match.group(1).upper()
            # Context window around the match used for all further parsing.
            start = max(0, match.start() - 200)
            end = min(len(text), match.end() + 1000)
            context = text[start:end]
            # Determine category from aspect ID (keyword fallback inside).
            category = self._determine_category(aspect_id, context)
            # Extract requirement level (MUSS/SOLL/KANN/DARF NICHT).
            req_level = self._extract_requirement_level(context)
            # Extract title (text immediately after the aspect ID).
            title = self._extract_title(context, aspect_id)
            # Extract section number.
            section = self._extract_section(context)
            # Extract full requirement text.
            full_text = self._extract_full_text(context, aspect_id)
            aspects.append(BSIAspect(
                aspect_id=aspect_id,
                title=title,
                full_text=full_text,
                category=category,
                page_number=page_num,
                section=section,
                requirement_level=req_level,
                source_document=source_document,
                context_before=text[start:match.start()].strip()[-100:],
                context_after=text[match.end():end].strip()[:200],
            ))
        # Pass 2: section-based aspects without an O.*/T.* identifier.
        for match in re.finditer(self.PATTERNS['section_aspect'], text, re.IGNORECASE):
            section_id = match.group(1)
            aspect_id = f"SEC_{section_id.replace('.', '_')}"
            # Skip if this section was already captured as an O.*/T.* aspect.
            if any(a.section == section_id for a in aspects):
                continue
            start = max(0, match.start() - 100)
            end = min(len(text), match.end() + 800)
            context = text[start:end]
            category = self._determine_category_from_keywords(context)
            req_level = self._extract_requirement_level(context)
            aspects.append(BSIAspect(
                aspect_id=aspect_id,
                title=f"Prüfaspekt {section_id}",
                full_text=context.strip(),
                category=category,
                page_number=page_num,
                section=section_id,
                requirement_level=req_level,
                source_document=source_document,
            ))
        return aspects

    def _determine_category(self, aspect_id: str, context: str) -> AspectCategory:
        """Determine the category of an aspect based on its ID and context."""
        # First try to match by aspect ID prefix (case-insensitive).
        for prefix, category in self.CATEGORY_MAP.items():
            if aspect_id.upper().startswith(prefix.upper()):
                return category
        # Fall back to keyword-based detection on the surrounding text.
        return self._determine_category_from_keywords(context)

    def _determine_category_from_keywords(self, text: str) -> AspectCategory:
        """Determine category by counting keyword hits; GENERAL if none match."""
        text_lower = text.lower()
        category_scores = {}
        for category, keywords in self.CATEGORY_KEYWORDS.items():
            score = sum(1 for kw in keywords if kw in text_lower)
            if score > 0:
                category_scores[category] = score
        if category_scores:
            # Highest-scoring category wins.
            return max(category_scores, key=category_scores.get)
        return AspectCategory.GENERAL

    def _extract_requirement_level(self, text: str) -> RequirementLevel:
        """Extract the requirement level from text (first keyword wins)."""
        match = re.search(self.PATTERNS['requirement'], text, re.IGNORECASE)
        if match:
            level = match.group(1).upper()
            # "DARF NICHT" may contain arbitrary whitespace between the words.
            if 'DARF' in level and 'NICHT' in level:
                return RequirementLevel.DARF_NICHT
            elif level == 'MUSS':
                return RequirementLevel.MUSS
            elif level == 'SOLL':
                return RequirementLevel.SOLL
            elif level == 'KANN':
                return RequirementLevel.KANN
        return RequirementLevel.SOLL  # Default when no keyword is present

    def _extract_title(self, context: str, aspect_id: str) -> str:
        """Extract the title/short description of an aspect.

        Falls back to the aspect ID itself when no title line is found.
        """
        # Look for text on the same line immediately after the aspect ID.
        pattern = rf'{re.escape(aspect_id)}\s*[:\-]?\s*([^\n]+)'
        match = re.search(pattern, context, re.IGNORECASE)
        if match:
            title = match.group(1).strip()
            # Collapse internal whitespace.
            title = re.sub(r'\s+', ' ', title)
            # Truncate if too long.
            if len(title) > 200:
                title = title[:197] + "..."
            return title
        return aspect_id

    def _extract_section(self, context: str) -> str:
        """Extract the first section number (e.g. '4.2.1') from context."""
        match = re.search(self.PATTERNS['section'], context)
        return match.group(1) if match else ""

    def _extract_full_text(self, context: str, aspect_id: str) -> str:
        """Extract the complete requirement text for an aspect.

        Takes everything from the aspect ID up to the next aspect/section
        marker; falls back to the whole context window.
        """
        pattern = rf'{re.escape(aspect_id)}[^\n]*\n(.*?)(?=\n\s*(?:O\.[A-Z]|T\.[A-Z]|\d+\.\d+\s|\Z))'
        match = re.search(pattern, context, re.IGNORECASE | re.DOTALL)
        if match:
            # group(0) deliberately includes the aspect ID heading line.
            full_text = match.group(0).strip()
        else:
            # Fall back to the whole context window.
            full_text = context.strip()
        # Collapse whitespace/newlines into single spaces.
        full_text = re.sub(r'\s+', ' ', full_text)
        return full_text

    def _deduplicate_aspects(self, aspects: List[BSIAspect]) -> List[BSIAspect]:
        """Remove duplicate aspect IDs, keeping the one with the longest text."""
        seen = {}
        for aspect in aspects:
            key = aspect.aspect_id
            if key not in seen:
                seen[key] = aspect
            else:
                # Keep the occurrence with more extracted content.
                if len(aspect.full_text) > len(seen[key].full_text):
                    seen[key] = aspect
        return list(seen.values())

    def _enrich_aspects(self, aspects: List[BSIAspect]) -> List[BSIAspect]:
        """Enrich aspects in place with cross-references and keywords."""
        aspect_ids = {a.aspect_id for a in aspects}
        for aspect in aspects:
            # Find related aspects mentioned in the full text.
            for other_id in aspect_ids:
                if other_id != aspect.aspect_id and other_id in aspect.full_text:
                    aspect.related_aspects.append(other_id)
            # Extract keywords based on category.
            aspect.keywords = self._extract_keywords(aspect)
        return aspects

    def _extract_keywords(self, aspect: BSIAspect) -> List[str]:
        """Extract up to 10 category keywords found in the aspect's text."""
        keywords = []
        text_lower = aspect.full_text.lower()
        # Only keywords belonging to the aspect's own category are considered.
        if aspect.category in self.CATEGORY_KEYWORDS:
            for kw in self.CATEGORY_KEYWORDS[aspect.category]:
                if kw in text_lower:
                    keywords.append(kw)
        # Sort for deterministic output (set iteration order is arbitrary).
        return sorted(set(keywords))[:10]

    def get_statistics(self, aspects: List[BSIAspect]) -> Dict[str, Any]:
        """Get summary statistics about extracted aspects.

        Returns:
            Dict with total count, per-category / per-level / per-source
            counts, and the number of unique section identifiers.
        """
        stats = {
            "total_aspects": len(aspects),
            "by_category": {},
            "by_requirement_level": {},
            "by_source": {},
            "unique_sections": set(),
        }
        for aspect in aspects:
            # By category
            cat = aspect.category.value
            stats["by_category"][cat] = stats["by_category"].get(cat, 0) + 1
            # By requirement level
            level = aspect.requirement_level.value
            stats["by_requirement_level"][level] = stats["by_requirement_level"].get(level, 0) + 1
            # By source
            src = aspect.source_document
            stats["by_source"][src] = stats["by_source"].get(src, 0) + 1
            # Unique sections
            if aspect.section:
                stats["unique_sections"].add(aspect.section)
        # Collapse the working set into a plain count for the caller.
        stats["unique_sections"] = len(stats["unique_sections"])
        return stats
class EURegulationExtractor:
    """
    Extracts Articles from EU Regulation PDF documents.

    Handles documents like GDPR, AI Act, CRA, etc. in their official formats.
    """

    # Regex patterns for locating articles, paragraphs and recitals.
    PATTERNS = {
        'article_de': r'Artikel\s+(\d+)',
        'article_en': r'Article\s+(\d+)',
        'paragraph': r'\((\d+)\)',
        'recital': r'Erwägungsgrund\s+(\d+)|Recital\s+(\d+)',
    }

    def __init__(self, logger: Optional[logging.Logger] = None):
        """Initialize the extractor with an optional logger."""
        self.logger = logger or logging.getLogger(__name__)

    def extract_from_file(
        self,
        pdf_path: str,
        regulation_name: str,
        language: str = "de"
    ) -> List["EUArticle"]:
        """Extract all articles from an EU regulation PDF.

        Args:
            pdf_path: Path to the regulation PDF.
            regulation_name: Display name stored on each article (e.g. "DSGVO").
            language: "de" uses the "Artikel N" heading pattern, anything else
                uses the English "Article N" pattern.

        Returns:
            Deduplicated list of EUArticle objects.

        Raises:
            ImportError: If PyMuPDF is not installed.
            FileNotFoundError: If pdf_path does not exist.
        """
        if fitz is None:
            raise ImportError("PyMuPDF is required for PDF extraction.")
        path = Path(pdf_path)
        if not path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
        article_pattern = (
            self.PATTERNS['article_de'] if language == "de"
            else self.PATTERNS['article_en']
        )
        doc = fitz.open(pdf_path)
        articles = []
        try:
            for page_num in range(len(doc)):
                page = doc[page_num]
                text = page.get_text()
                # Find article headings on this page.
                for match in re.finditer(article_pattern, text):
                    article_num = match.group(1)
                    start = match.start()
                    # Article text runs until the next heading or end of page.
                    next_match = re.search(article_pattern, text[match.end():])
                    end = match.end() + next_match.start() if next_match else len(text)
                    article_text = text[start:end].strip()
                    # Extract numbered paragraphs within the article.
                    paragraphs = self._extract_paragraphs(article_text)
                    # Extract the heading title.
                    title = self._extract_article_title(article_text, article_num)
                    articles.append(EUArticle(
                        article_number=f"Art. {article_num}",
                        title=title,
                        full_text=article_text,
                        paragraphs=paragraphs,
                        page_number=page_num + 1,
                        regulation_name=regulation_name,
                    ))
        finally:
            # Release the document handle even if a page fails to parse;
            # otherwise the file descriptor leaks on exceptions.
            doc.close()
        return self._deduplicate_articles(articles)

    def _extract_paragraphs(self, text: str) -> List[str]:
        """Extract numbered paragraphs "(1) ..." from article text."""
        paragraphs = []
        matches = list(re.finditer(self.PATTERNS['paragraph'], text))
        for i, match in enumerate(matches):
            start = match.start()
            # Each paragraph runs to the start of the next marker (or text end).
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            para_text = text[start:end].strip()
            if para_text:
                paragraphs.append(para_text)
        return paragraphs

    def _extract_article_title(self, text: str, article_num: str) -> str:
        """Extract the title line following an article heading.

        Matches both German ("Artikel N") and English ("Article N") headings,
        so titles are found regardless of the document language.
        """
        pattern = rf'(?:Artikel|Article)\s+{article_num}\s*\n\s*([^\n]+)'
        match = re.search(pattern, text)
        if match:
            return match.group(1).strip()
        # Fallback label when no title line follows the heading.
        return f"Artikel {article_num}"

    def _deduplicate_articles(self, articles: List["EUArticle"]) -> List["EUArticle"]:
        """Remove duplicate article numbers, keeping the longest text."""
        seen = {}
        for article in articles:
            key = article.article_number
            if key not in seen:
                seen[key] = article
            else:
                # Keep the occurrence with more extracted content.
                if len(article.full_text) > len(seen[key].full_text):
                    seen[key] = article
        return list(seen.values())