Files
breakpilot-compliance/backend-compliance/compliance/services/profile_extractor.py
T
Benjamin Admin 3f23a64d5f feat(agent): Impressum-Tab auf Haupt-Engine + Profil/§36-Fixes
Ergebnis-Tab rendert jetzt result.results (Haupt-Doc-Check) statt des
abweichenden v3-Agenten — BMW korrekt statt False Positives:
- DocResultView: ein Dokument als Pflichtangaben-Tabelle (Label + gefundener
  Text + 3-Tier-Status), KEINE MC-IDs. ComplianceResultTabs speist Tabs aus
  result.results; ChecklistView-Bausteine exportiert + wiederverwendet.
- profile_extractor: Firmenname/Rechtsform = fruehester Treffer + ausge-
  schriebene Formen (Aktiengesellschaft) -> BMW AG statt "juris GmbH".
- 36 VSBG (MC-010): reines b2c -> POSSIBLY_APPLICABLE (Pruef-Hinweis) statt
  MEDIUM-FAIL; hart nur bei ecommerce. possibly_hint pro MC.
- McCoverage traegt label + found (Snippet); mc_possibly-Aggregat.
- AgentFindingCard/Methodik: interne check_id/mc_id nicht mehr angezeigt.

Tests: test_four_status (16) + Frontend-Vitest gruen; CI-Suite 206, v3/GT
unveraendert. Nur eigene Dateien (geteilter Tree).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-10 23:44:01 +02:00

276 lines
10 KiB
Python

"""
Profile Extractor — pre-fill Company Profile + Compliance Scope from documents.
When a customer uploads their existing legal documents, we extract
what we can and pre-fill the profile/scope wizard so they only need
to confirm and fill gaps.
Returns a dict that maps to CompanyProfile and ScopeProfilingAnswer fields.
"""
import logging
import re
from typing import Optional
logger = logging.getLogger(__name__)
def extract_profile_from_documents(
    doc_texts: dict[str, str],
    business_profile: Optional[dict] = None,
) -> dict:
    """Build pre-fill data for the profile/scope wizard from document texts.

    Args:
        doc_texts: Mapping of document type (e.g. ``"impressum"``, ``"dse"``)
            to the plain text of that document.
        business_profile: Optional output of the business profiler. Only the
            keys read below via ``.get`` are used; missing keys are skipped.

    Returns:
        A dict with the keys ``company_profile`` (extracted fields),
        ``compliance_scope_hints`` (list of ``{field, value, source}`` dicts),
        ``extracted_from`` (document types consulted for company data) and,
        when service detection ran or the profiler supplied them,
        ``detected_services``.
    """
    profile: dict = {}
    scope_hints: list = []
    sources: list = []
    result: dict = {
        "company_profile": profile,
        "compliance_scope_hints": scope_hints,
        "extracted_from": sources,
    }

    combined_original = "\n".join(doc_texts.values())
    combined_lower = combined_original.lower()

    # Company name + legal form: the Impressum is the primary source.
    impressum_text = doc_texts.get("impressum", "")
    if impressum_text:
        _extract_company_info(impressum_text, result)
        sources.append("impressum")

    # Fall back to the privacy policy ("dse") when no company name was found.
    if "dse" in doc_texts and not profile.get("companyName"):
        _extract_company_info(doc_texts["dse"], result)
        sources.append("dse")

    # DPO contact: needs the original casing to recognise person names.
    _extract_dpo(combined_original, result)

    # Business model as detected by the profiler.
    if business_profile:
        business_type = business_profile.get("business_type")
        if business_type and business_type != "unknown":
            profile["businessModel"] = business_type

        industry = business_profile.get("industry")
        if industry and industry != "unknown":
            profile["industry"] = [industry]

        if business_profile.get("has_online_shop"):
            profile["offerings"] = ["online_shop"]

        if business_profile.get("is_regulated_profession"):
            profile["regulatedProfession"] = True
            profile["regulatedProfessionType"] = business_profile.get(
                "regulated_profession_type", ""
            )

    # Detected services (full list with metadata). Best effort: any failure
    # is logged and the remaining extraction continues.
    try:
        from compliance.services.service_detector import detect_services_in_text

        services = detect_services_in_text(combined_lower)
        result["detected_services"] = services

        # Services without an "eu_adequate" flag become a scope hint.
        outside_eu = [svc for svc in services if not svc.get("eu_adequate")]
        if outside_eu:
            listed_names = ", ".join(svc["name"] for svc in outside_eu[:5])
            scope_hints.append({
                "field": "hasThirdCountryTransfer",
                "value": True,
                "source": f"{len(outside_eu)} Dienste ausserhalb EWR erkannt ({listed_names}...)",
            })
    except Exception as exc:
        logger.warning("Service detection failed: %s", exc)

    # Scope hints derived from the (lower-cased) document content.
    _extract_scope_hints(combined_lower, result)

    # Services supplied by the profiler replace the locally detected list.
    if business_profile and business_profile.get("detected_services"):
        result["detected_services"] = business_profile["detected_services"]

    logger.info(
        "Extracted %d profile fields, %d scope hints from %d documents",
        len(profile),
        len(scope_hints),
        len(doc_texts),
    )
    return result
