A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.
This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).
Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
240 lines
6.6 KiB
Python
"""
|
|
Deduplizierung für Alerts.
|
|
|
|
Nutzt SimHash für Fuzzy-Matching von ähnlichen Texten.
|
|
SimHash ist ein Locality-Sensitive Hash, bei dem ähnliche Texte
|
|
ähnliche Hashes produzieren.
|
|
"""
|
|
|
|
import hashlib
|
|
import re
|
|
from typing import Optional
|
|
from collections import Counter
|
|
|
|
|
|
# SimHash Parameter
|
|
SIMHASH_BITS = 64
|
|
SHINGLE_SIZE = 3 # Anzahl aufeinanderfolgender Wörter
|
|
|
|
|
|
def _tokenize(text: str) -> list:
|
|
"""
|
|
Tokenisiere Text in normalisierte Wörter.
|
|
|
|
- Lowercase
|
|
- Nur alphanumerische Zeichen
|
|
- Stoppwörter entfernen (deutsche)
|
|
"""
|
|
# Deutsche Stoppwörter (häufige Wörter ohne semantischen Wert)
|
|
STOPWORDS = {
|
|
"der", "die", "das", "den", "dem", "des", "ein", "eine", "einer", "eines",
|
|
"und", "oder", "aber", "doch", "wenn", "weil", "dass", "als", "auch",
|
|
"ist", "sind", "war", "waren", "wird", "werden", "wurde", "wurden",
|
|
"hat", "haben", "hatte", "hatten", "kann", "können", "konnte", "konnten",
|
|
"für", "von", "mit", "bei", "nach", "aus", "über", "unter", "vor", "hinter",
|
|
"auf", "an", "in", "im", "am", "um", "bis", "durch", "ohne", "gegen",
|
|
"nicht", "noch", "nur", "schon", "sehr", "mehr", "sich", "es", "sie", "er",
|
|
"wir", "ihr", "ich", "du", "man", "so", "wie", "was", "wer", "wo", "wann",
|
|
}
|
|
|
|
# Normalisiere
|
|
text = text.lower()
|
|
# Nur Buchstaben, Zahlen und Umlaute
|
|
text = re.sub(r"[^a-zäöüß0-9\s]", " ", text)
|
|
# Tokenisiere
|
|
words = text.split()
|
|
# Filtere Stoppwörter und kurze Wörter
|
|
words = [w for w in words if w not in STOPWORDS and len(w) > 2]
|
|
|
|
return words
|
|
|
|
|
|
def _create_shingles(words: list, size: int = SHINGLE_SIZE) -> list:
    """Build overlapping word n-grams (shingles) from a word list.

    Example: ["a", "b", "c", "d"] with size=2 -> ["a b", "b c", "c d"].

    If there are fewer words than *size*, a single shingle containing all
    words is returned; an empty word list yields an empty list.
    """
    if len(words) < size:
        if not words:
            return []
        return [" ".join(words)]

    shingles = []
    for start in range(len(words) - size + 1):
        shingles.append(" ".join(words[start:start + size]))
    return shingles
|
|
|
|
|
|
def _hash_shingle(shingle: str) -> int:
|
|
"""Hash ein Shingle zu einer 64-bit Zahl."""
|
|
# Nutze MD5 und nimm erste 8 Bytes (64 bit)
|
|
h = hashlib.md5(shingle.encode()).digest()[:8]
|
|
return int.from_bytes(h, byteorder="big")
|
|
|
|
|
|
def compute_simhash(text: str) -> str:
    """Compute the SimHash of a text.

    SimHash works as follows:
    1. Split the text into shingles (word n-grams).
    2. Hash each shingle.
    3. For every hash bit: bit=1 contributes +1, bit=0 contributes -1.
    4. Sum the contributions over all shingle hashes.
    5. Output bit is 1 where the sum is positive, else 0.

    Returns:
        16-character hex string (64 bits); all zeros for empty/stopword-only
        input.
    """
    if not text:
        return "0" * 16

    tokens = _tokenize(text)
    if not tokens:
        return "0" * 16

    grams = _create_shingles(tokens)
    if not grams:
        return "0" * 16

    # Per-bit vote counters.
    weights = [0] * SIMHASH_BITS

    for gram in grams:
        value = _hash_shingle(gram)
        for pos in range(SIMHASH_BITS):
            weights[pos] += 1 if (value >> pos) & 1 else -1

    # Collapse the votes into the final 64-bit fingerprint.
    fingerprint = 0
    for pos, vote in enumerate(weights):
        if vote > 0:
            fingerprint |= 1 << pos

    return format(fingerprint, "016x")
