backend-lehrer (11 files): - llm_gateway/routes/schools.py (867 → 5), recording_api.py (848 → 6) - messenger_api.py (840 → 5), print_generator.py (824 → 5) - unit_analytics_api.py (751 → 5), classroom/routes/context.py (726 → 4) - llm_gateway/routes/edu_search_seeds.py (710 → 4) klausur-service (12 files): - ocr_labeling_api.py (845 → 4), metrics_db.py (833 → 4) - legal_corpus_api.py (790 → 4), page_crop.py (758 → 3) - mail/ai_service.py (747 → 4), github_crawler.py (767 → 3) - trocr_service.py (730 → 4), full_compliance_pipeline.py (723 → 4) - dsfa_rag_api.py (715 → 4), ocr_pipeline_auto.py (705 → 4) website (6 pages): - audit-checklist (867 → 8), content (806 → 6) - screen-flow (790 → 4), scraper (789 → 5) - zeugnisse (776 → 5), modules (745 → 4) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
304 lines
11 KiB
Python
304 lines
11 KiB
Python
"""
GitHub Crawler - Document Parsers

Markdown, HTML, and JSON parsers for extracting structured content
from legal template documents.

Extracted from github_crawler.py to keep files under 500 LOC.
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
@dataclass
class ExtractedDocument:
    """A document extracted from a repository.

    ``source_hash`` is filled in automatically in ``__post_init__`` when the
    caller does not supply one and ``text`` is non-empty; it is then the
    SHA-256 hex digest of ``text`` encoded as UTF-8.
    """

    text: str
    title: str
    file_path: str
    file_type: str  # "markdown", "html", "json", "text"
    source_url: str
    source_commit: Optional[str] = None
    source_hash: str = ""  # SHA256 hex digest; derived from `text` if left empty
    sections: List[Dict[str, Any]] = field(default_factory=list)
    placeholders: List[str] = field(default_factory=list)
    language: str = "en"
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Derive ``source_hash`` from ``text`` unless one was provided."""
        # An explicitly supplied hash always wins; empty text gets no hash.
        if not self.source_hash and self.text:
            self.source_hash = hashlib.sha256(self.text.encode("utf-8")).hexdigest()