def _extract_company_info(text: str, result: dict) -> None:
"""Extract company name, legal form, address from text."""
cp = result["company_profile"]
# Rechtsform + Firmenname. Die Reihenfolge der Muster ist NICHT die
# Priorität — wir nehmen den FRUEHESTEN Treffer im Text: ein Impressum
# nennt den Betreiber zuerst; spätere Erwähnungen (z.B. "juris GmbH" im
# Hinweis auf gesetze-im-internet.de) sind nicht der Anbieter. Ausge-
# schriebene Formen ("Aktiengesellschaft") zählen mit (sonst wird BMW AG
# nicht erkannt und faelschlich die naechste GmbH gegriffen).
legal_forms = [
(r"(\S+(?:\s+\S+){0,4})\s+gmbh\s*&\s*co\.?\s*kg\b", "gmbh_co_kg"),
(r"(\S+(?:\s+\S+){0,4})\s+(?:aktiengesellschaft|ag)\b", "ag"),
(r"(\S+(?:\s+\S+){0,4})\s+(?:unternehmergesellschaft|ug)\b", "ug"),
(r"(\S+(?:\s+\S+){0,4})\s+gmbh\b", "gmbh"),
(r"(\S+(?:\s+\S+){0,4})\s+e\.?\s*k\.?\b", "ek"),
(r"(\S+(?:\s+\S+){0,4})\s+gbr\b", "gbr"),
(r"(\S+(?:\s+\S+){0,4})\s+ohg\b", "ohg"),
]
text_lower = text.lower()
best = None # (start, end, form_id) — frühester Treffer
for pattern, form_id in legal_forms:
m = re.search(pattern, text_lower)
# frühester Treffer gewinnt; bei Gleichstand die Listen-Reihenfolge
# (GmbH & Co. KG vor GmbH).
if m and (best is None or m.start() < best[0]):
best = (m.start(), m.end(), form_id)
if best:
start, end, form_id = best
# Firmenname ab dem ersten Grossbuchstaben im Treffer (schneidet
# führende Kleinwörter wie "von der" ab).
for i, ch in enumerate(text[start:end]):
if ch.isupper():
cp["companyName"] = text[start + i:end].strip()
break
cp["legalForm"] = form_id
# PLZ + Ort
plz_match = re.search(
r"[d\-]?\s*(\d{5})\s+([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+(?:\s+[a-z]+)*)",
text,
)
if plz_match:
cp["headquartersZip"] = plz_match.group(1)
cp["headquartersCity"] = plz_match.group(2).strip()
cp["headquartersCountry"] = "DE"
# Strasse
street_match = re.search(
r"([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+(?:str(?:\.|asse)?|weg|allee|platz|ring|gasse)"
r"\s*\.?\s*\d+[a-z]?)",
text,
)
if street_match:
cp["headquartersStreet"] = street_match.group(1).strip()
# USt-IdNr
ust_match = re.search(r"DE\s*\d{9}", text)
if ust_match:
cp["ustIdNr"] = ust_match.group(0).replace(" ", "")
# HRB/HRA
hrb_match = re.search(r"HRB?\s*\d+", text, re.IGNORECASE)
if hrb_match:
cp["registrationNumber"] = hrb_match.group(0)
# Registergericht
reg_match = re.search(
r"(?:amtsgericht|registergericht|ag)\s+([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+)",
text, re.IGNORECASE,
)
if reg_match:
cp["registrationCourt"] = reg_match.group(0)
def _extract_dpo(text: str, result: dict) -> None:
"""Extract DPO name and email."""
cp = result["company_profile"]
# DPO email
dpo_section = re.search(
r"datenschutzbeauftragte[rn]?\s*[\s\S]{0,300}",
text, re.IGNORECASE,
)
if dpo_section:
section = dpo_section.group(0)
email_match = re.search(r"[\w.+-]+@[\w-]+\.[\w.-]+", section)
if email_match:
cp["dpoEmail"] = email_match.group(0)
# DPO name (after "Datenschutzbeauftragter:" or similar)
name_match = re.search(
r"(?:datenschutzbeauftragte[rn]?\s*:?\s*)"
r"([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+\s+"
r"[A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+)",
text,
)
if name_match:
cp["dpoName"] = name_match.group(1)
def _extract_scope_hints(text: str, result: dict) -> None:
"""Extract scope-relevant signals from document text."""
hints = result["compliance_scope_hints"]
# Sensitive data categories (Art. 9)
if any(kw in text for kw in [
"gesundheitsdaten", "biometrisch", "genetisch",
"religionszugehoerigkeit", "gewerkschaft", "sexualleben",
"politische meinung", "ethnische herkunft",
]):
hints.append({
"field": "processesSpecialCategories",
"value": True,
"source": "Erwaehnung besonderer Datenkategorien (Art. 9 DSGVO) im Text",
})
# Third country transfer
if any(kw in text for kw in ["usa", "drittland", "drittstaaten", "third country"]):
hints.append({
"field": "hasThirdCountryTransfer",
"value": True,
"source": "Drittlandtransfer erwaehnt",
})
# Large-scale processing
if any(kw in text for kw in [
"umfangreiche verarbeitung", "grosse anzahl",
"large scale", "massenverarbeitung",
]):
hints.append({
"field": "largeScaleProcessing",
"value": True,
"source": "Hinweis auf umfangreiche Verarbeitung",
})
# Automated decision-making
if any(kw in text for kw in [
"automatisierte entscheidung", "profiling", "scoring",
"automated decision", "art. 22",
]):
hints.append({
"field": "automatedDecisionMaking",
"value": True,
"source": "Automatisierte Entscheidungsfindung erwaehnt",
})
# Auftragsverarbeitung (processor role)
if any(kw in text for kw in [
"auftragsverarbeitung", "auftragsverarbeiter",
"im auftrag", "weisungsgebunden",
]):
hints.append({
"field": "isDataProcessor",
"value": True,
"source": "Auftragsverarbeitung erwaehnt",
})
# Newsletter / Marketing
if any(kw in text for kw in ["newsletter", "marketing", "werbung"]):
hints.append({
"field": "hasNewsletter",
"value": True,
"source": "Newsletter/Marketing erwaehnt",
})
# Employee data
if any(kw in text for kw in [
"mitarbeiterdaten", "beschaeftigtendaten", "personalakte",
"bewerberdaten", "arbeitnehmer",
]):
hints.append({
"field": "processesEmployeeData",
"value": True,
"source": "Beschaeftigtendaten-Verarbeitung erwaehnt",
})