|
|
|
|
|
|
def hamming_distance(hash1: str, hash2: str) -> int:
    """Compute the Hamming distance between two SimHashes.

    The Hamming distance is the number of differing bits; the smaller it
    is, the more similar the underlying texts are.

    Typical thresholds:
    - 0-3: very similar (probably a duplicate)
    - 4-7: similar (same topic)
    - 8+:  different

    Missing or malformed (non-hex) hashes yield the maximum distance.

    Returns:
        Number of differing bits (0-64).
    """
    if not hash1 or not hash2:
        return SIMHASH_BITS

    try:
        diff = int(hash1, 16) ^ int(hash2, 16)
    except ValueError:
        return SIMHASH_BITS

    return bin(diff).count("1")
|
|
|
|
|
|
def are_similar(hash1: str, hash2: str, threshold: int = 5) -> bool:
    """Check whether two hashes indicate similar texts.

    Args:
        hash1: First SimHash (16-char hex string).
        hash2: Second SimHash (16-char hex string).
        threshold: Maximum Hamming distance still counted as similar.

    Returns:
        True if the underlying texts are probably similar.
    """
    distance = hamming_distance(hash1, hash2)
    return distance <= threshold
|
|
|
|
|
|
def find_duplicates(items: list, hash_field: str = "content_hash",
|
|
threshold: int = 3) -> dict:
|
|
"""
|
|
Finde Duplikate/Cluster in einer Liste von Items.
|
|
|
|
Args:
|
|
items: Liste von Objekten mit hash_field Attribut
|
|
hash_field: Name des Attributs das den SimHash enthält
|
|
threshold: Max Hamming-Distanz für Duplikat-Erkennung
|
|
|
|
Returns:
|
|
Dict mit {item_id: cluster_id} für Duplikate
|
|
"""
|
|
clusters = {} # cluster_id -> list of items
|
|
item_to_cluster = {} # item_id -> cluster_id
|
|
|
|
cluster_counter = 0
|
|
|
|
for item in items:
|
|
item_id = getattr(item, "id", str(id(item)))
|
|
item_hash = getattr(item, hash_field, None)
|
|
|
|
if not item_hash:
|
|
continue
|
|
|
|
# Suche nach existierendem Cluster
|
|
found_cluster = None
|
|
for cluster_id, cluster_items in clusters.items():
|
|
for existing_item in cluster_items:
|
|
existing_hash = getattr(existing_item, hash_field, None)
|
|
if existing_hash and hamming_distance(item_hash, existing_hash) <= threshold:
|
|
found_cluster = cluster_id
|
|
break
|
|
if found_cluster:
|
|
break
|
|
|
|
if found_cluster:
|
|
clusters[found_cluster].append(item)
|
|
item_to_cluster[item_id] = found_cluster
|
|
else:
|
|
# Neuen Cluster starten
|
|
cluster_id = f"cluster_{cluster_counter}"
|
|
cluster_counter += 1
|
|
clusters[cluster_id] = [item]
|
|
item_to_cluster[item_id] = cluster_id
|
|
|
|
# Filtere Single-Item Cluster (keine echten Duplikate)
|
|
duplicates = {}
|
|
for item_id, cluster_id in item_to_cluster.items():
|
|
if len(clusters[cluster_id]) > 1:
|
|
duplicates[item_id] = cluster_id
|
|
|
|
return duplicates
|
|
|
|
|
|
def exact_url_duplicates(items: list, url_field: str = "canonical_url") -> set:
|
|
"""
|
|
Finde exakte URL-Duplikate.
|
|
|
|
Returns:
|
|
Set von Item-IDs die Duplikate sind (nicht das Original)
|
|
"""
|
|
seen_urls = {} # url -> first item id
|
|
duplicates = set()
|
|
|
|
for item in items:
|
|
item_id = getattr(item, "id", str(id(item)))
|
|
url = getattr(item, url_field, None)
|
|
|
|
if not url:
|
|
continue
|
|
|
|
if url in seen_urls:
|
|
duplicates.add(item_id)
|
|
else:
|
|
seen_urls[url] = item_id
|
|
|
|
return duplicates
|