|
|
|
|
|
|
class MarkdownParser:
    """Parse Markdown files into structured content."""

    # Common placeholder patterns
    PLACEHOLDER_PATTERNS = [
        r'\[([A-Z_]+)\]',  # [COMPANY_NAME]
        r'\{([a-z_]+)\}',  # {company_name}
        r'\{\{([a-z_]+)\}\}',  # {{company_name}}
        r'__([A-Z_]+)__',  # __COMPANY_NAME__
        r'<([A-Z_]+)>',  # <COMPANY_NAME>
    ]

    # Long legal terms are matched as substrings so that compounds and
    # inflected forms ("Datenschutzerklärung", "personenbezogener") still count.
    _GERMAN_TERMS = (
        'datenschutz', 'impressum', 'nutzungsbedingungen', 'haftung',
        'widerruf', 'verantwortlicher', 'personenbezogene', 'verarbeitung',
    )
    # Short function words must match whole words only; as substrings they
    # occur in ordinary English ("der" in "provider", "ist" in "list").
    _GERMAN_FUNCTION_WORDS = (
        'und', 'der', 'die', 'das', 'ist', 'wird', 'werden', 'sind',
    )

    @classmethod
    def parse(cls, content: str, filename: str = "") -> "ExtractedDocument":
        """Parse markdown content into an ExtractedDocument.

        Args:
            content: Raw markdown source.
            filename: Path of the file; used as ``file_path`` and as a title
                fallback.

        Returns:
            An ExtractedDocument with ``file_type="markdown"`` whose ``text``
            is the cleaned content. ``source_url`` is left empty.
        """
        return ExtractedDocument(
            text=cls._clean_for_indexing(content),
            title=cls._extract_title(content, filename),
            file_path=filename,
            file_type="markdown",
            source_url="",
            sections=cls._extract_sections(content),
            placeholders=cls._find_placeholders(content),
            language=cls._detect_language(content),
        )

    @classmethod
    def _extract_title(cls, content: str, filename: str) -> str:
        """Extract title from markdown heading, front matter, or filename.

        Precedence: first H1 in the body, then a ``title:`` entry in the YAML
        front matter, then the prettified filename stem, then ``"Untitled"``.
        """
        # Split off a leading front-matter block so that YAML comment lines
        # ("# ...") are not mistaken for an H1 heading.
        frontmatter = re.match(
            r'---[ \t]*\r?\n(.*?)\r?\n---[ \t]*(?:\r?\n|\Z)',
            content, re.DOTALL
        )
        body = content[frontmatter.end():] if frontmatter else content

        h1_match = re.search(r'^#[ \t]+(\S.*)$', body, re.MULTILINE)
        if h1_match:
            return h1_match.group(1).strip()

        if frontmatter:
            # Only look inside the front-matter block, one line at a time.
            title_match = re.search(
                r'^title:[ \t]*(.+?)[ \t]*$',
                frontmatter.group(1), re.MULTILINE
            )
            if title_match:
                title = title_match.group(1).strip().strip('"\'').strip()
                if title:
                    return title

        if filename:
            name = Path(filename).stem
            return name.replace('-', ' ').replace('_', ' ').title()

        return "Untitled"

    @classmethod
    def _extract_sections(cls, content: str) -> List[Dict[str, Any]]:
        """Extract sections from markdown content.

        Each section is a dict with ``heading``, ``level`` (1-6, or 0 for text
        preceding the first heading), ``content`` (stripped text up to the next
        heading) and ``start`` (offset in ``content`` where the body begins).
        Sections with neither a heading nor any text are omitted.
        """
        heading_matches = list(
            re.finditer(r'^(#{1,6})[ \t]+(.+)$', content, re.MULTILINE)
        )

        # (heading, level, body start) for the preamble and every heading.
        openers = [("", 0, 0)]
        openers.extend(
            (m.group(2).strip(), len(m.group(1)), m.end())
            for m in heading_matches
        )
        # A section's body ends where the next heading line begins.
        closers = [m.start() for m in heading_matches]
        closers.append(len(content))

        sections = []
        for (heading, level, start), end in zip(openers, closers):
            body = content[start:end].strip()
            if heading or body:
                sections.append({
                    "heading": heading,
                    "level": level,
                    "content": body,
                    "start": start,
                })
        return sections

    @classmethod
    def _find_placeholders(cls, content: str) -> List[str]:
        """Return the sorted, de-duplicated placeholder tokens in ``content``.

        Tokens are reported including their delimiters, e.g. ``[COMPANY_NAME]``.
        """
        found = {
            match.group(0)
            for pattern in cls.PLACEHOLDER_PATTERNS
            for match in re.finditer(pattern, content)
        }
        return sorted(found)

    @classmethod
    def _detect_language(cls, content: str) -> str:
        """Detect language from content.

        Returns ``"de"`` when at least three distinct German indicators are
        present, otherwise ``"en"``.
        """
        lowered = content.lower()
        hits = sum(1 for term in cls._GERMAN_TERMS if term in lowered)

        # Tokenise into runs of letters so function words match whole words.
        words = set(re.findall(r'[^\W\d_]+', lowered))
        hits += sum(1 for word in cls._GERMAN_FUNCTION_WORDS if word in words)

        return "de" if hits >= 3 else "en"

    @classmethod
    def _clean_for_indexing(cls, content: str) -> str:
        """Clean markdown content for text indexing.

        Removes front matter, HTML comments and tags, and inline emphasis,
        code and strikethrough markers; replaces images and links with their
        alt/link text; collapses repeated blank lines and spaces.
        """
        content = re.sub(r'^---\s*\n.*?---\s*\n', '', content, flags=re.DOTALL)
        content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
        content = re.sub(r'<[^>]+>', '', content)
        content = re.sub(r'\*\*(.+?)\*\*', r'\1', content)
        content = re.sub(r'\*(.+?)\*', r'\1', content)
        content = re.sub(r'`(.+?)`', r'\1', content)
        content = re.sub(r'~~(.+?)~~', r'\1', content)
        # Images must be handled before links: the link pattern also matches
        # the "[alt](url)" part of "![alt](url)" and would leave a stray "!".
        content = re.sub(r'!\[([^\]]*)\]\([^)]+\)', r'\1', content)
        content = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', content)
        content = re.sub(r'\n{3,}', '\n\n', content)
        content = re.sub(r' +', ' ', content)

        return content.strip()
