feat(compliance-check): profile extraction + scenario classification
Build + Deploy / build-admin-compliance (push) Successful in 15s
Build + Deploy / build-backend-compliance (push) Successful in 21s
Build + Deploy / build-ai-sdk (push) Successful in 46s
Build + Deploy / build-developer-portal (push) Successful in 12s
Build + Deploy / build-tts (push) Successful in 13s
Build + Deploy / build-document-crawler (push) Successful in 11s
Build + Deploy / build-dsms-gateway (push) Successful in 11s
Build + Deploy / build-dsms-node (push) Successful in 14s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 17s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m46s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Successful in 47s
CI / test-python-backend (push) Successful in 39s
CI / test-python-document-crawler (push) Successful in 27s
CI / test-python-dsms-gateway (push) Successful in 22s
CI / validate-canonical-controls (push) Successful in 16s
Build + Deploy / trigger-orca (push) Successful in 2m29s
Build + Deploy / build-admin-compliance (push) Successful in 15s
Build + Deploy / build-backend-compliance (push) Successful in 21s
Build + Deploy / build-ai-sdk (push) Successful in 46s
Build + Deploy / build-developer-portal (push) Successful in 12s
Build + Deploy / build-tts (push) Successful in 13s
Build + Deploy / build-document-crawler (push) Successful in 11s
Build + Deploy / build-dsms-gateway (push) Successful in 11s
Build + Deploy / build-dsms-node (push) Successful in 14s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 17s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m46s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Successful in 47s
CI / test-python-backend (push) Successful in 39s
CI / test-python-document-crawler (push) Successful in 27s
CI / test-python-dsms-gateway (push) Successful in 22s
CI / validate-canonical-controls (push) Successful in 16s
Build + Deploy / trigger-orca (push) Successful in 2m29s
- New profile_extractor.py: extracts Company Profile fields (name, legal form, address, DPO, USt-IdNr) and Compliance Scope hints (Art. 9 data, third country, profiling) from document texts - Scenario per document: regenerate (<30%), fix (30-95%), import (>95%) - Widerruf for B2B: no longer skipped, instead all checks flagged as INFO with "not needed for B2B" hint - Move _build_profile_html to report builder module - DocCheckResult gets scenario field Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -268,15 +268,29 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
|
||||
l2p = sum(1 for c in l2 if c.passed)
|
||||
r.correctness_pct = round(l2p / len(l2) * 100) if l2 else 0
|
||||
|
||||
# Step 4: Build report
|
||||
# Step 4: Extract profile hints from documents
|
||||
_update(check_id, "Profil wird aus Dokumenten extrahiert...")
|
||||
from compliance.services.profile_extractor import extract_profile_from_documents
|
||||
extracted_profile = extract_profile_from_documents(doc_texts, profile_dict)
|
||||
|
||||
# Step 4b: Determine scenario per document
|
||||
for r in results:
|
||||
if r.error:
|
||||
r.scenario = "skip"
|
||||
elif r.completeness_pct < 30:
|
||||
r.scenario = "regenerate"
|
||||
elif r.completeness_pct < 95:
|
||||
r.scenario = "fix"
|
||||
else:
|
||||
r.scenario = "import"
|
||||
|
||||
# Step 5: Build report
|
||||
_update(check_id, "Report wird erstellt...")
|
||||
report_html = build_html_report(results, None)
|
||||
|
||||
# Prepend profile summary to report
|
||||
profile_html = _build_profile_html(profile)
|
||||
full_html = profile_html + report_html
|
||||
|
||||
# Step 5: Send email
|
||||
# Step 6: Send email
|
||||
doc_count = len([r for r in results if not r.error])
|
||||
email_result = send_email(
|
||||
recipient=req.recipient,
|
||||
@@ -284,10 +298,11 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
|
||||
body_html=full_html,
|
||||
)
|
||||
|
||||
# Step 6: Store result
|
||||
# Step 7: Store result
|
||||
response = {
|
||||
"results": [_result_to_dict(r) for r in results],
|
||||
"business_profile": profile_dict,
|
||||
"extracted_profile": extracted_profile,
|
||||
"banner_result": {
|
||||
"detected": banner_result.get("banner_detected", False) if banner_result else False,
|
||||
"provider": banner_result.get("banner_provider", "") if banner_result else "",
|
||||
@@ -406,16 +421,9 @@ async def _check_single(
|
||||
|
||||
|
||||
def _get_skip_types(profile) -> dict[str, str]:
|
||||
"""Return doc_types to skip entirely based on business profile.
|
||||
|
||||
Returns dict mapping doc_type -> skip reason.
|
||||
"""
|
||||
skip: dict[str, str] = {}
|
||||
if profile.business_type in ("b2b", "b2g"):
|
||||
skip["widerruf"] = "Uebersprungen: Widerrufsbelehrung nur fuer B2C relevant"
|
||||
if profile.business_type in ("b2b", "b2g") and not profile.has_online_shop:
|
||||
skip["nutzungsbedingungen"] = "Uebersprungen: Nutzungsbedingungen bei B2B ohne Shop selten relevant"
|
||||
return skip
|
||||
"""Doc_types to skip entirely. Currently empty — we check everything
|
||||
and flag irrelevant items as INFO instead of skipping."""
|
||||
return {}
|
||||
|
||||
|
||||
def _apply_profile_filter(result, profile, doc_type: str):
|
||||
@@ -434,10 +442,16 @@ def _apply_profile_filter(result, profile, doc_type: str):
|
||||
check.skipped = True
|
||||
check.hint = "Nicht relevant (kein B2C Online-Shop)"
|
||||
|
||||
# Widerruf only relevant for B2C
|
||||
# Widerruf: Flag entire document as unnecessary for B2B
|
||||
if doc_type == "widerruf" and profile.business_type not in ("b2c", "unknown"):
|
||||
if check.severity == "INFO":
|
||||
check.skipped = True
|
||||
check.severity = "INFO"
|
||||
if not check.passed:
|
||||
check.hint = (
|
||||
"Als B2B-Unternehmen benoetigen Sie keine Widerrufsbelehrung "
|
||||
"(§355 BGB gilt nur fuer Verbrauchervertraege). "
|
||||
"Empfehlung: Entfernen Sie die Widerrufsbelehrung von "
|
||||
"Ihrer Website, da sie Verwirrung stiften kann."
|
||||
)
|
||||
|
||||
# Regulated profession: check for Kammer info
|
||||
if "kammer" in cid or "berufsordnung" in check.label.lower():
|
||||
@@ -479,41 +493,13 @@ def _result_to_dict(r) -> dict:
|
||||
"correctness_pct": r.correctness_pct,
|
||||
"checks": [{f: getattr(c, f) for f in fields} for c in r.checks],
|
||||
"findings_count": r.findings_count, "error": r.error,
|
||||
"scenario": getattr(r, "scenario", ""),
|
||||
}
|
||||
|
||||
|
||||
def _build_profile_html(profile) -> str:
|
||||
"""Build a small HTML block summarizing the detected business profile."""
|
||||
service_tags = ", ".join(profile.detected_services[:10]) or "keine erkannt"
|
||||
flags = []
|
||||
if profile.has_online_shop:
|
||||
flags.append("Online-Shop")
|
||||
if profile.has_editorial_content:
|
||||
flags.append("Redaktionelle Inhalte")
|
||||
if profile.is_regulated_profession:
|
||||
flags.append(f"Regulierter Beruf ({profile.regulated_profession_type})")
|
||||
if profile.needs_odr:
|
||||
flags.append("ODR-pflichtig")
|
||||
flags_str = ", ".join(flags) or "keine"
|
||||
|
||||
return (
|
||||
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
|
||||
'max-width:700px;margin:0 auto 16px;padding:12px 16px;'
|
||||
'background:#f0f9ff;border:1px solid #bae6fd;border-radius:8px">'
|
||||
'<h3 style="margin:0 0 8px;font-size:14px;color:#0369a1">'
|
||||
'Erkanntes Geschaeftsmodell</h3>'
|
||||
'<table style="font-size:13px;color:#374151">'
|
||||
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Typ:</td>'
|
||||
f'<td><strong>{profile.business_type.upper()}</strong>'
|
||||
f' ({profile.industry})</td></tr>'
|
||||
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Merkmale:</td>'
|
||||
f'<td>{flags_str}</td></tr>'
|
||||
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Dienste:</td>'
|
||||
f'<td>{service_tags}</td></tr>'
|
||||
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Konfidenz:</td>'
|
||||
f'<td>{int(profile.confidence * 100)}%</td></tr>'
|
||||
'</table></div>'
|
||||
)
|
||||
from .agent_doc_check_report import build_profile_html
|
||||
return build_profile_html(profile)
|
||||
|
||||
|
||||
# Cross-check extracted to compliance.services.banner_cookie_cross_check
|
||||
|
||||
@@ -173,3 +173,37 @@ def _render_cookie_banner(html: list[str], cookie_result: dict) -> None:
|
||||
else:
|
||||
html.append('<br><span style="color:#22c55e">Keine Verstoesse erkannt.</span>')
|
||||
html.append('</div>')
|
||||
|
||||
|
||||
def build_profile_html(profile) -> str:
|
||||
"""Build a small HTML block summarizing the detected business profile."""
|
||||
service_tags = ", ".join(profile.detected_services[:10]) or "keine erkannt"
|
||||
flags = []
|
||||
if profile.has_online_shop:
|
||||
flags.append("Online-Shop")
|
||||
if profile.has_editorial_content:
|
||||
flags.append("Redaktionelle Inhalte")
|
||||
if profile.is_regulated_profession:
|
||||
flags.append(f"Regulierter Beruf ({profile.regulated_profession_type})")
|
||||
if profile.needs_odr:
|
||||
flags.append("ODR-pflichtig")
|
||||
flags_str = ", ".join(flags) or "keine"
|
||||
|
||||
return (
|
||||
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
|
||||
'max-width:700px;margin:0 auto 16px;padding:12px 16px;'
|
||||
'background:#f0f9ff;border:1px solid #bae6fd;border-radius:8px">'
|
||||
'<h3 style="margin:0 0 8px;font-size:14px;color:#0369a1">'
|
||||
'Erkanntes Geschaeftsmodell</h3>'
|
||||
'<table style="font-size:13px;color:#374151">'
|
||||
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Typ:</td>'
|
||||
f'<td><strong>{profile.business_type.upper()}</strong>'
|
||||
f' ({profile.industry})</td></tr>'
|
||||
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Merkmale:</td>'
|
||||
f'<td>{flags_str}</td></tr>'
|
||||
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Dienste:</td>'
|
||||
f'<td>{service_tags}</td></tr>'
|
||||
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Konfidenz:</td>'
|
||||
f'<td>{int(profile.confidence * 100)}%</td></tr>'
|
||||
'</table></div>'
|
||||
)
|
||||
|
||||
@@ -65,6 +65,7 @@ class DocCheckResult(BaseModel):
|
||||
checks: list[CheckItem] = []
|
||||
findings_count: int = 0
|
||||
error: str = ""
|
||||
scenario: str = "" # regenerate | fix | import | skip
|
||||
|
||||
|
||||
class DocCheckResponse(BaseModel):
|
||||
|
||||
@@ -0,0 +1,248 @@
|
||||
"""
|
||||
Profile Extractor — pre-fill Company Profile + Compliance Scope from documents.
|
||||
|
||||
When a customer uploads their existing legal documents, we extract
|
||||
what we can and pre-fill the profile/scope wizard so they only need
|
||||
to confirm and fill gaps.
|
||||
|
||||
Returns a dict that maps to CompanyProfile and ScopeProfilingAnswer fields.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def extract_profile_from_documents(
|
||||
doc_texts: dict[str, str],
|
||||
business_profile: dict | None = None,
|
||||
) -> dict:
|
||||
"""Extract Company Profile fields from document texts.
|
||||
|
||||
Args:
|
||||
doc_texts: dict mapping doc_type -> text
|
||||
business_profile: optional detected business profile from profiler
|
||||
|
||||
Returns dict with pre-filled fields for Company Profile and Scope.
|
||||
"""
|
||||
result: dict = {
|
||||
"company_profile": {},
|
||||
"compliance_scope_hints": [],
|
||||
"extracted_from": [],
|
||||
}
|
||||
|
||||
all_text = "\n".join(doc_texts.values()).lower()
|
||||
all_text_original = "\n".join(doc_texts.values())
|
||||
|
||||
# ── Company name + legal form ────────────────────────────────
|
||||
impressum = doc_texts.get("impressum", "")
|
||||
if impressum:
|
||||
_extract_company_info(impressum, result)
|
||||
result["extracted_from"].append("impressum")
|
||||
|
||||
# Fallback: try DSI
|
||||
if not result["company_profile"].get("companyName") and "dse" in doc_texts:
|
||||
_extract_company_info(doc_texts["dse"], result)
|
||||
result["extracted_from"].append("dse")
|
||||
|
||||
# ── DPO contact ──────────────────────────────────────────────
|
||||
_extract_dpo(all_text_original, result)
|
||||
|
||||
# ── Business model from profiler ─────────────────────────────
|
||||
if business_profile:
|
||||
bp = business_profile
|
||||
if bp.get("business_type") and bp["business_type"] != "unknown":
|
||||
result["company_profile"]["businessModel"] = bp["business_type"]
|
||||
if bp.get("industry") and bp["industry"] != "unknown":
|
||||
result["company_profile"]["industry"] = [bp["industry"]]
|
||||
if bp.get("has_online_shop"):
|
||||
result["company_profile"]["offerings"] = ["online_shop"]
|
||||
if bp.get("is_regulated_profession"):
|
||||
result["company_profile"]["regulatedProfession"] = True
|
||||
result["company_profile"]["regulatedProfessionType"] = bp.get(
|
||||
"regulated_profession_type", ""
|
||||
)
|
||||
|
||||
# ── Scope hints from document content ────────────────────────
|
||||
_extract_scope_hints(all_text, result)
|
||||
|
||||
# ── Tracking services → data processing activities ───────────
|
||||
if business_profile and business_profile.get("detected_services"):
|
||||
result["detected_services"] = business_profile["detected_services"]
|
||||
|
||||
logger.info(
|
||||
"Extracted %d profile fields, %d scope hints from %d documents",
|
||||
len(result["company_profile"]),
|
||||
len(result["compliance_scope_hints"]),
|
||||
len(doc_texts),
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def _extract_company_info(text: str, result: dict) -> None:
|
||||
"""Extract company name, legal form, address from text."""
|
||||
cp = result["company_profile"]
|
||||
|
||||
# GmbH / AG / UG / e.K. etc.
|
||||
legal_forms = {
|
||||
r"(\S+(?:\s+\S+){0,4})\s+gmbh\b": ("GmbH", "gmbh"),
|
||||
r"(\S+(?:\s+\S+){0,4})\s+ag\b": ("AG", "ag"),
|
||||
r"(\S+(?:\s+\S+){0,4})\s+ug\b": ("UG", "ug"),
|
||||
r"(\S+(?:\s+\S+){0,4})\s+e\.?\s*k\.?\b": ("e.K.", "ek"),
|
||||
r"(\S+(?:\s+\S+){0,4})\s+gbr\b": ("GbR", "gbr"),
|
||||
r"(\S+(?:\s+\S+){0,4})\s+ohg\b": ("OHG", "ohg"),
|
||||
r"(\S+(?:\s+\S+){0,4})\s+gmbh\s*&\s*co\.?\s*kg": ("GmbH & Co. KG", "gmbh_co_kg"),
|
||||
}
|
||||
text_lower = text.lower()
|
||||
for pattern, (form_label, form_id) in legal_forms.items():
|
||||
m = re.search(pattern, text_lower)
|
||||
if m:
|
||||
raw_name = m.group(0).strip()
|
||||
# Clean up: take from uppercase start
|
||||
for i, ch in enumerate(text[m.start():m.end()]):
|
||||
if ch.isupper():
|
||||
cp["companyName"] = text[m.start() + i:m.end()].strip()
|
||||
break
|
||||
cp["legalForm"] = form_id
|
||||
break
|
||||
|
||||
# PLZ + Ort
|
||||
plz_match = re.search(
|
||||
r"[d\-]?\s*(\d{5})\s+([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+(?:\s+[a-z]+)*)",
|
||||
text,
|
||||
)
|
||||
if plz_match:
|
||||
cp["headquartersZip"] = plz_match.group(1)
|
||||
cp["headquartersCity"] = plz_match.group(2).strip()
|
||||
cp["headquartersCountry"] = "DE"
|
||||
|
||||
# Strasse
|
||||
street_match = re.search(
|
||||
r"([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+(?:str(?:\.|asse)?|weg|allee|platz|ring|gasse)"
|
||||
r"\s*\.?\s*\d+[a-z]?)",
|
||||
text,
|
||||
)
|
||||
if street_match:
|
||||
cp["headquartersStreet"] = street_match.group(1).strip()
|
||||
|
||||
# USt-IdNr
|
||||
ust_match = re.search(r"DE\s*\d{9}", text)
|
||||
if ust_match:
|
||||
cp["ustIdNr"] = ust_match.group(0).replace(" ", "")
|
||||
|
||||
# HRB/HRA
|
||||
hrb_match = re.search(r"HRB?\s*\d+", text, re.IGNORECASE)
|
||||
if hrb_match:
|
||||
cp["registrationNumber"] = hrb_match.group(0)
|
||||
|
||||
# Registergericht
|
||||
reg_match = re.search(
|
||||
r"(?:amtsgericht|registergericht|ag)\s+([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+)",
|
||||
text, re.IGNORECASE,
|
||||
)
|
||||
if reg_match:
|
||||
cp["registrationCourt"] = reg_match.group(0)
|
||||
|
||||
|
||||
def _extract_dpo(text: str, result: dict) -> None:
|
||||
"""Extract DPO name and email."""
|
||||
cp = result["company_profile"]
|
||||
|
||||
# DPO email
|
||||
dpo_section = re.search(
|
||||
r"datenschutzbeauftragte[rn]?\s*[\s\S]{0,300}",
|
||||
text, re.IGNORECASE,
|
||||
)
|
||||
if dpo_section:
|
||||
section = dpo_section.group(0)
|
||||
email_match = re.search(r"[\w.+-]+@[\w-]+\.[\w.-]+", section)
|
||||
if email_match:
|
||||
cp["dpoEmail"] = email_match.group(0)
|
||||
|
||||
# DPO name (after "Datenschutzbeauftragter:" or similar)
|
||||
name_match = re.search(
|
||||
r"(?:datenschutzbeauftragte[rn]?\s*:?\s*)"
|
||||
r"([A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+\s+"
|
||||
r"[A-Z\u00c0-\u017e][a-z\u00e0-\u00ff]+)",
|
||||
text,
|
||||
)
|
||||
if name_match:
|
||||
cp["dpoName"] = name_match.group(1)
|
||||
|
||||
|
||||
def _extract_scope_hints(text: str, result: dict) -> None:
|
||||
"""Extract scope-relevant signals from document text."""
|
||||
hints = result["compliance_scope_hints"]
|
||||
|
||||
# Sensitive data categories (Art. 9)
|
||||
if any(kw in text for kw in [
|
||||
"gesundheitsdaten", "biometrisch", "genetisch",
|
||||
"religionszugehoerigkeit", "gewerkschaft", "sexualleben",
|
||||
"politische meinung", "ethnische herkunft",
|
||||
]):
|
||||
hints.append({
|
||||
"field": "processesSpecialCategories",
|
||||
"value": True,
|
||||
"source": "Erwaehnung besonderer Datenkategorien (Art. 9 DSGVO) im Text",
|
||||
})
|
||||
|
||||
# Third country transfer
|
||||
if any(kw in text for kw in ["usa", "drittland", "drittstaaten", "third country"]):
|
||||
hints.append({
|
||||
"field": "hasThirdCountryTransfer",
|
||||
"value": True,
|
||||
"source": "Drittlandtransfer erwaehnt",
|
||||
})
|
||||
|
||||
# Large-scale processing
|
||||
if any(kw in text for kw in [
|
||||
"umfangreiche verarbeitung", "grosse anzahl",
|
||||
"large scale", "massenverarbeitung",
|
||||
]):
|
||||
hints.append({
|
||||
"field": "largeScaleProcessing",
|
||||
"value": True,
|
||||
"source": "Hinweis auf umfangreiche Verarbeitung",
|
||||
})
|
||||
|
||||
# Automated decision-making
|
||||
if any(kw in text for kw in [
|
||||
"automatisierte entscheidung", "profiling", "scoring",
|
||||
"automated decision", "art. 22",
|
||||
]):
|
||||
hints.append({
|
||||
"field": "automatedDecisionMaking",
|
||||
"value": True,
|
||||
"source": "Automatisierte Entscheidungsfindung erwaehnt",
|
||||
})
|
||||
|
||||
# Auftragsverarbeitung (processor role)
|
||||
if any(kw in text for kw in [
|
||||
"auftragsverarbeitung", "auftragsverarbeiter",
|
||||
"im auftrag", "weisungsgebunden",
|
||||
]):
|
||||
hints.append({
|
||||
"field": "isDataProcessor",
|
||||
"value": True,
|
||||
"source": "Auftragsverarbeitung erwaehnt",
|
||||
})
|
||||
|
||||
# Newsletter / Marketing
|
||||
if any(kw in text for kw in ["newsletter", "marketing", "werbung"]):
|
||||
hints.append({
|
||||
"field": "hasNewsletter",
|
||||
"value": True,
|
||||
"source": "Newsletter/Marketing erwaehnt",
|
||||
})
|
||||
|
||||
# Employee data
|
||||
if any(kw in text for kw in [
|
||||
"mitarbeiterdaten", "beschaeftigtendaten", "personalakte",
|
||||
"bewerberdaten", "arbeitnehmer",
|
||||
]):
|
||||
hints.append({
|
||||
"field": "processesEmployeeData",
|
||||
"value": True,
|
||||
"source": "Beschaeftigtendaten-Verarbeitung erwaehnt",
|
||||
})
|
||||
Reference in New Issue
Block a user