From 7be34552bb921bcc7016d72a002a864decf2e3ff Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Tue, 12 May 2026 17:34:33 +0200 Subject: [PATCH] feat(compliance-check): profile extraction + scenario classification - New profile_extractor.py: extracts Company Profile fields (name, legal form, address, DPO, USt-IdNr) and Compliance Scope hints (Art. 9 data, third country, profiling) from document texts - Scenario per document: regenerate (<30%), fix (30-95%), import (>95%) - Widerruf for B2B: no longer skipped, instead all checks flagged as INFO with "not needed for B2B" hint - Move _build_profile_html to report builder module - DocCheckResult gets scenario field Co-Authored-By: Claude Opus 4.6 (1M context) --- .../api/agent_compliance_check_routes.py | 84 +++--- .../compliance/api/agent_doc_check_report.py | 34 +++ .../compliance/api/agent_doc_check_routes.py | 1 + .../compliance/services/profile_extractor.py | 248 ++++++++++++++++++ 4 files changed, 318 insertions(+), 49 deletions(-) create mode 100644 backend-compliance/compliance/services/profile_extractor.py diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index c113437..98e4e8a 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -268,15 +268,29 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): l2p = sum(1 for c in l2 if c.passed) r.correctness_pct = round(l2p / len(l2) * 100) if l2 else 0 - # Step 4: Build report + # Step 4: Extract profile hints from documents + _update(check_id, "Profil wird aus Dokumenten extrahiert...") + from compliance.services.profile_extractor import extract_profile_from_documents + extracted_profile = extract_profile_from_documents(doc_texts, profile_dict) + + # Step 4b: Determine scenario per document + for r in results: + if r.error: + r.scenario = "skip" + elif r.completeness_pct < 30: + r.scenario = "regenerate" + elif r.completeness_pct < 95: + r.scenario = "fix" + else: + r.scenario = "import" + + # Step 5: Build report _update(check_id, "Report wird erstellt...") report_html = build_html_report(results, None) - - # Prepend profile summary to report profile_html = _build_profile_html(profile) full_html = profile_html + report_html - # Step 5: Send email + # Step 6: Send email doc_count = len([r for r in results if not r.error]) email_result = send_email( recipient=req.recipient, @@ -284,10 +298,11 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): body_html=full_html, ) - # Step 6: Store result + # Step 7: Store result response = { "results": [_result_to_dict(r) for r in results], "business_profile": profile_dict, + "extracted_profile": extracted_profile, "banner_result": { "detected": banner_result.get("banner_detected", False) if banner_result else False, "provider": banner_result.get("banner_provider", "") if banner_result else "", @@ -406,16 +421,9 @@ async def _check_single( def _get_skip_types(profile) -> dict[str, str]: - """Return doc_types to skip entirely based on business profile. - - Returns dict mapping doc_type -> skip reason. - """ - skip: dict[str, str] = {} - if profile.business_type in ("b2b", "b2g"): - skip["widerruf"] = "Uebersprungen: Widerrufsbelehrung nur fuer B2C relevant" - if profile.business_type in ("b2b", "b2g") and not profile.has_online_shop: - skip["nutzungsbedingungen"] = "Uebersprungen: Nutzungsbedingungen bei B2B ohne Shop selten relevant" - return skip + """Doc_types to skip entirely. Currently empty — we check everything + and flag irrelevant items as INFO instead of skipping.""" + return {} def _apply_profile_filter(result, profile, doc_type: str): @@ -434,10 +442,16 @@ def _apply_profile_filter(result, profile, doc_type: str): check.skipped = True check.hint = "Nicht relevant (kein B2C Online-Shop)" - # Widerruf only relevant for B2C + # Widerruf: Flag entire document as unnecessary for B2B if doc_type == "widerruf" and profile.business_type not in ("b2c", "unknown"): - if check.severity == "INFO": - check.skipped = True + check.severity = "INFO" + if not check.passed: + check.hint = ( + "Als B2B-Unternehmen benoetigen Sie keine Widerrufsbelehrung " + "(§355 BGB gilt nur fuer Verbrauchervertraege). " + "Empfehlung: Entfernen Sie die Widerrufsbelehrung von " + "Ihrer Website, da sie Verwirrung stiften kann." + ) # Regulated profession: check for Kammer info if "kammer" in cid or "berufsordnung" in check.label.lower(): @@ -479,41 +493,13 @@ def _result_to_dict(r) -> dict: "correctness_pct": r.correctness_pct, "checks": [{f: getattr(c, f) for f in fields} for c in r.checks], "findings_count": r.findings_count, "error": r.error, + "scenario": getattr(r, "scenario", ""), } def _build_profile_html(profile) -> str: - """Build a small HTML block summarizing the detected business profile.""" - service_tags = ", ".join(profile.detected_services[:10]) or "keine erkannt" - flags = [] - if profile.has_online_shop: - flags.append("Online-Shop") - if profile.has_editorial_content: - flags.append("Redaktionelle Inhalte") - if profile.is_regulated_profession: - flags.append(f"Regulierter Beruf ({profile.regulated_profession_type})") - if profile.needs_odr: - flags.append("ODR-pflichtig") - flags_str = ", ".join(flags) or "keine" - - return ( - '
' - '