|
|
|
|
|
|
class HTMLParser:
    """Parse HTML files into structured content."""

    @classmethod
    def parse(cls, content: str, filename: str = "") -> "ExtractedDocument":
        """Parse HTML content into an ExtractedDocument.

        Args:
            content: Raw HTML source.
            filename: Path of the file; used as ``file_path`` and as a title
                fallback.

        Returns:
            An ExtractedDocument with ``file_type="html"``. The language comes
            from the ``<html lang="...">`` attribute when present, otherwise
            from content-based detection. ``source_url`` is left empty.
        """
        from html import unescape

        # Allow attributes on <title> and titles that span several lines.
        title_match = re.search(
            r'<title[^>]*>(.*?)</title>', content, re.IGNORECASE | re.DOTALL
        )
        title = ""
        if title_match:
            # Decode entities and collapse internal whitespace/newlines.
            title = " ".join(unescape(title_match.group(1)).split())
        if not title:
            title = Path(filename).stem or "Untitled"

        text = cls._html_to_text(content)
        placeholders = MarkdownParser._find_placeholders(text)

        # Accept region subtags such as lang="de-DE"; keep the primary code.
        lang_match = re.search(
            r'<html[^>]*lang=["\']([a-z]{2})(?:-[a-z0-9-]+)?["\']',
            content, re.IGNORECASE
        )
        if lang_match:
            language = lang_match.group(1).lower()
        else:
            language = MarkdownParser._detect_language(text)

        return ExtractedDocument(
            text=text,
            title=title,
            file_path=filename,
            file_type="html",
            source_url="",
            placeholders=placeholders,
            language=language,
        )

    @classmethod
    def _html_to_text(cls, html: str) -> str:
        """Convert HTML to clean text.

        Drops scripts, styles and comments, turns block-level closing tags
        into line breaks, strips the remaining tags, decodes character
        references and normalises whitespace.
        """
        # Imported by name: the parameter `html` shadows the module here.
        from html import unescape

        text = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL | re.IGNORECASE)
        text = re.sub(r'<style[^>]*>.*?</style>', '', text, flags=re.DOTALL | re.IGNORECASE)
        text = re.sub(r'<!--.*?-->', '', text, flags=re.DOTALL)

        text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
        text = re.sub(r'</p>', '\n\n', text, flags=re.IGNORECASE)
        text = re.sub(r'</div>', '\n', text, flags=re.IGNORECASE)
        text = re.sub(r'</h[1-6]>', '\n\n', text, flags=re.IGNORECASE)
        text = re.sub(r'</li>', '\n', text, flags=re.IGNORECASE)

        text = re.sub(r'<[^>]+>', '', text)

        # Decode entities only after tags are gone, and in a single pass:
        # escaped markup such as "&lt;NAME&gt;" must survive as literal text,
        # and "&amp;lt;" must become "&lt;", not "<".
        text = unescape(text).replace('\xa0', ' ')

        text = re.sub(r'[ \t]+', ' ', text)
        text = re.sub(r'\n[ \t]+', '\n', text)
        text = re.sub(r'[ \t]+\n', '\n', text)
        text = re.sub(r'\n{3,}', '\n\n', text)

        return text.strip()
