diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py
index c113437..98e4e8a 100644
--- a/backend-compliance/compliance/api/agent_compliance_check_routes.py
+++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py
@@ -268,15 +268,29 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
l2p = sum(1 for c in l2 if c.passed)
r.correctness_pct = round(l2p / len(l2) * 100) if l2 else 0
- # Step 4: Build report
+ # Step 4: Extract profile hints from documents
+ _update(check_id, "Profil wird aus Dokumenten extrahiert...")
+ from compliance.services.profile_extractor import extract_profile_from_documents
+ extracted_profile = extract_profile_from_documents(doc_texts, profile_dict)
+
+ # Step 4b: Determine scenario per document
+ for r in results:
+ if r.error:
+ r.scenario = "skip"
+ elif r.completeness_pct < 30:
+ r.scenario = "regenerate"
+ elif r.completeness_pct < 95:
+ r.scenario = "fix"
+ else:
+ r.scenario = "import"
+
+ # Step 5: Build report
_update(check_id, "Report wird erstellt...")
report_html = build_html_report(results, None)
-
- # Prepend profile summary to report
profile_html = _build_profile_html(profile)
full_html = profile_html + report_html
- # Step 5: Send email
+ # Step 6: Send email
doc_count = len([r for r in results if not r.error])
email_result = send_email(
recipient=req.recipient,
@@ -284,10 +298,11 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
body_html=full_html,
)
- # Step 6: Store result
+ # Step 7: Store result
response = {
"results": [_result_to_dict(r) for r in results],
"business_profile": profile_dict,
+ "extracted_profile": extracted_profile,
"banner_result": {
"detected": banner_result.get("banner_detected", False) if banner_result else False,
"provider": banner_result.get("banner_provider", "") if banner_result else "",
@@ -406,16 +421,9 @@ async def _check_single(
def _get_skip_types(profile) -> dict[str, str]:
- """Return doc_types to skip entirely based on business profile.
-
- Returns dict mapping doc_type -> skip reason.
- """
- skip: dict[str, str] = {}
- if profile.business_type in ("b2b", "b2g"):
- skip["widerruf"] = "Uebersprungen: Widerrufsbelehrung nur fuer B2C relevant"
- if profile.business_type in ("b2b", "b2g") and not profile.has_online_shop:
- skip["nutzungsbedingungen"] = "Uebersprungen: Nutzungsbedingungen bei B2B ohne Shop selten relevant"
- return skip
+ """Return the doc_types to skip entirely, mapped to a skip reason.
+
+ Currently always empty: every document is checked and irrelevant items
+ are flagged as INFO instead of being skipped. ``profile`` is not read.
+ """
+ return {}
def _apply_profile_filter(result, profile, doc_type: str):
@@ -434,10 +442,16 @@ def _apply_profile_filter(result, profile, doc_type: str):
check.skipped = True
check.hint = "Nicht relevant (kein B2C Online-Shop)"
- # Widerruf only relevant for B2C
+ # Widerruf: Flag entire document as unnecessary for B2B
if doc_type == "widerruf" and profile.business_type not in ("b2c", "unknown"):
- if check.severity == "INFO":
- check.skipped = True
+ check.severity = "INFO"
+ if not check.passed:
+ check.hint = (
+ "Als B2B-Unternehmen benoetigen Sie keine Widerrufsbelehrung "
+ "(§355 BGB gilt nur fuer Verbrauchervertraege). "
+ "Empfehlung: Entfernen Sie die Widerrufsbelehrung von "
+ "Ihrer Website, da sie Verwirrung stiften kann."
+ )
# Regulated profession: check for Kammer info
if "kammer" in cid or "berufsordnung" in check.label.lower():
@@ -479,41 +493,13 @@ def _result_to_dict(r) -> dict:
"correctness_pct": r.correctness_pct,
"checks": [{f: getattr(c, f) for f in fields} for c in r.checks],
"findings_count": r.findings_count, "error": r.error,
+ "scenario": getattr(r, "scenario", ""),
}
def _build_profile_html(profile) -> str:
- """Build a small HTML block summarizing the detected business profile."""
- service_tags = ", ".join(profile.detected_services[:10]) or "keine erkannt"
- flags = []
- if profile.has_online_shop:
- flags.append("Online-Shop")
- if profile.has_editorial_content:
- flags.append("Redaktionelle Inhalte")
- if profile.is_regulated_profession:
- flags.append(f"Regulierter Beruf ({profile.regulated_profession_type})")
- if profile.needs_odr:
- flags.append("ODR-pflichtig")
- flags_str = ", ".join(flags) or "keine"
-
- return (
- '
'
- '
'
- 'Erkanntes Geschaeftsmodell
'
- '
'
- f'| Typ: | '
- f'{profile.business_type.upper()}'
- f' ({profile.industry}) |
'
- f'| Merkmale: | '
- f'{flags_str} |
'
- f'| Dienste: | '
- f'{service_tags} |
'
- f'| Konfidenz: | '
- f'{int(profile.confidence * 100)}% |
'
- '
'
- )
+ from .agent_doc_check_report import build_profile_html
+ return build_profile_html(profile)
# Cross-check extracted to compliance.services.banner_cookie_cross_check
diff --git a/backend-compliance/compliance/api/agent_doc_check_report.py b/backend-compliance/compliance/api/agent_doc_check_report.py
index 47e9aff..6f4186d 100644
--- a/backend-compliance/compliance/api/agent_doc_check_report.py
+++ b/backend-compliance/compliance/api/agent_doc_check_report.py
@@ -173,3 +173,37 @@ def _render_cookie_banner(html: list[str], cookie_result: dict) -> None:
else:
html.append('
Keine Verstoesse erkannt.')
html.append('')
+
+
+def build_profile_html(profile) -> str:
+ """Build a small HTML block summarizing the detected business profile.
+
+ Reads ``detected_services`` (first 10 entries), ``has_online_shop``,
+ ``has_editorial_content``, ``is_regulated_profession``,
+ ``regulated_profession_type``, ``needs_odr``, ``business_type``,
+ ``industry`` and ``confidence`` from ``profile`` and returns the
+ rendered markup as a single string.
+
+ NOTE(review): the profile values are interpolated into the markup
+ without escaping. If any of them can originate from scanned website
+ content, wrap them in ``html.escape`` - TODO confirm the data source.
+ """
+ # Cap the service list at 10 entries; placeholder text when none exist.
+ service_tags = ", ".join(profile.detected_services[:10]) or "keine erkannt"
+ # Human-readable labels for every boolean trait that is set.
+ flags = []
+ if profile.has_online_shop:
+ flags.append("Online-Shop")
+ if profile.has_editorial_content:
+ flags.append("Redaktionelle Inhalte")
+ if profile.is_regulated_profession:
+ flags.append(f"Regulierter Beruf ({profile.regulated_profession_type})")
+ if profile.needs_odr:
+ flags.append("ODR-pflichtig")
+ flags_str = ", ".join(flags) or "keine"
+
+ return (
+ ''
+ '
'
+ 'Erkanntes Geschaeftsmodell
'
+ '
'
+ f'| Typ: | '
+ f'{profile.business_type.upper()}'
+ f' ({profile.industry}) |
'
+ f'| Merkmale: | '
+ f'{flags_str} |
'
+ f'| Dienste: | '
+ f'{service_tags} |
'
+ f'| Konfidenz: | '
+ f'{int(profile.confidence * 100)}% |
'
+ '
'
+ )
diff --git a/backend-compliance/compliance/api/agent_doc_check_routes.py b/backend-compliance/compliance/api/agent_doc_check_routes.py
index e13886e..def5e2e 100644
--- a/backend-compliance/compliance/api/agent_doc_check_routes.py
+++ b/backend-compliance/compliance/api/agent_doc_check_routes.py
@@ -65,6 +65,7 @@ class DocCheckResult(BaseModel):
checks: list[CheckItem] = []
findings_count: int = 0
error: str = ""
+ scenario: str = "" # regenerate | fix | import | skip
class DocCheckResponse(BaseModel):
diff --git a/backend-compliance/compliance/services/profile_extractor.py b/backend-compliance/compliance/services/profile_extractor.py
new file mode 100644
index 0000000..4b5d987
--- /dev/null
+++ b/backend-compliance/compliance/services/profile_extractor.py
@@ -0,0 +1,248 @@
+"""
+Profile Extractor — pre-fill Company Profile + Compliance Scope from documents.
+
+When a customer uploads their existing legal documents, we extract
+what we can and pre-fill the profile/scope wizard so they only need
+to confirm and fill gaps.
+
+Returns a dict that maps to CompanyProfile and ScopeProfilingAnswer fields.
+"""
+
+import logging
+import re
+
+logger = logging.getLogger(__name__)
+
+
+def extract_profile_from_documents(
+    doc_texts: dict[str, str],
+    business_profile: dict | None = None,
+) -> dict:
+    """Extract Company Profile fields and scope hints from document texts.
+
+    Args:
+        doc_texts: Mapping of doc_type (e.g. ``"impressum"``, ``"dse"``) to
+            the plain text of that document.
+        business_profile: Optional business profile dict from the profiler.
+            The keys read here are ``business_type``, ``industry``,
+            ``has_online_shop``, ``is_regulated_profession``,
+            ``regulated_profession_type`` and ``detected_services``.
+
+    Returns:
+        A dict with ``company_profile`` (pre-filled fields),
+        ``compliance_scope_hints`` (list of hint dicts) and
+        ``extracted_from`` (doc_types parsed for company data), plus
+        ``detected_services`` when the profiler reported any.
+    """
+    result: dict = {
+        "company_profile": {},
+        "compliance_scope_hints": [],
+        "extracted_from": [],
+    }
+    company_profile = result["company_profile"]
+
+    # Join the corpus once and derive the lowercase copy from it. Empty or
+    # missing texts are dropped so a None value cannot break str.join().
+    all_text_original = "\n".join(text for text in doc_texts.values() if text)
+    all_text = all_text_original.lower()
+
+    # -- Company name + legal form ---------------------------------
+    impressum = doc_texts.get("impressum")
+    if impressum:
+        _extract_company_info(impressum, result)
+        result["extracted_from"].append("impressum")
+
+    # Fallback: try the privacy policy (DSE) when no company name was found.
+    dse = doc_texts.get("dse")
+    if dse and not company_profile.get("companyName"):
+        # Parse into a scratch dict and merge with setdefault: a privacy
+        # policy names many third parties, so it must only fill gaps and
+        # never overwrite fields already taken from the Impressum.
+        fallback: dict = {"company_profile": {}}
+        _extract_company_info(dse, fallback)
+        for key, value in fallback["company_profile"].items():
+            company_profile.setdefault(key, value)
+        result["extracted_from"].append("dse")
+
+    # -- DPO contact ------------------------------------------------
+    _extract_dpo(all_text_original, result)
+
+    # -- Business model from profiler -------------------------------
+    if business_profile:
+        business_type = business_profile.get("business_type")
+        if business_type and business_type != "unknown":
+            company_profile["businessModel"] = business_type
+        industry = business_profile.get("industry")
+        if industry and industry != "unknown":
+            company_profile["industry"] = [industry]
+        if business_profile.get("has_online_shop"):
+            company_profile["offerings"] = ["online_shop"]
+        if business_profile.get("is_regulated_profession"):
+            company_profile["regulatedProfession"] = True
+            company_profile["regulatedProfessionType"] = business_profile.get(
+                "regulated_profession_type", ""
+            )
+
+    # -- Scope hints from document content --------------------------
+    _extract_scope_hints(all_text, result)
+
+    # -- Tracking services -> data processing activities ------------
+    if business_profile and business_profile.get("detected_services"):
+        result["detected_services"] = business_profile["detected_services"]
+
+    logger.info(
+        "Extracted %d profile fields, %d scope hints from %d documents",
+        len(company_profile),
+        len(result["compliance_scope_hints"]),
+        len(doc_texts),
+    )
+    return result
+
+
+def _extract_company_info(text: str, result: dict) -> None:
+    """Extract company name, legal form, address and register data.
+
+    Every field is a best-effort regex heuristic; a field is only written
+    when its pattern matches.
+
+    Args:
+        text: Original-case document text (typically the Impressum).
+        result: Extraction result; ``result["company_profile"]`` is updated
+            in place with any of ``companyName``, ``legalForm``,
+            ``headquartersZip``, ``headquartersCity``,
+            ``headquartersCountry``, ``headquartersStreet``, ``ustIdNr``,
+            ``registrationNumber`` and ``registrationCourt``.
+    """
+    cp = result["company_profile"]
+
+    # Most specific form first: "GmbH & Co. KG" also satisfies the plain
+    # "GmbH" pattern, so it must be tried before it to be reachable at all.
+    legal_forms = (
+        (r"gmbh\s*&\s*co\.?\s*kg\b", "gmbh_co_kg"),
+        (r"gmbh\b", "gmbh"),
+        (r"ag\b", "ag"),
+        (r"ug\b", "ug"),
+        (r"e\.?\s*k\b\.?", "ek"),
+        (r"gbr\b", "gbr"),
+        (r"ohg\b", "ohg"),
+    )
+    # Up to five whitespace-separated tokens in front of the legal form.
+    name_prefix = r"\S+(?:\s+\S+){0,4}\s+"
+    for form_pattern, form_id in legal_forms:
+        # Match case-insensitively on the original text rather than on a
+        # lowercased copy, so match offsets always index the right string.
+        m = re.search(name_prefix + form_pattern, text, re.IGNORECASE)
+        if m is None:
+            continue
+        span = m.group(0)
+        # The name is assumed to start at the first uppercase character.
+        start = next((i for i, ch in enumerate(span) if ch.isupper()), None)
+        if start is not None:
+            cp["companyName"] = span[start:].strip()
+        cp["legalForm"] = form_id
+        break
+
+    # PLZ + Ort. The lookbehind stops the tail of a longer number (register
+    # number, IBAN, phone) from being read as a postcode.
+    plz_match = re.search(
+        r"(?<!\d)(\d{5})\s+"
+        r"([A-Z\u00c0-\u017e][a-z\u00df-\u00ff]+(?:\s+[a-z]+)*)",
+        text,
+    )
+    if plz_match:
+        cp["headquartersZip"] = plz_match.group(1)
+        cp["headquartersCity"] = plz_match.group(2).strip()
+        cp["headquartersCountry"] = "DE"
+
+    # Strasse: accepts "str", "str.", "strasse" and the sharp-s spelling.
+    street_match = re.search(
+        r"([A-Z\u00c0-\u017e][a-z\u00df-\u00ff]+"
+        r"(?:str(?:\.|a(?:ss|\u00df)e)?|weg|allee|platz|ring|gasse)"
+        r"\s*\.?\s*\d+[a-z]?)",
+        text,
+    )
+    if street_match:
+        cp["headquartersStreet"] = street_match.group(1).strip()
+
+    # USt-IdNr: exactly nine digits, so an unspaced IBAN is not picked up.
+    ust_match = re.search(r"\bDE\s*\d{9}(?!\d)", text)
+    if ust_match:
+        cp["ustIdNr"] = re.sub(r"\s+", "", ust_match.group(0))
+
+    # HRB/HRA. The word boundary keeps "Uhr 12" from matching.
+    hrb_match = re.search(r"\bHR[AB]?\s*\d+", text, re.IGNORECASE)
+    if hrb_match:
+        cp["registrationNumber"] = hrb_match.group(0)
+
+    # Registergericht. The word boundary keeps the "ag" abbreviation from
+    # matching inside words such as "Vertrag".
+    reg_match = re.search(
+        r"\b(?:amtsgericht|registergericht|ag)\s+"
+        r"([A-Z\u00c0-\u017e][a-z\u00df-\u00ff]+)",
+        text, re.IGNORECASE,
+    )
+    if reg_match:
+        cp["registrationCourt"] = reg_match.group(0)
+
+
+def _extract_dpo(text: str, result: dict) -> None:
+    """Extract the data protection officer's email and name.
+
+    Args:
+        text: Original-case text of all documents.
+        result: Extraction result; ``result["company_profile"]`` receives
+            ``dpoEmail`` and/or ``dpoName`` when they can be found.
+    """
+    cp = result["company_profile"]
+
+    # DPO email: inspect the 300 characters after every mention of the
+    # keyword, not just the first one (often a heading or table of contents
+    # entry with no address nearby), and keep the first address found.
+    for section in re.finditer(
+        r"datenschutzbeauftragte[rn]?\s*[\s\S]{0,300}",
+        text, re.IGNORECASE,
+    ):
+        email_match = re.search(r"[\w.+-]+@[\w-]+\.[\w.-]+", section.group(0))
+        if email_match:
+            # The domain class also swallows sentence punctuation that
+            # directly follows the address; trim it off.
+            cp["dpoEmail"] = email_match.group(0).rstrip(".-")
+            break
+
+    # DPO name (after "Datenschutzbeauftragter:" or similar). Only the
+    # keyword is case-insensitive - German nouns are capitalised, so a
+    # lowercase-only keyword never matches real text - while the name itself
+    # must still be two capitalised words. Salutations and titles are skipped.
+    name_match = re.search(
+        r"(?i:datenschutzbeauftragte[rn]?)\s*:?\s*"
+        r"(?:(?:Herr|Frau|Dr\.|Prof\.)\s+)*"
+        r"([A-Z\u00c0-\u017e][a-z\u00df-\u00ff]+\s+"
+        r"[A-Z\u00c0-\u017e][a-z\u00df-\u00ff]+)",
+        text,
+    )
+    if name_match:
+        cp["dpoName"] = name_match.group(1)
+
+
+def _extract_scope_hints(text: str, result: dict) -> None:
+    """Append scope-relevant signals found in *text* to the result.
+
+    Args:
+        text: Text of all documents; it is lowercased and umlaut-folded
+            here before matching.
+        result: Extraction result; one hint dict (``field``, ``value``,
+            ``source``) per detected signal is appended to
+            ``result["compliance_scope_hints"]``.
+    """
+    hints = result["compliance_scope_hints"]
+
+    # The keyword tables use ASCII transliterations while real documents
+    # spell umlauts natively; fold the text onto the same spelling.
+    normalized = text.lower().translate(
+        str.maketrans(
+            {"\u00e4": "ae", "\u00f6": "oe", "\u00fc": "ue", "\u00df": "ss"}
+        )
+    )
+
+    # (field, source, regex alternatives), kept in output order.
+    rules = (
+        # Sensitive data categories (Art. 9)
+        (
+            "processesSpecialCategories",
+            "Erwaehnung besonderer Datenkategorien (Art. 9 DSGVO) im Text",
+            (
+                "gesundheitsdaten", "biometrisch", "genetisch",
+                "religionszugehoerigkeit", "gewerkschaft", "sexualleben",
+                "politische meinung", "ethnische herkunft",
+            ),
+        ),
+        # Third country transfer. "usa" must be a whole word: as a bare
+        # substring it is contained in "zusammen" and fires on almost
+        # every German text.
+        (
+            "hasThirdCountryTransfer",
+            "Drittlandtransfer erwaehnt",
+            (
+                r"\busa\b", "drittland", "drittlaender", "drittstaat",
+                "third country",
+            ),
+        ),
+        # Large-scale processing
+        (
+            "largeScaleProcessing",
+            "Hinweis auf umfangreiche Verarbeitung",
+            (
+                "umfangreiche verarbeitung", "grosse anzahl",
+                "large scale", "massenverarbeitung",
+            ),
+        ),
+        # Automated decision-making
+        (
+            "automatedDecisionMaking",
+            "Automatisierte Entscheidungsfindung erwaehnt",
+            (
+                "automatisierte entscheidung", "profiling", "scoring",
+                "automated decision", r"art\.\s*22\b",
+            ),
+        ),
+        # Auftragsverarbeitung (processor role)
+        (
+            "isDataProcessor",
+            "Auftragsverarbeitung erwaehnt",
+            (
+                "auftragsverarbeitung", "auftragsverarbeiter",
+                "im auftrag", "weisungsgebunden",
+            ),
+        ),
+        # Newsletter / Marketing. The lookbehind keeps "bewerbung" (job
+        # application) from counting as "werbung" (advertising).
+        (
+            "hasNewsletter",
+            "Newsletter/Marketing erwaehnt",
+            ("newsletter", "marketing", r"(?<!be)werbung"),
+        ),
+        # Employee data
+        (
+            "processesEmployeeData",
+            "Beschaeftigtendaten-Verarbeitung erwaehnt",
+            (
+                "mitarbeiterdaten", "beschaeftigtendaten", "personalakte",
+                "bewerberdaten", "arbeitnehmer",
+            ),
+        ),
+    )
+
+    for field, source, patterns in rules:
+        if re.search("|".join(patterns), normalized):
+            hints.append({"field": field, "value": True, "source": source})