""" Deduplizierung für Alerts. Nutzt SimHash für Fuzzy-Matching von ähnlichen Texten. SimHash ist ein Locality-Sensitive Hash, bei dem ähnliche Texte ähnliche Hashes produzieren. """ import hashlib import re from typing import Optional from collections import Counter # SimHash Parameter SIMHASH_BITS = 64 SHINGLE_SIZE = 3 # Anzahl aufeinanderfolgender Wörter def _tokenize(text: str) -> list: """ Tokenisiere Text in normalisierte Wörter. - Lowercase - Nur alphanumerische Zeichen - Stoppwörter entfernen (deutsche) """ # Deutsche Stoppwörter (häufige Wörter ohne semantischen Wert) STOPWORDS = { "der", "die", "das", "den", "dem", "des", "ein", "eine", "einer", "eines", "und", "oder", "aber", "doch", "wenn", "weil", "dass", "als", "auch", "ist", "sind", "war", "waren", "wird", "werden", "wurde", "wurden", "hat", "haben", "hatte", "hatten", "kann", "können", "konnte", "konnten", "für", "von", "mit", "bei", "nach", "aus", "über", "unter", "vor", "hinter", "auf", "an", "in", "im", "am", "um", "bis", "durch", "ohne", "gegen", "nicht", "noch", "nur", "schon", "sehr", "mehr", "sich", "es", "sie", "er", "wir", "ihr", "ich", "du", "man", "so", "wie", "was", "wer", "wo", "wann", } # Normalisiere text = text.lower() # Nur Buchstaben, Zahlen und Umlaute text = re.sub(r"[^a-zäöüß0-9\s]", " ", text) # Tokenisiere words = text.split() # Filtere Stoppwörter und kurze Wörter words = [w for w in words if w not in STOPWORDS and len(w) > 2] return words def _create_shingles(words: list, size: int = SHINGLE_SIZE) -> list: """ Erstelle Shingles (n-Gramme) aus Wortliste. Shingles sind überlappende Sequenzen von Wörtern. z.B. ["a", "b", "c", "d"] mit size=2 -> ["a b", "b c", "c d"] """ if len(words) < size: return [" ".join(words)] if words else [] return [" ".join(words[i:i+size]) for i in range(len(words) - size + 1)] def _hash_shingle(shingle: str) -> int: """Hash ein Shingle zu einer 64-bit Zahl.""" # Nutze MD5 und nimm erste 8 Bytes (64 bit) h = hashlib.md5(shingle.encode()).digest()[:8] return int.from_bytes(h, byteorder="big") def compute_simhash(text: str) -> str: """ Berechne SimHash eines Texts. SimHash funktioniert wie folgt: 1. Text in Shingles (Wort-n-Gramme) aufteilen 2. Jeden Shingle hashen 3. Für jeden Hash: Wenn Bit=1 -> +1, sonst -1 4. Summieren über alle Hashes 5. Wenn Summe > 0 -> Bit=1, sonst 0 Returns: 16-Zeichen Hex-String (64 bit) """ if not text: return "0" * 16 words = _tokenize(text) if not words: return "0" * 16 shingles = _create_shingles(words) if not shingles: return "0" * 16 # Bit-Vektoren initialisieren v = [0] * SIMHASH_BITS for shingle in shingles: h = _hash_shingle(shingle) for i in range(SIMHASH_BITS): bit = (h >> i) & 1 if bit: v[i] += 1 else: v[i] -= 1 # Finalen Hash berechnen simhash = 0 for i in range(SIMHASH_BITS): if v[i] > 0: simhash |= (1 << i) return format(simhash, "016x") def hamming_distance(hash1: str, hash2: str) -> int: """ Berechne Hamming-Distanz zwischen zwei SimHashes. Die Hamming-Distanz ist die Anzahl der unterschiedlichen Bits. Je kleiner, desto ähnlicher sind die Texte. Typische Schwellenwerte: - 0-3: Sehr ähnlich (wahrscheinlich Duplikat) - 4-7: Ähnlich (gleiches Thema) - 8+: Unterschiedlich Returns: Anzahl unterschiedlicher Bits (0-64) """ if not hash1 or not hash2: return SIMHASH_BITS try: h1 = int(hash1, 16) h2 = int(hash2, 16) except ValueError: return SIMHASH_BITS xor = h1 ^ h2 return bin(xor).count("1") def are_similar(hash1: str, hash2: str, threshold: int = 5) -> bool: """ Prüfe ob zwei Hashes auf ähnliche Texte hindeuten. Args: hash1: Erster SimHash hash2: Zweiter SimHash threshold: Maximale Hamming-Distanz für Ähnlichkeit Returns: True wenn Texte wahrscheinlich ähnlich sind """ return hamming_distance(hash1, hash2) <= threshold def find_duplicates(items: list, hash_field: str = "content_hash", threshold: int = 3) -> dict: """ Finde Duplikate/Cluster in einer Liste von Items. Args: items: Liste von Objekten mit hash_field Attribut hash_field: Name des Attributs das den SimHash enthält threshold: Max Hamming-Distanz für Duplikat-Erkennung Returns: Dict mit {item_id: cluster_id} für Duplikate """ clusters = {} # cluster_id -> list of items item_to_cluster = {} # item_id -> cluster_id cluster_counter = 0 for item in items: item_id = getattr(item, "id", str(id(item))) item_hash = getattr(item, hash_field, None) if not item_hash: continue # Suche nach existierendem Cluster found_cluster = None for cluster_id, cluster_items in clusters.items(): for existing_item in cluster_items: existing_hash = getattr(existing_item, hash_field, None) if existing_hash and hamming_distance(item_hash, existing_hash) <= threshold: found_cluster = cluster_id break if found_cluster: break if found_cluster: clusters[found_cluster].append(item) item_to_cluster[item_id] = found_cluster else: # Neuen Cluster starten cluster_id = f"cluster_{cluster_counter}" cluster_counter += 1 clusters[cluster_id] = [item] item_to_cluster[item_id] = cluster_id # Filtere Single-Item Cluster (keine echten Duplikate) duplicates = {} for item_id, cluster_id in item_to_cluster.items(): if len(clusters[cluster_id]) > 1: duplicates[item_id] = cluster_id return duplicates def exact_url_duplicates(items: list, url_field: str = "canonical_url") -> set: """ Finde exakte URL-Duplikate. Returns: Set von Item-IDs die Duplikate sind (nicht das Original) """ seen_urls = {} # url -> first item id duplicates = set() for item in items: item_id = getattr(item, "id", str(id(item))) url = getattr(item, url_field, None) if not url: continue if url in seen_urls: duplicates.add(item_id) else: seen_urls[url] = item_id return duplicates