Services: Admin-Compliance, Backend-Compliance, AI-Compliance-SDK, Consent-SDK, Developer-Portal, PCA-Platform, DSMS Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
317 lines
8.6 KiB
Python
317 lines
8.6 KiB
Python
"""
|
|
PII Redactor
|
|
|
|
Redacts Personally Identifiable Information (PII) from logs and responses.
|
|
Essential for DSGVO/GDPR compliance in BreakPilot.
|
|
|
|
Redacted data types:
|
|
- Email addresses
|
|
- IP addresses
|
|
- German phone numbers
|
|
- Names (when identified)
|
|
- Student IDs
|
|
- Credit card numbers
|
|
- IBAN numbers
|
|
|
|
Usage:
|
|
from middleware import PIIRedactor, redact_pii
|
|
|
|
# Use in logging
|
|
logger.info(redact_pii(f"User {email} logged in from {ip}"))
|
|
|
|
# Configure redactor
|
|
redactor = PIIRedactor(patterns=["email", "ip_v4", "phone_de"])
|
|
safe_message = redactor.redact(sensitive_message)
|
|
"""
|
|
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from typing import Dict, List, Optional, Pattern, Set
|
|
|
|
|
|
@dataclass
class PIIPattern:
    """A named regular expression paired with its redaction marker."""

    # Identifier reported for matches of this pattern (e.g. "email").
    name: str
    # Compiled regular expression that locates the PII inside a string.
    pattern: Pattern
    # Text substituted for every match when the pattern is applied.
    replacement: str
