This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Benjamin Admin 21a844cb8a fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

240 lines
6.6 KiB
Python

"""
Deduplizierung für Alerts.
Nutzt SimHash für Fuzzy-Matching von ähnlichen Texten.
SimHash ist ein Locality-Sensitive Hash, bei dem ähnliche Texte
ähnliche Hashes produzieren.
"""
import hashlib
import re
from typing import Optional
from collections import Counter
# SimHash Parameter
SIMHASH_BITS = 64
SHINGLE_SIZE = 3 # Anzahl aufeinanderfolgender Wörter
def _tokenize(text: str) -> list:
"""
Tokenisiere Text in normalisierte Wörter.
- Lowercase
- Nur alphanumerische Zeichen
- Stoppwörter entfernen (deutsche)
"""
# Deutsche Stoppwörter (häufige Wörter ohne semantischen Wert)
STOPWORDS = {
"der", "die", "das", "den", "dem", "des", "ein", "eine", "einer", "eines",
"und", "oder", "aber", "doch", "wenn", "weil", "dass", "als", "auch",
"ist", "sind", "war", "waren", "wird", "werden", "wurde", "wurden",
"hat", "haben", "hatte", "hatten", "kann", "können", "konnte", "konnten",
"für", "von", "mit", "bei", "nach", "aus", "über", "unter", "vor", "hinter",
"auf", "an", "in", "im", "am", "um", "bis", "durch", "ohne", "gegen",
"nicht", "noch", "nur", "schon", "sehr", "mehr", "sich", "es", "sie", "er",
"wir", "ihr", "ich", "du", "man", "so", "wie", "was", "wer", "wo", "wann",
}
# Normalisiere
text = text.lower()
# Nur Buchstaben, Zahlen und Umlaute
text = re.sub(r"[^a-zäöüß0-9\s]", " ", text)
# Tokenisiere
words = text.split()
# Filtere Stoppwörter und kurze Wörter
words = [w for w in words if w not in STOPWORDS and len(w) > 2]
return words
def _create_shingles(words: list, size: int = SHINGLE_SIZE) -> list:
"""
Erstelle Shingles (n-Gramme) aus Wortliste.
Shingles sind überlappende Sequenzen von Wörtern.
z.B. ["a", "b", "c", "d"] mit size=2 -> ["a b", "b c", "c d"]
"""
if len(words) < size:
return [" ".join(words)] if words else []
return [" ".join(words[i:i+size]) for i in range(len(words) - size + 1)]
def _hash_shingle(shingle: str) -> int:
"""Hash ein Shingle zu einer 64-bit Zahl."""
# Nutze MD5 und nimm erste 8 Bytes (64 bit)
h = hashlib.md5(shingle.encode()).digest()[:8]
return int.from_bytes(h, byteorder="big")
def compute_simhash(text: str) -> str:
"""
Berechne SimHash eines Texts.
SimHash funktioniert wie folgt:
1. Text in Shingles (Wort-n-Gramme) aufteilen
2. Jeden Shingle hashen
3. Für jeden Hash: Wenn Bit=1 -> +1, sonst -1
4. Summieren über alle Hashes
5. Wenn Summe > 0 -> Bit=1, sonst 0
Returns:
16-Zeichen Hex-String (64 bit)
"""
if not text:
return "0" * 16
words = _tokenize(text)
if not words:
return "0" * 16
shingles = _create_shingles(words)
if not shingles:
return "0" * 16
# Bit-Vektoren initialisieren
v = [0] * SIMHASH_BITS
for shingle in shingles:
h = _hash_shingle(shingle)
for i in range(SIMHASH_BITS):
bit = (h >> i) & 1
if bit:
v[i] += 1
else:
v[i] -= 1
# Finalen Hash berechnen
simhash = 0
for i in range(SIMHASH_BITS):
if v[i] > 0:
simhash |= (1 << i)
return format(simhash, "016x")
def hamming_distance(hash1: str, hash2: str) -> int:
"""
Berechne Hamming-Distanz zwischen zwei SimHashes.
Die Hamming-Distanz ist die Anzahl der unterschiedlichen Bits.
Je kleiner, desto ähnlicher sind die Texte.
Typische Schwellenwerte:
- 0-3: Sehr ähnlich (wahrscheinlich Duplikat)
- 4-7: Ähnlich (gleiches Thema)
- 8+: Unterschiedlich
Returns:
Anzahl unterschiedlicher Bits (0-64)
"""
if not hash1 or not hash2:
return SIMHASH_BITS
try:
h1 = int(hash1, 16)
h2 = int(hash2, 16)
except ValueError:
return SIMHASH_BITS
xor = h1 ^ h2
return bin(xor).count("1")
def are_similar(hash1: str, hash2: str, threshold: int = 5) -> bool:
"""
Prüfe ob zwei Hashes auf ähnliche Texte hindeuten.
Args:
hash1: Erster SimHash
hash2: Zweiter SimHash
threshold: Maximale Hamming-Distanz für Ähnlichkeit
Returns:
True wenn Texte wahrscheinlich ähnlich sind
"""
return hamming_distance(hash1, hash2) <= threshold
def find_duplicates(items: list, hash_field: str = "content_hash",
threshold: int = 3) -> dict:
"""
Finde Duplikate/Cluster in einer Liste von Items.
Args:
items: Liste von Objekten mit hash_field Attribut
hash_field: Name des Attributs das den SimHash enthält
threshold: Max Hamming-Distanz für Duplikat-Erkennung
Returns:
Dict mit {item_id: cluster_id} für Duplikate
"""
clusters = {} # cluster_id -> list of items
item_to_cluster = {} # item_id -> cluster_id
cluster_counter = 0
for item in items:
item_id = getattr(item, "id", str(id(item)))
item_hash = getattr(item, hash_field, None)
if not item_hash:
continue
# Suche nach existierendem Cluster
found_cluster = None
for cluster_id, cluster_items in clusters.items():
for existing_item in cluster_items:
existing_hash = getattr(existing_item, hash_field, None)
if existing_hash and hamming_distance(item_hash, existing_hash) <= threshold:
found_cluster = cluster_id
break
if found_cluster:
break
if found_cluster:
clusters[found_cluster].append(item)
item_to_cluster[item_id] = found_cluster
else:
# Neuen Cluster starten
cluster_id = f"cluster_{cluster_counter}"
cluster_counter += 1
clusters[cluster_id] = [item]
item_to_cluster[item_id] = cluster_id
# Filtere Single-Item Cluster (keine echten Duplikate)
duplicates = {}
for item_id, cluster_id in item_to_cluster.items():
if len(clusters[cluster_id]) > 1:
duplicates[item_id] = cluster_id
return duplicates
def exact_url_duplicates(items: list, url_field: str = "canonical_url") -> set:
"""
Finde exakte URL-Duplikate.
Returns:
Set von Item-IDs die Duplikate sind (nicht das Original)
"""
seen_urls = {} # url -> first item id
duplicates = set()
for item in items:
item_id = getattr(item, "id", str(id(item)))
url = getattr(item, url_field, None)
if not url:
continue
if url in seen_urls:
duplicates.add(item_id)
else:
seen_urls[url] = item_id
return duplicates