33bf2b7c5a
Build + Deploy / build-admin-compliance (push) Successful in 2m5s
Build + Deploy / build-ai-sdk (push) Successful in 56s
Build + Deploy / build-developer-portal (push) Successful in 1m29s
Build + Deploy / build-backend-compliance (push) Successful in 3m26s
Build + Deploy / build-tts (push) Failing after 1m48s
Build + Deploy / build-document-crawler (push) Successful in 44s
Build + Deploy / build-dsms-gateway (push) Successful in 28s
Build + Deploy / build-dsms-node (push) Successful in 17s
CI / branch-name (push) Has been skipped
Build + Deploy / trigger-orca (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 17s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m45s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Failing after 52s
CI / test-python-backend (push) Successful in 36s
CI / test-python-document-crawler (push) Successful in 25s
CI / test-python-dsms-gateway (push) Successful in 21s
CI / validate-canonical-controls (push) Successful in 14s
New service_detector.py uses service_registry (88 entries) plus 30+ extra text patterns to detect services mentioned in DSI/legal texts. Results on Spiegel: 31/32 services detected (97%, was 5/32 = 16%). Includes metadata: name, category, country, EU adequacy status. - Profiler now uses detect_services_in_text() instead of 20-entry list - Profile extractor adds detected_services with full metadata - Auto-generates scope hint for non-EU services (Drittlandtransfer) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
265 lines
9.4 KiB
Python
265 lines
9.4 KiB
Python
"""
|
|
Profile Extractor — pre-fill Company Profile + Compliance Scope from documents.
|
|
|
|
When a customer uploads their existing legal documents, we extract
|
|
what we can and pre-fill the profile/scope wizard so they only need
|
|
to confirm and fill gaps.
|
|
|
|
Returns a dict that maps to CompanyProfile and ScopeProfilingAnswer fields.
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def extract_profile_from_documents(
|
|
doc_texts: dict[str, str],
|
|
business_profile: dict | None = None,
|
|
) -> dict:
|
|
"""Extract Company Profile fields from document texts.
|
|
|
|
Args:
|
|
doc_texts: dict mapping doc_type -> text
|
|
business_profile: optional detected business profile from profiler
|
|
|
|
Returns dict with pre-filled fields for Company Profile and Scope.
|
|
"""
|
|
result: dict = {
|
|
"company_profile": {},
|
|
"compliance_scope_hints": [],
|
|
"extracted_from": [],
|
|
}
|
|
|
|
all_text = "\n".join(doc_texts.values()).lower()
|
|
all_text_original = "\n".join(doc_texts.values())
|
|
|
|
# ── Company name + legal form ────────────────────────────────
|
|
impressum = doc_texts.get("impressum", "")
|
|
if impressum:
|
|
_extract_company_info(impressum, result)
|
|
result["extracted_from"].append("impressum")
|
|
|
|
# Fallback: try DSI
|
|
if not result["company_profile"].get("companyName") and "dse" in doc_texts:
|
|
_extract_company_info(doc_texts["dse"], result)
|
|
result["extracted_from"].append("dse")
|
|
|
|
# ── DPO contact ──────────────────────────────────────────────
|
|
_extract_dpo(all_text_original, result)
|
|
|
|
# ── Business model from profiler ─────────────────────────────
|
|
if business_profile:
|
|
bp = business_profile
|
|
if bp.get("business_type") and bp["business_type"] != "unknown":
|
|
result["company_profile"]["businessModel"] = bp["business_type"]
|
|
if bp.get("industry") and bp["industry"] != "unknown":
|
|
result["company_profile"]["industry"] = [bp["industry"]]
|
|
if bp.get("has_online_shop"):
|
|
result["company_profile"]["offerings"] = ["online_shop"]
|
|
if bp.get("is_regulated_profession"):
|
|
result["company_profile"]["regulatedProfession"] = True
|
|
result["company_profile"]["regulatedProfessionType"] = bp.get(
|
|
"regulated_profession_type", ""
|
|
)
|
|
|
|
# ── Detected services (full list with metadata) ────────────
|
|
try:
|
|
from compliance.services.service_detector import detect_services_in_text
|
|
detected = detect_services_in_text(all_text)
|
|
result["detected_services"] = detected
|
|
# Add non-EU services as scope hint
|
|
non_eu = [s for s in detected if not s.get("eu_adequate")]
|
|
if non_eu:
|
|
result["compliance_scope_hints"].append({
|
|
"field": "hasThirdCountryTransfer",
|
|
"value": True,
|
|
"source": f"{len(non_eu)} Dienste ausserhalb EWR erkannt ({', '.join(s['name'] for s in non_eu[:5])}...)",
|
|
})
|
|
except Exception as e:
|
|
logger.warning("Service detection failed: %s", e)
|
|
|
|
# ── Scope hints from document content ────────────────────────
|
|
_extract_scope_hints(all_text, result)
|
|
|
|
# ── Tracking services → data processing activities ───────────
|
|
if business_profile and business_profile.get("detected_services"):
|
|
result["detected_services"] = business_profile["detected_services"]
|
|
|
|
logger.info(
|
|
"Extracted %d profile fields, %d scope hints from %d documents",
|
|
len(result["company_profile"]),
|
|
len(result["compliance_scope_hints"]),
|
|
len(doc_texts),
|
|
)
|
|
return result
|
|
|
|
|
|
def _extract_company_info(text: str, result: dict) -> None:
|
|
"""Extract company name, legal form, address from text."""
|
|
cp = result["company_profile"]
|
|
|
|
# GmbH / AG / UG / e.K. etc.
|
|
legal_forms = {
|
|
r"(\S+(?:\s+\S+){0,4})\s+gmbh\b": ("GmbH", "gmbh"),
|
|
r"(\S+(?:\s+\S+){0,4})\s+ag\b": ("AG", "ag"),
|
|
r"(\S+(?:\s+\S+){0,4})\s+ug\b": ("UG", "ug"),
|
|
r"(\S+(?:\s+\S+){0,4})\s+e\.?\s*k\.?\b": ("e.K.", "ek"),
|
|
r"(\S+(?:\s+\S+){0,4})\s+gbr\b": ("GbR", "gbr"),
|
|
r"(\S+(?:\s+\S+){0,4})\s+ohg\b": ("OHG", "ohg"),
|
|
r"(\S+(?:\s+\S+){0,4})\s+gmbh\s*&\s*co\.?\s*kg": ("GmbH & Co. KG", "gmbh_co_kg"),
|
|
}
|
|
text_lower = text.lower()
|
|
for pattern, (form_label, form_id) in legal_forms.items():
|
|
m = re.search(pattern, text_lower)
|
|
if m:
|
|
raw_name = m.group(0).strip()
|
|
# Clean up: take from uppercase start
|
|
for i, ch in enumerate(text[m.start():m.end()]):
|
|
if ch.isupper():
|
|
cp["companyName"] = text[m.start() + i:m.end()].strip()
|
|
break
|
|
cp["legalForm"] = form_id
|
|
break
|
|
|
|
# PLZ + Ort
|
|
plz_match = re.search(
|
|
r"[d\-]?\s*(\d{5})\s+([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+(?:\s+[a-z]+)*)",
|
|
text,
|
|
)
|
|
if plz_match:
|
|
cp["headquartersZip"] = plz_match.group(1)
|
|
cp["headquartersCity"] = plz_match.group(2).strip()
|
|
cp["headquartersCountry"] = "DE"
|
|
|
|
# Strasse
|
|
street_match = re.search(
|
|
r"([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+(?:str(?:\.|asse)?|weg|allee|platz|ring|gasse)"
|
|
r"\s*\.?\s*\d+[a-z]?)",
|
|
text,
|
|
)
|
|
if street_match:
|
|
cp["headquartersStreet"] = street_match.group(1).strip()
|
|
|
|
# USt-IdNr
|
|
ust_match = re.search(r"DE\s*\d{9}", text)
|
|
if ust_match:
|
|
cp["ustIdNr"] = ust_match.group(0).replace(" ", "")
|
|
|
|
# HRB/HRA
|
|
hrb_match = re.search(r"HRB?\s*\d+", text, re.IGNORECASE)
|
|
if hrb_match:
|
|
cp["registrationNumber"] = hrb_match.group(0)
|
|
|
|
# Registergericht
|
|
reg_match = re.search(
|
|
r"(?:amtsgericht|registergericht|ag)\s+([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+)",
|
|
text, re.IGNORECASE,
|
|
)
|
|
if reg_match:
|
|
cp["registrationCourt"] = reg_match.group(0)
|
|
|
|
|
|
def _extract_dpo(text: str, result: dict) -> None:
|
|
"""Extract DPO name and email."""
|
|
cp = result["company_profile"]
|
|
|
|
# DPO email
|
|
dpo_section = re.search(
|
|
r"datenschutzbeauftragte[rn]?\s*[\s\S]{0,300}",
|
|
text, re.IGNORECASE,
|
|
)
|
|
if dpo_section:
|
|
section = dpo_section.group(0)
|
|
email_match = re.search(r"[\w.+-]+@[\w-]+\.[\w.-]+", section)
|
|
if email_match:
|
|
cp["dpoEmail"] = email_match.group(0)
|
|
|
|
# DPO name (after "Datenschutzbeauftragter:" or similar)
|
|
name_match = re.search(
|
|
r"(?:datenschutzbeauftragte[rn]?\s*:?\s*)"
|
|
r"([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+\s+"
|
|
r"[A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+)",
|
|
text,
|
|
)
|
|
if name_match:
|
|
cp["dpoName"] = name_match.group(1)
|
|
|
|
|
|
def _extract_scope_hints(text: str, result: dict) -> None:
|
|
"""Extract scope-relevant signals from document text."""
|
|
hints = result["compliance_scope_hints"]
|
|
|
|
# Sensitive data categories (Art. 9)
|
|
if any(kw in text for kw in [
|
|
"gesundheitsdaten", "biometrisch", "genetisch",
|
|
"religionszugehoerigkeit", "gewerkschaft", "sexualleben",
|
|
"politische meinung", "ethnische herkunft",
|
|
]):
|
|
hints.append({
|
|
"field": "processesSpecialCategories",
|
|
"value": True,
|
|
"source": "Erwaehnung besonderer Datenkategorien (Art. 9 DSGVO) im Text",
|
|
})
|
|
|
|
# Third country transfer
|
|
if any(kw in text for kw in ["usa", "drittland", "drittstaaten", "third country"]):
|
|
hints.append({
|
|
"field": "hasThirdCountryTransfer",
|
|
"value": True,
|
|
"source": "Drittlandtransfer erwaehnt",
|
|
})
|
|
|
|
# Large-scale processing
|
|
if any(kw in text for kw in [
|
|
"umfangreiche verarbeitung", "grosse anzahl",
|
|
"large scale", "massenverarbeitung",
|
|
]):
|
|
hints.append({
|
|
"field": "largeScaleProcessing",
|
|
"value": True,
|
|
"source": "Hinweis auf umfangreiche Verarbeitung",
|
|
})
|
|
|
|
# Automated decision-making
|
|
if any(kw in text for kw in [
|
|
"automatisierte entscheidung", "profiling", "scoring",
|
|
"automated decision", "art. 22",
|
|
]):
|
|
hints.append({
|
|
"field": "automatedDecisionMaking",
|
|
"value": True,
|
|
"source": "Automatisierte Entscheidungsfindung erwaehnt",
|
|
})
|
|
|
|
# Auftragsverarbeitung (processor role)
|
|
if any(kw in text for kw in [
|
|
"auftragsverarbeitung", "auftragsverarbeiter",
|
|
"im auftrag", "weisungsgebunden",
|
|
]):
|
|
hints.append({
|
|
"field": "isDataProcessor",
|
|
"value": True,
|
|
"source": "Auftragsverarbeitung erwaehnt",
|
|
})
|
|
|
|
# Newsletter / Marketing
|
|
if any(kw in text for kw in ["newsletter", "marketing", "werbung"]):
|
|
hints.append({
|
|
"field": "hasNewsletter",
|
|
"value": True,
|
|
"source": "Newsletter/Marketing erwaehnt",
|
|
})
|
|
|
|
# Employee data
|
|
if any(kw in text for kw in [
|
|
"mitarbeiterdaten", "beschaeftigtendaten", "personalakte",
|
|
"bewerberdaten", "arbeitnehmer",
|
|
]):
|
|
hints.append({
|
|
"field": "processesEmployeeData",
|
|
"value": True,
|
|
"source": "Beschaeftigtendaten-Verarbeitung erwaehnt",
|
|
})
|