Fix: Remove broken getKlausurApiUrl and clean up empty lines
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 42s
CI / test-go-edu-search (push) Successful in 34s
CI / test-python-klausur (push) Failing after 2m51s
CI / test-python-agent-core (push) Successful in 21s
CI / test-nodejs-website (push) Successful in 29s
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 42s
CI / test-go-edu-search (push) Successful in 34s
CI / test-python-klausur (push) Failing after 2m51s
CI / test-python-agent-core (push) Successful in 21s
CI / test-nodejs-website (push) Successful in 29s
The sed replacement left orphaned hostname references in the story page and empty lines in the getApiBase functions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
140
klausur-service/backend/dsfa_chunking.py
Normal file
140
klausur-service/backend/dsfa_chunking.py
Normal file
@@ -0,0 +1,140 @@
|
||||
"""
|
||||
DSFA Chunking — Text chunking strategies for document ingestion.
|
||||
|
||||
Contains:
|
||||
- chunk_text_recursive: Recursive chunking with overlap
|
||||
- chunk_by_sections: Section-marker-based chunking
|
||||
- chunk_by_list_items: List-item-based chunking
|
||||
- chunk_document: Strategy router
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import List, Dict
|
||||
|
||||
from dsfa_sources_registry import DSFA_CHUNK_CONFIG
|
||||
|
||||
|
||||
def chunk_text_recursive(text: str, max_size: int = 1000, overlap: int = 200) -> List[Dict]:
    """Recursively chunk text with overlap.

    Args:
        text: Source text to split.
        max_size: Maximum chunk length in characters.
        overlap: Characters of context repeated between consecutive chunks.

    Returns:
        List of dicts with "content", "start_char", and "end_char" keys.
        Empty input yields an empty list.
    """
    chunks = []
    start = 0

    while start < len(text):
        end = min(start + max_size, len(text))

        # Prefer a natural break (paragraph, line, sentence, clause, word)
        # over a mid-token cut, but only if it keeps at least half of
        # max_size in this chunk.
        if end < len(text):
            for sep in ["\n\n", "\n", ". ", ", ", " "]:
                last_sep = text[start:end].rfind(sep)
                if last_sep > max_size // 2:
                    end = start + last_sep + len(sep)
                    break

        chunk_text = text[start:end].strip()
        if chunk_text:
            chunks.append({
                "content": chunk_text,
                "start_char": start,
                "end_char": end
            })

        if end >= len(text):
            break
        # Step back by `overlap` for context continuity, but always make
        # forward progress: a large overlap combined with a separator-
        # shortened chunk could otherwise move `start` backwards and
        # loop forever.
        next_start = end - overlap
        start = next_start if next_start > start else end

    return chunks
|
||||
|
||||
|
||||
def chunk_by_sections(text: str, markers: List[str], max_size: int = 1500, overlap: int = 200) -> List[Dict]:
    """Chunk text at section markers.

    Each chunk carries a "section_title" taken from the marker match.
    Sections longer than `max_size` are sub-chunked recursively, with
    continuation pieces titled "<title> (cont.)". When no marker matches
    at all, falls back to plain recursive chunking (no titles).
    """
    pattern = "|".join(f"({m})" for m in markers)
    hits = list(re.finditer(pattern, text, re.IGNORECASE | re.MULTILINE))

    if not hits:
        return chunk_text_recursive(text, max_size, overlap)

    # Pair each marker hit with the start of the next one (or end of text).
    boundaries = [hit.start() for hit in hits[1:]] + [len(text)]
    result: List[Dict] = []

    for hit, sec_end in zip(hits, boundaries):
        sec_start = hit.start()
        body = text[sec_start:sec_end].strip()
        title = hit.group(0).strip()

        if len(body) <= max_size:
            result.append({
                "content": body,
                "section_title": title,
                "start_char": sec_start,
                "end_char": sec_end
            })
            continue

        # Oversized section: recurse and tag continuation pieces.
        for idx, piece in enumerate(chunk_text_recursive(body, max_size, overlap)):
            result.append({
                "content": piece["content"],
                "section_title": title if idx == 0 else f"{title} (cont.)",
                "start_char": sec_start + piece["start_char"],
                "end_char": sec_start + piece["end_char"]
            })

    return result
|
||||
|
||||
|
||||
def chunk_by_list_items(text: str, markers: List[str], max_size: int = 800) -> List[Dict]:
    """Chunk text into one chunk per list item.

    A new item starts on any line whose stripped form matches one of the
    `markers` regexes; non-matching lines are folded into the current item.
    Text preceding the first marker becomes its own leading chunk.

    Args:
        text: Source text to split.
        markers: Regex patterns that identify the start of a list item.
        max_size: Accepted for API symmetry with the other chunkers;
            not currently enforced on item length.

    Returns:
        List of dicts with "content", "start_char", and "end_char" keys.
    """
    pattern = "|".join(f"({m})" for m in markers)

    chunks: List[Dict] = []
    lines = text.split("\n")
    current_item = ""
    current_start = 0
    # Running character offset of the current line within `text`
    # (replaces the original per-line summation, which was O(n^2)).
    offset = 0

    for line in lines:
        if re.match(pattern, line.strip()):
            if current_item.strip():
                chunks.append({
                    "content": current_item.strip(),
                    "start_char": current_start,
                    "end_char": current_start + len(current_item)
                })
            current_item = line
            current_start = offset
        else:
            current_item += "\n" + line
        # +1 accounts for the "\n" removed by split.
        offset += len(line) + 1

    # Flush the trailing item, if any.
    if current_item.strip():
        chunks.append({
            "content": current_item.strip(),
            "start_char": current_start,
            "end_char": current_start + len(current_item)
        })

    return chunks
|
||||
|
||||
|
||||
def chunk_document(text: str, source_code: str) -> List[Dict]:
    """Chunk a document with the strategy configured for its source type.

    Resolves `source_code` against DSFA_CHUNK_CONFIG (falling back to the
    "DEFAULT" entry); *_MUSS_PUBLIC and *_MUSS_PRIVATE sources are forced
    onto the "MUSS_LISTEN" config. Dispatches to the section-based,
    list-item, or recursive chunker per the config's "strategy" field.
    """
    config = DSFA_CHUNK_CONFIG.get(source_code, DSFA_CHUNK_CONFIG["DEFAULT"])

    # MUSS lists always use their dedicated chunking config.
    if source_code.endswith(("_MUSS_PUBLIC", "_MUSS_PRIVATE")):
        config = DSFA_CHUNK_CONFIG["MUSS_LISTEN"]

    strategy = config["strategy"]
    if strategy == "section_based":
        return chunk_by_sections(
            text,
            config["section_markers"],
            config["max_chunk_size"],
            config["overlap"],
        )
    if strategy == "list_item":
        return chunk_by_list_items(
            text,
            config["list_markers"],
            config["max_chunk_size"],
        )
    return chunk_text_recursive(
        text,
        config["max_chunk_size"],
        config["overlap"],
    )
|
||||
Reference in New Issue
Block a user