"""
Too-Close Similarity Detector — checks whether a candidate text is too similar
to a protected source text (copyright / license compliance).

Five metrics:
  1. Exact-phrase     — longest identical token sequence
  2. Token overlap    — Jaccard similarity of token sets
  3. 3-gram Jaccard   — Jaccard similarity of character 3-grams
  4. Embedding cosine — via bge-m3 (Ollama or embedding-service)
  5. LCS ratio        — Longest Common Subsequence / max(len_a, len_b)

Decision:
  PASS — no fail + max 1 warn
  WARN — exactly 2 warn, no fail → human review
  FAIL — any fail threshold, or more than 2 warn → block, rewrite required
"""

from __future__ import annotations

import logging
import re
from dataclasses import dataclass
from typing import Optional

try:
    import httpx
except ImportError:  # pragma: no cover - only hit when httpx is not installed
    # Keep the pure-text metrics importable without the HTTP client;
    # embedding_cosine() raises at call time instead of at import time.
    httpx = None  # type: ignore[assignment]

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------

# Per-metric cut-offs. A value >= "fail" is a FAIL, otherwise a value >= "warn"
# is a WARN (see _classify). max_exact_run is a token count; the rest are
# ratios in [0, 1].
THRESHOLDS = {
    "max_exact_run": {"warn": 8, "fail": 12},
    "token_overlap": {"warn": 0.20, "fail": 0.30},
    "ngram_jaccard": {"warn": 0.10, "fail": 0.18},
    "embedding_cosine": {"warn": 0.86, "fail": 0.92},
    "lcs_ratio": {"warn": 0.35, "fail": 0.50},
}

# ---------------------------------------------------------------------------
# Tokenisation helpers
# ---------------------------------------------------------------------------

_WORD_RE = re.compile(r"\w+", re.UNICODE)


def _tokenize(text: str) -> list[str]:
    """Split *text* into lower-cased ``\\w+`` tokens."""
    return [t.lower() for t in _WORD_RE.findall(text)]


def _char_ngrams(text: str, n: int = 3) -> set[str]:
    """Return the set of character n-grams of the lower-cased *text*.

    Texts shorter than *n* yield an empty set (the range below is empty).
    """
    text = text.lower()
    return {text[i : i + n] for i in range(len(text) - n + 1)}


# ---------------------------------------------------------------------------
# Metric implementations
# ---------------------------------------------------------------------------


def max_exact_run(tokens_a: list[str], tokens_b: list[str]) -> int:
    """Longest contiguous identical token sequence between a and b.

    Uses the rolling-row longest-common-substring DP, so the cost is
    O(len_a * len_b) regardless of how repetitive the inputs are.

    Returns 0 when either token list is empty.
    """
    if not tokens_a or not tokens_b:
        return 0

    best = 0
    # prev[j] = length of the common run ending at the previous token of a
    # and at tokens_b[j - 1]; index 0 is a zero sentinel.
    prev = [0] * (len(tokens_b) + 1)
    for tok_a in tokens_a:
        curr = [0] * (len(tokens_b) + 1)
        for j, tok_b in enumerate(tokens_b, start=1):
            if tok_a == tok_b:
                run = prev[j - 1] + 1
                curr[j] = run
                if run > best:
                    best = run
        prev = curr
    return best


def token_overlap_jaccard(tokens_a: list[str], tokens_b: list[str]) -> float:
    """Jaccard similarity of token sets (0.0 when both are empty)."""
    set_a, set_b = set(tokens_a), set(tokens_b)
    union = set_a | set_b
    if not union:
        return 0.0
    return len(set_a & set_b) / len(union)


def ngram_jaccard(text_a: str, text_b: str, n: int = 3) -> float:
    """Jaccard similarity of character n-grams (0.0 when neither has any)."""
    grams_a = _char_ngrams(text_a, n)
    grams_b = _char_ngrams(text_b, n)
    union = grams_a | grams_b
    if not union:
        return 0.0
    return len(grams_a & grams_b) / len(union)


def lcs_ratio(tokens_a: list[str], tokens_b: list[str]) -> float:
    """LCS length / max(len_a, len_b).

    Returns 0.0 when either token list is empty.
    """
    m, n = len(tokens_a), len(tokens_b)
    if m == 0 or n == 0:
        return 0.0

    # Space-optimised LCS: only the previous DP row is kept.
    prev = [0] * (n + 1)
    for tok_a in tokens_a:
        curr = [0]
        for j, tok_b in enumerate(tokens_b, start=1):
            if tok_a == tok_b:
                curr.append(prev[j - 1] + 1)
            else:
                curr.append(max(prev[j], curr[j - 1]))
        prev = curr
    return prev[n] / max(m, n)