' - 'Erkanntes Geschaeftsmodell

' - '' - f'' - f'' - f'' - f'' - f'' - f'' - f'' - f'' - '
Typ:{profile.business_type.upper()}' - f' ({profile.industry})
Merkmale:{flags_str}
Dienste:{service_tags}
Konfidenz:{int(profile.confidence * 100)}%
' - ) + from .agent_doc_check_report import build_profile_html + return build_profile_html(profile) # Cross-check extracted to compliance.services.banner_cookie_cross_check diff --git a/backend-compliance/compliance/api/agent_doc_check_report.py b/backend-compliance/compliance/api/agent_doc_check_report.py index 47e9aff..6f4186d 100644 --- a/backend-compliance/compliance/api/agent_doc_check_report.py +++ b/backend-compliance/compliance/api/agent_doc_check_report.py @@ -173,3 +173,37 @@ def _render_cookie_banner(html: list[str], cookie_result: dict) -> None: else: html.append('
Keine Verstoesse erkannt.') html.append('') + + +def build_profile_html(profile) -> str: + """Build a small HTML block summarizing the detected business profile.""" + service_tags = ", ".join(profile.detected_services[:10]) or "keine erkannt" + flags = [] + if profile.has_online_shop: + flags.append("Online-Shop") + if profile.has_editorial_content: + flags.append("Redaktionelle Inhalte") + if profile.is_regulated_profession: + flags.append(f"Regulierter Beruf ({profile.regulated_profession_type})") + if profile.needs_odr: + flags.append("ODR-pflichtig") + flags_str = ", ".join(flags) or "keine" + + return ( + '
' + '

' + 'Erkanntes Geschaeftsmodell