|
|
|
|
|
|
# Pre-compiled regex patterns for common PII, keyed by pattern name.
# Each entry pairs the compiled expression with the marker that replaces a match.
PII_PATTERNS: Dict[str, PIIPattern] = {
    "email": PIIPattern(
        name="email",
        pattern=re.compile(
            # The TLD class is [A-Za-z]: inside a character class "|" is a
            # literal pipe, not alternation, so "[A-Z|a-z]" would let a match
            # run across "|" delimiters (e.g. "a@b.de|next-field").
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            re.IGNORECASE
        ),
        replacement="[EMAIL_REDACTED]",
    ),
    "ip_v4": PIIPattern(
        name="ip_v4",
        pattern=re.compile(
            # Four dot-separated octets, each limited to 0-255.
            r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
        ),
        replacement="[IP_REDACTED]",
    ),
    "ip_v6": PIIPattern(
        name="ip_v6",
        pattern=re.compile(
            # NOTE: matches only the full eight-group notation; compressed
            # addresses using "::" are not covered by this expression.
            r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b'
        ),
        replacement="[IP_REDACTED]",
    ),
    "phone_de": PIIPattern(
        name="phone_de",
        pattern=re.compile(
            # Prefix +49 / 0049 / 0, then three digit groups with optional
            # space, dot or hyphen separators.
            r'(?<!\w)(?:\+49|0049|0)[\s.-]?(?:\d{2,4})[\s.-]?(?:\d{3,4})[\s.-]?(?:\d{3,4})(?!\d)'
        ),
        replacement="[PHONE_REDACTED]",
    ),
    "phone_intl": PIIPattern(
        name="phone_intl",
        pattern=re.compile(
            # Optional "+" followed by 10-15 digits, each optionally followed
            # by a separator. Broad by design: any such digit run matches.
            r'(?<!\w)\+?(?:\d[\s.-]?){10,15}(?!\d)'
        ),
        replacement="[PHONE_REDACTED]",
    ),
    "credit_card": PIIPattern(
        name="credit_card",
        pattern=re.compile(
            # Sixteen digits in four groups of four, optionally separated.
            r'\b(?:\d{4}[\s.-]?){3}\d{4}\b'
        ),
        replacement="[CC_REDACTED]",
    ),
    "iban": PIIPattern(
        name="iban",
        pattern=re.compile(
            # Two letters, two check digits, then digit-only groups.
            # NOTE: account parts containing letters are not matched.
            r'\b[A-Z]{2}\d{2}[\s]?(?:\d{4}[\s]?){3,5}\d{1,4}\b',
            re.IGNORECASE
        ),
        replacement="[IBAN_REDACTED]",
    ),
    "student_id": PIIPattern(
        name="student_id",
        pattern=re.compile(
            # "student"/"schueler"/"schüler", optional "-"/"_" and "id"/"nr",
            # at most one ":" or whitespace character, then 4-10 digits.
            r'\b(?:student|schueler|schüler)[-_]?(?:id|nr)?[:\s]?\d{4,10}\b',
            re.IGNORECASE
        ),
        replacement="[STUDENT_ID_REDACTED]",
    ),
    "uuid": PIIPattern(
        name="uuid",
        pattern=re.compile(
            # Canonical 8-4-4-4-12 hexadecimal layout.
            r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b',
            re.IGNORECASE
        ),
        replacement="[UUID_REDACTED]",
    ),
    # German names are harder to detect, but we can catch common patterns:
    # a salutation followed by one or two capitalised words.
    "name_prefix": PIIPattern(
        name="name_prefix",
        pattern=re.compile(
            r'\b(?:Herr|Frau|Hr\.|Fr\.)\s+[A-ZÄÖÜ][a-zäöüß]+(?:\s+[A-ZÄÖÜ][a-zäöüß]+)?\b'
        ),
        replacement="[NAME_REDACTED]",
    ),
}
|
|
|
|
# Names of the built-in patterns that are switched on by default
DEFAULT_PATTERNS = [
    "email",
    "ip_v4",
    "ip_v6",
    "phone_de",
]
|
|
|
|
|
|
class PIIRedactor:
    """
    Redacts PII from strings.

    Patterns are applied one after another, in the order given, each on the
    output of the previous one.

    Attributes:
        patterns: List of built-in pattern names to use (e.g., ["email", "ip_v4"])
        custom_patterns: Additional custom patterns, applied after the built-in ones
        preserve_format: If True, matches are masked with "*" of equal length
            instead of being replaced by the pattern's marker
    """

    def __init__(
        self,
        patterns: Optional[List[str]] = None,
        custom_patterns: "Optional[List[PIIPattern]]" = None,
        preserve_format: bool = False,
    ):
        """
        Initialize the PII redactor.

        Args:
            patterns: List of pattern names to enable. ``None`` or an empty
                list selects the defaults (email, ip_v4, ip_v6, phone_de).
                Names that are not keys of ``PII_PATTERNS`` are skipped and
                reported with a ``RuntimeWarning``.
            custom_patterns: Additional custom PIIPattern objects
            preserve_format: If True, preserve the length of redacted content
        """
        import warnings  # local import: only needed while constructing

        # Copy the name list so that mutating ``self.patterns`` later can never
        # alter the shared module-level DEFAULT_PATTERNS (or the caller's list).
        self.patterns = list(patterns) if patterns else list(DEFAULT_PATTERNS)
        self.custom_patterns = list(custom_patterns) if custom_patterns else []
        self.preserve_format = preserve_format

        # A misspelled name would otherwise silently disable that redaction,
        # so make the omission visible without raising.
        unknown = [name for name in self.patterns if name not in PII_PATTERNS]
        if unknown:
            warnings.warn(
                f"Unknown PII pattern name(s) ignored: {unknown!r}",
                RuntimeWarning,
                stacklevel=2,
            )

        # Build active patterns list: known built-ins first, then custom ones
        self._active_patterns: List[PIIPattern] = [
            PII_PATTERNS[name] for name in self.patterns if name in PII_PATTERNS
        ]
        self._active_patterns.extend(self.custom_patterns)

    @staticmethod
    def _mask_match(match) -> str:
        """Return a run of "*" as long as the matched text."""
        return "*" * len(match.group())

    def redact(self, text: str) -> str:
        """
        Redact PII from the given text.

        Args:
            text: The text to redact PII from

        Returns:
            Text with PII replaced by redaction markers (or by same-length
            "*" masks when ``preserve_format`` is set). Falsy input is
            returned unchanged.
        """
        if not text:
            return text

        result = text
        for pii in self._active_patterns:
            # Either a callable producing a same-length mask, or the
            # pattern's own replacement string.
            substitute = self._mask_match if self.preserve_format else pii.replacement
            result = pii.pattern.sub(substitute, result)
        return result

    def contains_pii(self, text: str) -> bool:
        """
        Check if text contains any PII.

        Args:
            text: The text to check

        Returns:
            True if any active pattern matches, False otherwise (including
            for falsy input)
        """
        if not text:
            return False
        return any(pii.pattern.search(text) for pii in self._active_patterns)

    def find_pii(self, text: str) -> List[Dict[str, object]]:
        """
        Find all PII in text with their types.

        Args:
            text: The text to search

        Returns:
            List of dicts with the keys 'type' (pattern name), 'match'
            (matched text), 'start' and 'end' (integer offsets of the match
            in ``text``). Results are grouped by pattern, in pattern order.
        """
        if not text:
            return []

        return [
            {
                "type": pii.name,
                "match": match.group(),
                "start": match.start(),
                "end": match.end(),
            }
            for pii in self._active_patterns
            for match in pii.pattern.finditer(text)
        ]