|
|
|
|
|
|
class JSONParser:
    """Parse JSON files containing legal template data."""

    # Keys probed, in order, for a record's main text and for its title.
    _TEXT_KEYS = ('text', 'content', 'body', 'description', 'value')
    _TITLE_KEYS = ('title', 'name', 'heading', 'label', 'key')
    # Keys probed, in order, for an explicitly declared language.
    _LANGUAGE_KEYS = ('lang', 'language')
    # Bare strings inside lists become documents only when longer than this.
    _MIN_LOOSE_TEXT_LEN = 50

    @classmethod
    def parse(cls, content: str, filename: str = "") -> List["ExtractedDocument"]:
        """Parse JSON content into ExtractedDocuments.

        Args:
            content: Raw JSON text.
            filename: Path of the file; used as ``file_path`` (with a
                ``[index]`` suffix for top-level list items).

        Returns:
            The extracted documents. Empty when the JSON is invalid (a warning
            is logged) or when the top level is neither an object nor a list
            containing objects.
        """
        try:
            data = json.loads(content)
        except json.JSONDecodeError as e:
            import logging
            logging.getLogger(__name__).warning(
                "Failed to parse JSON from %s: %s", filename, e
            )
            return []

        if isinstance(data, dict):
            return cls._parse_dict(data, filename)

        documents = []
        if isinstance(data, list):
            for i, item in enumerate(data):
                if isinstance(item, dict):
                    documents.extend(cls._parse_dict(item, f"{filename}[{i}]"))
        return documents

    @classmethod
    def _parse_dict(cls, data: dict, filename: str) -> List["ExtractedDocument"]:
        """Parse a dictionary into documents.

        If the dict carries text under one of the known text keys it becomes a
        single document; its scalar non-text, non-title entries are kept as
        metadata. Otherwise nested dicts and lists are searched recursively.
        """
        text = cls._first_text(data, cls._TEXT_KEYS)
        if not text:
            return cls._parse_nested(data, filename)

        title = cls._first_text(data, cls._TITLE_KEYS) or Path(filename).stem

        reserved = set(cls._TEXT_KEYS) | set(cls._TITLE_KEYS)
        metadata = {
            key: value
            for key, value in data.items()
            if key not in reserved and not isinstance(value, (dict, list))
        }

        # Trust a declared language only when it is a usable string.
        language = (
            cls._first_text(data, cls._LANGUAGE_KEYS).strip()
            or MarkdownParser._detect_language(text)
        )

        return [ExtractedDocument(
            text=text,
            title=title,
            file_path=filename,
            file_type="json",
            source_url="",
            placeholders=MarkdownParser._find_placeholders(text),
            language=language,
            metadata=metadata,
        )]

    @classmethod
    def _first_text(cls, data: dict, keys) -> str:
        """Return the first non-blank string stored under ``keys``, else ``""``."""
        for key in keys:
            value = data.get(key)
            # Skip blank strings so a later key can still supply the value.
            if isinstance(value, str) and value.strip():
                return value
        return ""

    @classmethod
    def _parse_nested(cls, data: dict, filename: str) -> List["ExtractedDocument"]:
        """Collect documents from the nested dicts and lists of a text-less dict."""
        documents = []
        for key, value in data.items():
            if isinstance(value, dict):
                documents.extend(cls._parse_dict(value, f"{filename}.{key}"))
            elif isinstance(value, list):
                for i, item in enumerate(value):
                    if isinstance(item, dict):
                        documents.extend(
                            cls._parse_dict(item, f"{filename}.{key}[{i}]")
                        )
                    elif isinstance(item, str) and len(item) > cls._MIN_LOOSE_TEXT_LEN:
                        documents.append(ExtractedDocument(
                            text=item,
                            title=f"{key} {i+1}",
                            file_path=filename,
                            file_type="json",
                            source_url="",
                            language=MarkdownParser._detect_language(item),
                        ))
        return documents
|