' + '' + f'' + f'' + f'' + f'' + f'' + f'' + f'' + f'' + '
Typ:{profile.business_type.upper()}' + f' ({profile.industry})
Merkmale:{flags_str}
Dienste:{service_tags}
Konfidenz:{int(profile.confidence * 100)}%
' + ) diff --git a/backend-compliance/compliance/api/agent_doc_check_routes.py b/backend-compliance/compliance/api/agent_doc_check_routes.py index e13886e..def5e2e 100644 --- a/backend-compliance/compliance/api/agent_doc_check_routes.py +++ b/backend-compliance/compliance/api/agent_doc_check_routes.py @@ -65,6 +65,7 @@ class DocCheckResult(BaseModel): checks: list[CheckItem] = [] findings_count: int = 0 error: str = "" + scenario: str = "" # regenerate | fix | import | skip class DocCheckResponse(BaseModel): diff --git a/backend-compliance/compliance/services/profile_extractor.py b/backend-compliance/compliance/services/profile_extractor.py new file mode 100644 index 0000000..4b5d987 --- /dev/null +++ b/backend-compliance/compliance/services/profile_extractor.py @@ -0,0 +1,248 @@ +""" +Profile Extractor — pre-fill Company Profile + Compliance Scope from documents. + +When a customer uploads their existing legal documents, we extract +what we can and pre-fill the profile/scope wizard so they only need +to confirm and fill gaps. + +Returns a dict that maps to CompanyProfile and ScopeProfilingAnswer fields. +""" + +import logging +import re + +logger = logging.getLogger(__name__) + + +def extract_profile_from_documents( + doc_texts: dict[str, str], + business_profile: dict | None = None, +) -> dict: + """Extract Company Profile fields from document texts. + + Args: + doc_texts: dict mapping doc_type -> text + business_profile: optional detected business profile from profiler + + Returns dict with pre-filled fields for Company Profile and Scope. + """ + result: dict = { + "company_profile": {}, + "compliance_scope_hints": [], + "extracted_from": [], + } + + all_text = "\n".join(doc_texts.values()).lower() + all_text_original = "\n".join(doc_texts.values()) + + # ── Company name + legal form ──────────────────────────────── + impressum = doc_texts.get("impressum", "") + if impressum: + _extract_company_info(impressum, result) + result["extracted_from"].append("impressum") + + # Fallback: try DSI + if not result["company_profile"].get("companyName") and "dse" in doc_texts: + _extract_company_info(doc_texts["dse"], result) + result["extracted_from"].append("dse") + + # ── DPO contact ────────────────────────────────────────────── + _extract_dpo(all_text_original, result) + + # ── Business model from profiler ───────────────────────────── + if business_profile: + bp = business_profile + if bp.get("business_type") and bp["business_type"] != "unknown": + result["company_profile"]["businessModel"] = bp["business_type"] + if bp.get("industry") and bp["industry"] != "unknown": + result["company_profile"]["industry"] = [bp["industry"]] + if bp.get("has_online_shop"): + result["company_profile"]["offerings"] = ["online_shop"] + if bp.get("is_regulated_profession"): + result["company_profile"]["regulatedProfession"] = True + result["company_profile"]["regulatedProfessionType"] = bp.get( + "regulated_profession_type", "" + ) + + # ── Scope hints from document content ──────────────────────── + _extract_scope_hints(all_text, result) + + # ── Tracking services → data processing activities ─────────── + if business_profile and business_profile.get("detected_services"): + result["detected_services"] = business_profile["detected_services"] + + logger.info( + "Extracted %d profile fields, %d scope hints from %d documents", + len(result["company_profile"]), + len(result["compliance_scope_hints"]), + len(doc_texts), + ) + return result + + +def _extract_company_info(text: str, result: dict) -> None: + """Extract company name, legal form, address from text.""" + cp = result["company_profile"] + + # GmbH / AG / UG / e.K. etc. + legal_forms = { + r"(\S+(?:\s+\S+){0,4})\s+gmbh\b": ("GmbH", "gmbh"), + r"(\S+(?:\s+\S+){0,4})\s+ag\b": ("AG", "ag"), + r"(\S+(?:\s+\S+){0,4})\s+ug\b": ("UG", "ug"), + r"(\S+(?:\s+\S+){0,4})\s+e\.?\s*k\.?\b": ("e.K.", "ek"), + r"(\S+(?:\s+\S+){0,4})\s+gbr\b": ("GbR", "gbr"), + r"(\S+(?:\s+\S+){0,4})\s+ohg\b": ("OHG", "ohg"), + r"(\S+(?:\s+\S+){0,4})\s+gmbh\s*&\s*co\.?\s*kg": ("GmbH & Co. KG", "gmbh_co_kg"), + } + text_lower = text.lower() + for pattern, (form_label, form_id) in legal_forms.items(): + m = re.search(pattern, text_lower) + if m: + raw_name = m.group(0).strip() + # Clean up: take from uppercase start + for i, ch in enumerate(text[m.start():m.end()]): + if ch.isupper(): + cp["companyName"] = text[m.start() + i:m.end()].strip() + break + cp["legalForm"] = form_id + break + + # PLZ + Ort + plz_match = re.search( + r"[d\-]?\s*(\d{5})\s+([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+(?:\s+[a-z]+)*)", + text, + ) + if plz_match: + cp["headquartersZip"] = plz_match.group(1) + cp["headquartersCity"] = plz_match.group(2).strip() + cp["headquartersCountry"] = "DE" + + # Strasse + street_match = re.search( + r"([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+(?:str(?:\.|asse)?|weg|allee|platz|ring|gasse)" + r"\s*\.?\s*\d+[a-z]?)", + text, + ) + if street_match: + cp["headquartersStreet"] = street_match.group(1).strip() + + # USt-IdNr + ust_match = re.search(r"DE\s*\d{9}", text) + if ust_match: + cp["ustIdNr"] = ust_match.group(0).replace(" ", "") + + # HRB/HRA + hrb_match = re.search(r"HRB?\s*\d+", text, re.IGNORECASE) + if hrb_match: + cp["registrationNumber"] = hrb_match.group(0) + + # Registergericht + reg_match = re.search( + r"(?:amtsgericht|registergericht|ag)\s+([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+)", + text, re.IGNORECASE, + ) + if reg_match: + cp["registrationCourt"] = reg_match.group(0) + + +def _extract_dpo(text: str, result: dict) -> None: + """Extract DPO name and email.""" + cp = result["company_profile"] + + # DPO email + dpo_section = re.search( + r"datenschutzbeauftragte[rn]?\s*[\s\S]{0,300}", + text, re.IGNORECASE, + ) + if dpo_section: + section = dpo_section.group(0) + email_match = re.search(r"[\w.+-]+@[\w-]+\.[\w.-]+", section) + if email_match: + cp["dpoEmail"] = email_match.group(0) + + # DPO name (after "Datenschutzbeauftragter:" or similar) + name_match = re.search( + r"(?:datenschutzbeauftragte[rn]?\s*:?\s*)" + r"([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+\s+" + r"[A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+)", + text, + ) + if name_match: + cp["dpoName"] = name_match.group(1) + + +def _extract_scope_hints(text: str, result: dict) -> None: + """Extract scope-relevant signals from document text.""" + hints = result["compliance_scope_hints"] + + # Sensitive data categories (Art. 9) + if any(kw in text for kw in [ + "gesundheitsdaten", "biometrisch", "genetisch", + "religionszugehoerigkeit", "gewerkschaft", "sexualleben", + "politische meinung", "ethnische herkunft", + ]): + hints.append({ + "field": "processesSpecialCategories", + "value": True, + "source": "Erwaehnung besonderer Datenkategorien (Art. 9 DSGVO) im Text", + }) + + # Third country transfer + if any(kw in text for kw in ["usa", "drittland", "drittstaaten", "third country"]): + hints.append({ + "field": "hasThirdCountryTransfer", + "value": True, + "source": "Drittlandtransfer erwaehnt", + }) + + # Large-scale processing + if any(kw in text for kw in [ + "umfangreiche verarbeitung", "grosse anzahl", + "large scale", "massenverarbeitung", + ]): + hints.append({ + "field": "largeScaleProcessing", + "value": True, + "source": "Hinweis auf umfangreiche Verarbeitung", + }) + + # Automated decision-making + if any(kw in text for kw in [ + "automatisierte entscheidung", "profiling", "scoring", + "automated decision", "art. 22", + ]): + hints.append({ + "field": "automatedDecisionMaking", + "value": True, + "source": "Automatisierte Entscheidungsfindung erwaehnt", + }) + + # Auftragsverarbeitung (processor role) + if any(kw in text for kw in [ + "auftragsverarbeitung", "auftragsverarbeiter", + "im auftrag", "weisungsgebunden", + ]): + hints.append({ + "field": "isDataProcessor", + "value": True, + "source": "Auftragsverarbeitung erwaehnt", + }) + + # Newsletter / Marketing + if any(kw in text for kw in ["newsletter", "marketing", "werbung"]): + hints.append({ + "field": "hasNewsletter", + "value": True, + "source": "Newsletter/Marketing erwaehnt", + }) + + # Employee data + if any(kw in text for kw in [ + "mitarbeiterdaten", "beschaeftigtendaten", "personalakte", + "bewerberdaten", "arbeitnehmer", + ]): + hints.append({ + "field": "processesEmployeeData", + "value": True, + "source": "Beschaeftigtendaten-Verarbeitung erwaehnt", + })