|
|
|
|
|
|
# Shared redactor used by the module-level convenience helpers.
# Starts out as None, i.e. no instance has been created yet.
_default_redactor: Optional[PIIRedactor] = None
|
|
|
|
|
|
def get_default_redactor() -> PIIRedactor:
    """Return the shared default redactor, building it on the first call."""
    global _default_redactor
    if _default_redactor is not None:
        return _default_redactor

    # First use: construct a redactor with default settings and remember it.
    _default_redactor = PIIRedactor()
    return _default_redactor
|
|
|
|
|
|
def redact_pii(text: str) -> str:
    """
    Redact PII from ``text`` with the shared default redactor.

    Args:
        text: Text to redact

    Returns:
        Redacted text

    Example:
        logger.info(redact_pii(f"User {email} logged in"))
    """
    shared_redactor = get_default_redactor()
    return shared_redactor.redact(text)
|
|
|
|
|
|
class PIIRedactingLogFilter:
    """
    Logging filter that automatically redacts PII from log messages.

    The record's message is rendered with its arguments first and the
    rendered text is redacted as a whole. This also covers PII that arrives
    through non-string arguments (exceptions, objects with a ``__str__``)
    or that only exists after %-formatting. Only ``record.msg`` and
    ``record.args`` are modified.

    Usage:
        import logging

        handler = logging.StreamHandler()
        handler.addFilter(PIIRedactingLogFilter())
        logger = logging.getLogger()
        logger.addHandler(handler)
    """

    def __init__(self, redactor: "Optional[PIIRedactor]" = None):
        """
        Args:
            redactor: Redactor to apply; the shared default redactor is used
                when none is given.
        """
        self.redactor = redactor or get_default_redactor()

    def filter(self, record):
        """
        Redact PII in ``record`` in place.

        Args:
            record: The log record about to be handled

        Returns:
            Always True: the record is never suppressed, only rewritten.
        """
        try:
            rendered = record.getMessage()
        except Exception:
            # The message/args combination cannot be formatted. Redact the
            # pieces individually and leave msg/args in place so the handler
            # reports the formatting problem the way it normally would.
            self._redact_piecewise(record)
            return True

        record.msg = self.redactor.redact(rendered)
        # The message is final now; empty args stop a second %-formatting pass.
        record.args = ()
        return True

    def _redact_piecewise(self, record):
        """Redact the message template and each string argument separately."""
        redact = self.redactor.redact

        if record.msg:
            record.msg = redact(str(record.msg))

        if record.args:
            if isinstance(record.args, dict):
                record.args = {
                    k: redact(v) if isinstance(v, str) else v
                    for k, v in record.args.items()
                }
            elif isinstance(record.args, tuple):
                record.args = tuple(
                    redact(v) if isinstance(v, str) else v
                    for v in record.args
                )
|
|
|
|
|
|
def create_safe_dict(data: dict, redactor: "Optional[PIIRedactor]" = None) -> dict:
    """
    Create a copy of a dictionary with PII redacted.

    String values are redacted; nested dicts, lists and tuples are rebuilt
    recursively so strings inside them are redacted as well. All other
    values are passed through unchanged. Dictionary keys are never
    modified, so lookups on the result keep working.

    Args:
        data: Dictionary to redact
        redactor: Optional custom redactor (the shared default redactor is
            used when omitted)

    Returns:
        New dictionary with redacted values; ``data`` itself is not modified
    """
    r = redactor or get_default_redactor()

    def redact_value(value):
        if isinstance(value, str):
            return r.redact(value)
        if isinstance(value, dict):
            return {k: redact_value(v) for k, v in value.items()}
        if isinstance(value, list):
            return [redact_value(v) for v in value]
        if isinstance(value, tuple):
            # Tuples used to be returned as-is, leaking any strings inside.
            items = [redact_value(v) for v in value]
            # Rebuild namedtuples through _make so field access keeps working.
            if hasattr(value, "_make"):
                return type(value)._make(items)
            return tuple(items)
        return value

    return {k: redact_value(v) for k, v in data.items()}
|