async def embedding_cosine(
    text_a: str, text_b: str, embedding_url: str | None = None
) -> float:
    """Cosine similarity via embedding service (bge-m3).

    POSTs ``{"texts": [text_a, text_b]}`` to ``<url>/embed`` and compares the
    first two vectors of the ``"embeddings"`` field of the JSON response.

    Falls back to 0.0 if the request fails for any reason or fewer than two
    embeddings come back. Note that 0.0 is below every threshold, so a failed
    lookup classifies as PASS for this metric.

    Raises:
        ModuleNotFoundError: if httpx is not installed.
    """
    if httpx is None:
        raise ModuleNotFoundError(
            "httpx is required for embedding_cosine", name="httpx"
        )

    url = embedding_url or "http://embedding-service:8087"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                f"{url}/embed",
                json={"texts": [text_a, text_b]},
            )
            resp.raise_for_status()
            embeddings = resp.json().get("embeddings", [])
            if len(embeddings) < 2:
                return 0.0
            return _cosine(embeddings[0], embeddings[1])
    except Exception:
        # Deliberate best-effort: never let the embedding service block the
        # other metrics. exc_info keeps the real cause (timeout, HTTP status,
        # malformed payload) visible in the logs.
        logger.warning(
            "Embedding service unreachable, skipping cosine check",
            exc_info=True,
        )
        return 0.0


def _cosine(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between *a* and *b*; 0.0 if either has zero norm.

    The dot product pairs components with ``zip``, so extra trailing
    components of the longer vector do not contribute to it.
    """
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sum(x * x for x in a) ** 0.5
    norm_b = sum(x * x for x in b) ** 0.5
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# ---------------------------------------------------------------------------
# Decision engine
# ---------------------------------------------------------------------------


@dataclass
class SimilarityReport:
    """Metric values plus the aggregate decision from check_similarity()."""

    max_exact_run: int
    token_overlap: float
    ngram_jaccard: float
    embedding_cosine: float
    lcs_ratio: float
    status: str  # PASS, WARN, FAIL
    details: dict[str, str]  # per-metric status, keyed like THRESHOLDS


def _classify(value: float | int, metric: str) -> str:
    """Map *value* to PASS / WARN / FAIL using THRESHOLDS[metric]."""
    t = THRESHOLDS[metric]
    if value >= t["fail"]:
        return "FAIL"
    if value >= t["warn"]:
        return "WARN"
    return "PASS"


def _aggregate_status(details: dict[str, str]) -> str:
    """Combine per-metric statuses into the overall PASS / WARN / FAIL."""
    statuses = list(details.values())
    fail_count = statuses.count("FAIL")
    warn_count = statuses.count("WARN")

    # Any hard failure, or warnings on more than two metrics, blocks the text.
    if fail_count > 0 or warn_count > 2:
        return "FAIL"
    # Exactly two warnings goes to human review; zero or one passes.
    if warn_count == 2:
        return "WARN"
    return "PASS"


async def check_similarity(
    source_text: str,
    candidate_text: str,
    embedding_url: str | None = None,
) -> SimilarityReport:
    """Run all 5 metrics and return an aggregate report.

    Token-based metrics use the lower-cased ``\\w+`` tokens of each text; the
    n-gram and embedding metrics use the raw texts. Ratio metrics are rounded
    to 4 decimals in the report, but classified on their unrounded values.
    """
    tok_src = _tokenize(source_text)
    tok_cand = _tokenize(candidate_text)

    m_exact = max_exact_run(tok_src, tok_cand)
    m_token = token_overlap_jaccard(tok_src, tok_cand)
    m_ngram = ngram_jaccard(source_text, candidate_text)
    m_embed = await embedding_cosine(source_text, candidate_text, embedding_url)
    m_lcs = lcs_ratio(tok_src, tok_cand)

    details = {
        "max_exact_run": _classify(m_exact, "max_exact_run"),
        "token_overlap": _classify(m_token, "token_overlap"),
        "ngram_jaccard": _classify(m_ngram, "ngram_jaccard"),
        "embedding_cosine": _classify(m_embed, "embedding_cosine"),
        "lcs_ratio": _classify(m_lcs, "lcs_ratio"),
    }

    return SimilarityReport(
        max_exact_run=m_exact,
        token_overlap=round(m_token, 4),
        ngram_jaccard=round(m_ngram, 4),
        embedding_cosine=round(m_embed, 4),
        lcs_ratio=round(m_lcs, 4),
        status=_aggregate_status(details),
        details=details,
    )