diff --git a/backend-compliance/compliance/services/impressum_multi_entity_check.py b/backend-compliance/compliance/services/impressum_multi_entity_check.py index cce74770..75e254e2 100644 --- a/backend-compliance/compliance/services/impressum_multi_entity_check.py +++ b/backend-compliance/compliance/services/impressum_multi_entity_check.py @@ -34,36 +34,133 @@ _NAME_NOISE_PAT = re.compile( re.IGNORECASE, ) -_USTID_PAT = re.compile(r"\b(?:USt-?Id(?:Nr)?\.?|VAT(?:-?Id)?)\s*[:.\s]\s*" - r"(DE\d{8,10}|[A-Z]{2}\d{6,12})", re.IGNORECASE) +# HRB-/HRA-Eintrag pro Entity. Dies ist der stärkste Anker: jede +# juristische Person muss in Deutschland einen eindeutigen +# HRB/HRA-Eintrag haben. Mehrere HRB-Vorkommen = mehrere Entities. +_HRB_PAT = re.compile( + r"HR[BA]\s*(?:Nr\.?|Nummer)?\s*\d+(?:\s*[A-Z])?", + re.IGNORECASE, +) + +# Worte, die NIE ein gültiger Firmenname sind. Filtern False-Positives +# wie "Programmierung der Webseite Elli Mobility GmbH" oder +# "Umsatzsteueridentifikationsnummer der Elli Mobility GmbH". +_NAME_BLOCKLIST = ( + "programmierung", "webseite", "umsatzsteueridentifik", + "schlichtungsstelle", "auftragsverarbeitung", "haftung", + "verantwortlich", "diensteanbieter", "geschäftsführer", + "geschaeftsfuehrer", "vorstand", "gesellschaftsregister", + "registergericht", "registriert", +) + +_USTID_PAT = re.compile( + # Form A: Abkürzung mit Separator (USt-IdNr.: DE…) + r"(?:" + r"\b(?:USt-?Id(?:Nr)?\.?|VAT(?:-?Id)?)\s*[:.\s]\s*" + r"(DE\d{8,10}|[A-Z]{2}\d{6,12})" + r"|" + # Form B: Vollform mit Bridge (Umsatzsteueridentifikationsnummer + # der Elli Mobility GmbH ist DE…). Max 80 Zeichen Bridge, + # \n erlaubt (Cap schützt vor Cross-Paragraph-Drift). + r"\bUmsatzsteuer[\s-]?Id(?:entifikationsnummer)?\b[\s\S]{0,80}?" + r"(DE\d{8,10}|[A-Z]{2}\d{6,12})" + r")", + re.IGNORECASE, +) _HR_PAT = re.compile(r"\b(?:HR[BA]|Handelsregister|Registergericht)" r"\s*[:.\s]*([\w\s\d\-/]{4,80})", re.IGNORECASE) _GF_PAT = re.compile(r"(?:Geschäftsführer|Vertretungsberechtigt|" r"vertreten\s+durch)\s*[:.\s]+", re.IGNORECASE) +_LEADING_NOISE_RE = re.compile( + r"^(?:eco|com|de|net|" # email/domain TLD-Artefakte + r"Die|Der|Das|" # deutsche Artikel + r"www\.[\w\.-]+|" # URL-Reste + r"@[\w\.-]+)\s+", + re.IGNORECASE, +) + + def _clean_entity_name(raw: str) -> str: - """Strip leading header noise + collapse whitespace.""" + """Strip leading header noise, leading artifacts + collapse whitespace.""" name = raw.strip() - # If the match spans multiple lines (regex captured a header before - # the actual company name), keep only the last line. + # If the match spans multiple lines, keep only the last line. if "\n" in name: name = name.rsplit("\n", 1)[-1].strip() name = _NAME_NOISE_PAT.sub("", name).strip() - return re.sub(r"\s+", " ", name) + # Strip leading email-TLD-artifacts ("eco "), German articles + # ("Die ", "Der "), URL-fragments etc. — iterate until stable. + for _ in range(3): + new = _LEADING_NOISE_RE.sub("", name) + if new == name: + break + name = new + return re.sub(r"\s+", " ", name).strip() + + +def _name_is_blocked(name: str) -> bool: + nl = name.lower() + return any(b in nl for b in _NAME_BLOCKLIST) def _slice_entities(text: str) -> list[tuple[str, str]]: - """Return [(entity_name, text_slice)] for each detected entity.""" + """Return [(entity_name, text_slice)] for each detected entity. + + Anker-Strategie: jeder eigenständige HRB/HRA-Eintrag markiert eine + Entity. Pro HRB-Block laufen wir RÜCKWÄRTS bis zum vorherigen + Block-Ende und nehmen den letzten Legal-Form-Match (GmbH/AG/...) + in diesem Fenster als Firmennamen. + + Falls keine HRBs vorhanden sind (z.B. e.V. ohne HR-Pflicht), + fallen wir auf den alten Legal-Form-Anker zurück. + """ + hrb_matches = list(_HRB_PAT.finditer(text)) + if len(hrb_matches) >= 2: + slices: list[tuple[str, str]] = [] + seen_names: set[str] = set() + for i, h in enumerate(hrb_matches): + # Window: from end of previous HRB (or start) to this HRB. + win_start = hrb_matches[i - 1].end() if i > 0 else 0 + win_end = h.start() + window = text[win_start:win_end] + # Find the LAST legal-form-match in the window — that's + # the entity-name closest to this HRB-marker. + legal_matches = list(_ENTITY_PAT.finditer(window)) + if not legal_matches: + continue + name_raw = legal_matches[-1].group(1) + name = _clean_entity_name(name_raw) + if not name or _name_is_blocked(name): + continue + if name in seen_names: + continue + seen_names.add(name) + # Block-Slice: from the matched name to the END of this + # HRB-block (next HRB-start or EOF). + slice_start = win_start + legal_matches[-1].start() + slice_end = (hrb_matches[i + 1].start() + if i + 1 < len(hrb_matches) else len(text)) + slices.append((name, text[slice_start:slice_end])) + if len(slices) >= 2: + return slices + + # Fallback (no HRBs / nur einer / parsing scheiterte): + # alter Legal-Form-Anker mit Blocklist-Filter + dedup. matches = list(_ENTITY_PAT.finditer(text)) if len(matches) < 2: return [] - slices: list[tuple[str, str]] = [] + slices = [] + seen_names = set() for i, m in enumerate(matches): + name = _clean_entity_name(m.group(1)) + if not name or _name_is_blocked(name) or name in seen_names: + continue + seen_names.add(name) start = m.start() end = matches[i + 1].start() if i + 1 < len(matches) else len(text) - slices.append((_clean_entity_name(m.group(1)), text[start:end])) - return slices + slices.append((name, text[start:end])) + return slices if len(slices) >= 2 else [] def check_multi_entity_impressum(state: dict) -> list[dict]: diff --git a/backend-compliance/compliance/services/retention_conflict_check.py b/backend-compliance/compliance/services/retention_conflict_check.py index 2e603396..07d7e1e0 100644 --- a/backend-compliance/compliance/services/retention_conflict_check.py +++ b/backend-compliance/compliance/services/retention_conflict_check.py @@ -37,6 +37,8 @@ _CATEGORIES: list[tuple[str, tuple[str, ...]]] = [ "access-log", "access log", "zugriffslog", "webserver-log", "webserver log", "webserver-protokoll", "server-protokoll", + "protokolldat", "protokollierung der zugriffe", + "zugriffsdat", "zugriffsprotokoll", "ip-adressen werden gespeichert", "ip-adresse wird gespeichert", )), ("contact_form", ( diff --git a/backend-compliance/tests/test_impressum_multi_entity_check.py b/backend-compliance/tests/test_impressum_multi_entity_check.py index 115c0c98..f6d9d91b 100644 --- a/backend-compliance/tests/test_impressum_multi_entity_check.py +++ b/backend-compliance/tests/test_impressum_multi_entity_check.py @@ -33,6 +33,61 @@ E-Mail: ellimobility@elli.eco """ +_ELLI_REAL_WORLD = """ +eco Volkswagen Group Charging GmbH ist im Handelsregister des +Amtsgerichtes Charlottenburg unter der Nummer HRB 208967 B eingetragen. +Verantwortlich für den Inhalt nach § 55 Abs. 2 RStV: Giovanni Palazzo. + +eco Die Elli Mobility GmbH ist im Handelsregister des Amtsgerichtes +Charlottenburg unter der Nummer HRB 274616 B eingetragen. Die +Umsatzsteueridentifikationsnummer der Elli Mobility GmbH ist +DE814424009. Postanschrift: Karl-Liebknecht-Str. 32. +Geschäftsführer: Joschi Jennermann Sebastian Steffen. +""" + + +class TestRealWorldElliPattern: + """Regression: Elli's reale HTML→Text-extrahierte Form mit + leading-noise-Artefakten (eco/Die), HRB-Boundary, und USt-IdNr- + Vollform statt Abkürzung.""" + + def test_slice_finds_two_clean_entities(self): + slices = _slice_entities(_ELLI_REAL_WORLD) + names = [n for n, _ in slices] + assert names == [ + "Volkswagen Group Charging GmbH", + "Elli Mobility GmbH", + ] + + def test_ust_id_long_form_detected(self): + # "Umsatzsteueridentifikationsnummer der ... ist DE..." + findings = check_multi_entity_impressum( + {"doc_texts": {"impressum": _ELLI_REAL_WORLD}} + ) + ust = [f for f in findings if f["check_id"] == "IMPRESSUM-MULTI-UST_ID"] + assert len(ust) == 1 + assert ust[0]["entities_missing"] == [ + "Volkswagen Group Charging GmbH", + ] + assert ust[0]["entities_present"] == ["Elli Mobility GmbH"] + + def test_blocklist_filters_false_positives(self): + # "Programmierung der Webseite Elli Mobility GmbH" was an + # over-match before the blocklist. With it, the only "entity" + # candidates per HRB-block are the actual GmbHs. + noisy = ( + "Acme GmbH HRB 1000 B Berlin.\n" + "Foo Holding GmbH HRB 2000 B München.\n" + "Programmierung der Webseite Acme GmbH.\n" + "Umsatzsteueridentifikationsnummer der Foo Holding GmbH " + "ist DE111111111." + ) + slices = _slice_entities(noisy) + # Both real entities, no false positives. + names = sorted(n for n, _ in slices) + assert names == ["Acme GmbH", "Foo Holding GmbH"] + + class TestCleanEntityName: def test_strips_header_prefix(self): assert _clean_entity_name(