This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/klausur-service/backend/github_crawler.py
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

768 lines
27 KiB
Python

"""
GitHub Repository Crawler for Legal Templates.
Crawls GitHub and GitLab repositories to extract legal template documents
(Markdown, HTML, JSON, etc.) for ingestion into the RAG system.
Features:
- Clone repositories via Git or download as ZIP
- Parse Markdown, HTML, JSON, and plain text files
- Extract structured content with metadata
- Track git commit hashes for reproducibility
- Handle rate limiting and errors gracefully
"""
import asyncio
import hashlib
import json
import logging
import os
import re
import shutil
import tempfile
import zipfile
from dataclasses import dataclass, field
from datetime import datetime
from fnmatch import fnmatch
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
from urllib.parse import urlparse
import httpx
from template_sources import LicenseType, SourceConfig, LICENSES
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration
GITHUB_API_URL = "https://api.github.com"
GITLAB_API_URL = "https://gitlab.com/api/v4"
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN", "") # Optional for higher rate limits
MAX_FILE_SIZE = 1024 * 1024 # 1 MB max file size
REQUEST_TIMEOUT = 60.0
RATE_LIMIT_DELAY = 1.0 # Delay between requests to avoid rate limiting
@dataclass
class ExtractedDocument:
    """A single document pulled out of a template repository.

    Holds the cleaned text plus provenance (source URL, commit, content
    hash) and structural metadata (sections, placeholders). When no
    ``source_hash`` is supplied, one is derived from ``text`` on
    construction so identical content always hashes identically.
    """
    text: str
    title: str
    file_path: str
    file_type: str  # "markdown", "html", "json", "text"
    source_url: str
    source_commit: Optional[str] = None
    source_hash: str = ""  # SHA256 hex digest of the original content
    sections: List[Dict[str, Any]] = field(default_factory=list)
    placeholders: List[str] = field(default_factory=list)
    language: str = "en"
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Derive the content hash unless the caller provided one explicitly.
        if self.text and not self.source_hash:
            self.source_hash = hashlib.sha256(self.text.encode()).hexdigest()
class MarkdownParser:
    """Parse Markdown files into structured content.

    Extracts a title, heading-delimited sections, template placeholders,
    and a cleaned plain-text rendering suitable for indexing.
    """

    # Placeholder notations commonly found in legal template repositories.
    PLACEHOLDER_PATTERNS = [
        r'\[([A-Z_]+)\]',      # [COMPANY_NAME]
        r'\{([a-z_]+)\}',      # {company_name}
        r'\{\{([a-z_]+)\}\}',  # {{company_name}}
        r'__([A-Z_]+)__',      # __COMPANY_NAME__
        r'<([A-Z_]+)>',        # <COMPANY_NAME>
    ]

    @classmethod
    def parse(cls, content: str, filename: str = "") -> "ExtractedDocument":
        """Parse markdown content into an ExtractedDocument."""
        title = cls._extract_title(content, filename)
        sections = cls._extract_sections(content)
        placeholders = cls._find_placeholders(content)
        language = cls._detect_language(content)
        clean_text = cls._clean_for_indexing(content)
        return ExtractedDocument(
            text=clean_text,
            title=title,
            file_path=filename,
            file_type="markdown",
            source_url="",  # Filled in by the caller once known.
            sections=sections,
            placeholders=placeholders,
            language=language,
        )

    @classmethod
    def _extract_title(cls, content: str, filename: str) -> str:
        """Extract a title from the first h1, YAML frontmatter, or filename."""
        # Look for the first h1 heading.
        h1_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        if h1_match:
            return h1_match.group(1).strip()
        # Look for a YAML frontmatter "title:" entry.
        frontmatter_match = re.search(
            r'^---\s*\n.*?title:\s*["\']?(.+?)["\']?\s*\n.*?---',
            content, re.DOTALL
        )
        if frontmatter_match:
            return frontmatter_match.group(1).strip()
        # Fall back to the filename, converting kebab/snake case to Title Case.
        if filename:
            name = Path(filename).stem
            return name.replace('-', ' ').replace('_', ' ').title()
        return "Untitled"

    @classmethod
    def _extract_sections(cls, content: str) -> List[Dict[str, Any]]:
        """Extract heading-delimited sections from markdown content.

        Each section dict carries "heading", "level" (1-6, 0 for the
        preamble), "content" (text between this heading and the next),
        and "start" (offset just past the heading line).
        """
        sections: List[Dict[str, Any]] = []
        current = {"heading": "", "level": 0, "content": "", "start": 0}
        for match in re.finditer(r'^(#{1,6})\s+(.+)$', content, re.MULTILINE):
            # BUG FIX: populate the section body with the text between the
            # previous heading and this one. Previously "content" was left
            # empty for every section except the last, and text before the
            # first heading (or in a heading-less file) was dropped.
            body = content[current["start"]:match.start()].strip()
            if current["heading"] or body:
                current["content"] = body
                sections.append(current.copy())
            # Start a new section at this heading.
            current = {
                "heading": match.group(2).strip(),
                "level": len(match.group(1)),
                "content": "",
                "start": match.end(),
            }
        # Flush the final (or only) section.
        body = content[current["start"]:].strip()
        if current["heading"] or body:
            current["content"] = body
            sections.append(current)
        return sections

    @classmethod
    def _find_placeholders(cls, content: str) -> List[str]:
        """Return a sorted list of distinct placeholder tokens in content."""
        placeholders = set()
        for pattern in cls.PLACEHOLDER_PATTERNS:
            for match in re.finditer(pattern, content):
                # Keep the full token including its delimiters.
                placeholders.add(match.group(0))
        return sorted(placeholders)

    @classmethod
    def _detect_language(cls, content: str) -> str:
        """Heuristically detect German vs. English from indicator words."""
        # Substring check (not word-boundary) — three or more hits means "de".
        german_indicators = [
            'Datenschutz', 'Impressum', 'Nutzungsbedingungen', 'Haftung',
            'Widerruf', 'Verantwortlicher', 'personenbezogene', 'Verarbeitung',
            'und', 'der', 'die', 'das', 'ist', 'wird', 'werden', 'sind',
        ]
        lower_content = content.lower()
        german_count = sum(1 for word in german_indicators if word.lower() in lower_content)
        if german_count >= 3:
            return "de"
        return "en"

    @classmethod
    def _clean_for_indexing(cls, content: str) -> str:
        """Strip markdown/HTML syntax, leaving plain text for indexing."""
        # Remove YAML frontmatter.
        content = re.sub(r'^---\s*\n.*?---\s*\n', '', content, flags=re.DOTALL)
        # Remove HTML comments.
        content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
        # Remove inline HTML tags but keep their content.
        content = re.sub(r'<[^>]+>', '', content)
        # Unwrap markdown emphasis/code markers.
        content = re.sub(r'\*\*(.+?)\*\*', r'\1', content)  # Bold
        content = re.sub(r'\*(.+?)\*', r'\1', content)      # Italic
        content = re.sub(r'`(.+?)`', r'\1', content)        # Inline code
        content = re.sub(r'~~(.+?)~~', r'\1', content)      # Strikethrough
        # Remove link syntax but keep the link text.
        content = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', content)
        # Remove image syntax, keeping the alt text.
        content = re.sub(r'!\[([^\]]*)\]\([^)]+\)', r'\1', content)
        # Collapse excess whitespace.
        content = re.sub(r'\n{3,}', '\n\n', content)
        content = re.sub(r' +', ' ', content)
        return content.strip()
class HTMLParser:
    """Parse HTML files into structured content."""

    @classmethod
    def parse(cls, content: str, filename: str = "") -> "ExtractedDocument":
        """Parse HTML content into an ExtractedDocument."""
        # Prefer the <title> element; otherwise fall back to the filename stem.
        title_match = re.search(r'<title>(.+?)</title>', content, re.IGNORECASE)
        if title_match:
            title = title_match.group(1)
        else:
            title = Path(filename).stem
        text = cls._html_to_text(content)
        placeholders = MarkdownParser._find_placeholders(text)
        # An explicit lang attribute on <html> wins; otherwise guess from text.
        lang_match = re.search(r'<html[^>]*lang=["\']([a-z]{2})["\']', content, re.IGNORECASE)
        if lang_match:
            language = lang_match.group(1)
        else:
            language = MarkdownParser._detect_language(text)
        return ExtractedDocument(
            text=text,
            title=title,
            file_path=filename,
            file_type="html",
            source_url="",
            placeholders=placeholders,
            language=language,
        )

    @classmethod
    def _html_to_text(cls, html: str) -> str:
        """Convert HTML markup into clean plain text."""
        # Drop script/style bodies and comments entirely.
        for tag in ('script', 'style'):
            html = re.sub(
                rf'<{tag}[^>]*>.*?</{tag}>', '', html,
                flags=re.DOTALL | re.IGNORECASE,
            )
        html = re.sub(r'<!--.*?-->', '', html, flags=re.DOTALL)
        # Decode the common entities (order matters: &amp; before &lt;/&gt;).
        entities = (
            ('&nbsp;', ' '), ('&amp;', '&'), ('&lt;', '<'),
            ('&gt;', '>'), ('&quot;', '"'), ('&apos;', "'"),
        )
        for entity, char in entities:
            html = html.replace(entity, char)
        # Turn block-level closings into line breaks before stripping tags.
        breaks = (
            (r'<br\s*/?>', '\n'), (r'</p>', '\n\n'), (r'</div>', '\n'),
            (r'</h[1-6]>', '\n\n'), (r'</li>', '\n'),
        )
        for pattern, repl in breaks:
            html = re.sub(pattern, repl, html, flags=re.IGNORECASE)
        # Remove any remaining tags.
        html = re.sub(r'<[^>]+>', '', html)
        # Normalize whitespace: collapse runs, trim line edges, cap blank lines.
        html = re.sub(r'[ \t]+', ' ', html)
        html = re.sub(r'\n[ \t]+', '\n', html)
        html = re.sub(r'[ \t]+\n', '\n', html)
        html = re.sub(r'\n{3,}', '\n\n', html)
        return html.strip()
class JSONParser:
    """Parse JSON files containing legal template data."""

    @classmethod
    def parse(cls, content: str, filename: str = "") -> "List[ExtractedDocument]":
        """Parse JSON content into ExtractedDocuments.

        Returns an empty list when the content is not valid JSON or when
        no text-bearing entries are found.
        """
        try:
            data = json.loads(content)
        except json.JSONDecodeError as e:
            # BUG FIX: the filename was missing from the log message
            # (a literal "(unknown)" had replaced the interpolation).
            logger.warning(f"Failed to parse JSON from {filename}: {e}")
            return []
        documents = []
        if isinstance(data, dict):
            # Handle different JSON structures.
            documents.extend(cls._parse_dict(data, filename))
        elif isinstance(data, list):
            for i, item in enumerate(data):
                if isinstance(item, dict):
                    # BUG FIX: build the synthetic path from the real filename.
                    docs = cls._parse_dict(item, f"{filename}[{i}]")
                    documents.extend(docs)
        return documents

    @classmethod
    def _parse_dict(cls, data: dict, filename: str) -> "List[ExtractedDocument]":
        """Parse one dictionary (recursively) into documents.

        Looks for text content under common keys; when none is found,
        descends into nested dicts/lists, treating long strings as content.
        """
        documents = []
        text_keys = ['text', 'content', 'body', 'description', 'value']
        title_keys = ['title', 'name', 'heading', 'label', 'key']
        # Try to find the main text content.
        text = ""
        for key in text_keys:
            if key in data and isinstance(data[key], str):
                text = data[key]
                break
        if not text:
            # No direct text: check nested structures (like webflorist format).
            for key, value in data.items():
                if isinstance(value, dict):
                    # BUG FIX: carry the filename through the synthetic path.
                    nested_docs = cls._parse_dict(value, f"{filename}.{key}")
                    documents.extend(nested_docs)
                elif isinstance(value, list):
                    for i, item in enumerate(value):
                        if isinstance(item, dict):
                            nested_docs = cls._parse_dict(item, f"{filename}.{key}[{i}]")
                            documents.extend(nested_docs)
                        elif isinstance(item, str) and len(item) > 50:
                            # Treat long strings as content.
                            documents.append(ExtractedDocument(
                                text=item,
                                title=f"{key} {i+1}",
                                file_path=filename,
                                file_type="json",
                                source_url="",
                                language=MarkdownParser._detect_language(item),
                            ))
            return documents
        # Found text content; pick a title from the common title keys.
        title = ""
        for key in title_keys:
            if key in data and isinstance(data[key], str):
                title = data[key]
                break
        if not title:
            title = Path(filename).stem
        # Collect remaining scalar entries as metadata.
        metadata = {}
        for key, value in data.items():
            if key not in text_keys + title_keys and not isinstance(value, (dict, list)):
                metadata[key] = value
        placeholders = MarkdownParser._find_placeholders(text)
        language = data.get('lang', data.get('language', MarkdownParser._detect_language(text)))
        documents.append(ExtractedDocument(
            text=text,
            title=title,
            file_path=filename,
            file_type="json",
            source_url="",
            placeholders=placeholders,
            language=language,
            metadata=metadata,
        ))
        return documents
class GitHubCrawler:
    """Crawl GitHub repositories for legal templates.

    Uses the GitHub REST API to enumerate files and raw.githubusercontent.com
    to fetch contents. Must be used as an async context manager so the
    underlying HTTP client is opened and closed properly.
    """

    def __init__(self, token: Optional[str] = None):
        # An API token is optional but raises the rate limit substantially.
        self.token = token or GITHUB_TOKEN
        self.headers = {
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "LegalTemplatesCrawler/1.0",
        }
        if self.token:
            self.headers["Authorization"] = f"token {self.token}"
        self.http_client: Optional["httpx.AsyncClient"] = None

    async def __aenter__(self):
        self.http_client = httpx.AsyncClient(
            timeout=REQUEST_TIMEOUT,
            headers=self.headers,
            follow_redirects=True,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.http_client:
            await self.http_client.aclose()

    def _parse_repo_url(self, url: str) -> Tuple[str, str, str]:
        """Parse a repository URL into (owner, repo, host).

        Raises:
            ValueError: if the URL path does not contain owner and repo.
        """
        parsed = urlparse(url)
        path_parts = parsed.path.strip('/').split('/')
        if len(path_parts) < 2:
            raise ValueError(f"Invalid repository URL: {url}")
        owner = path_parts[0]
        repo = path_parts[1]
        # BUG FIX: strip only a trailing ".git". The previous str.replace
        # removed ".git" anywhere in the name, mangling repos such as
        # "data.github.io".
        if repo.endswith('.git'):
            repo = repo[:-4]
        host = 'gitlab' if 'gitlab' in parsed.netloc else 'github'
        return owner, repo, host

    async def get_default_branch(self, owner: str, repo: str) -> str:
        """Return the repository's default branch (falls back to "main")."""
        if not self.http_client:
            raise RuntimeError("Crawler not initialized. Use 'async with' context.")
        url = f"{GITHUB_API_URL}/repos/{owner}/{repo}"
        response = await self.http_client.get(url)
        response.raise_for_status()
        data = response.json()
        return data.get("default_branch", "main")

    async def get_latest_commit(self, owner: str, repo: str, branch: str = "main") -> str:
        """Return the latest commit SHA for a branch ("" when absent)."""
        if not self.http_client:
            raise RuntimeError("Crawler not initialized. Use 'async with' context.")
        url = f"{GITHUB_API_URL}/repos/{owner}/{repo}/commits/{branch}"
        response = await self.http_client.get(url)
        response.raise_for_status()
        data = response.json()
        return data.get("sha", "")

    async def list_files(
        self,
        owner: str,
        repo: str,
        path: str = "",
        branch: str = "main",
        patterns: Optional[List[str]] = None,
        exclude_patterns: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """List blob files in a repository matching the given glob patterns.

        Files larger than MAX_FILE_SIZE are skipped with a warning.
        """
        if not self.http_client:
            raise RuntimeError("Crawler not initialized. Use 'async with' context.")
        patterns = patterns or ["*.md", "*.txt", "*.html"]
        exclude_patterns = exclude_patterns or []
        # One recursive tree call returns every path in the branch.
        url = f"{GITHUB_API_URL}/repos/{owner}/{repo}/git/trees/{branch}?recursive=1"
        response = await self.http_client.get(url)
        response.raise_for_status()
        data = response.json()
        files = []
        for item in data.get("tree", []):
            if item["type"] != "blob":
                continue
            file_path = item["path"]
            # Exclusions take precedence over inclusions.
            if any(fnmatch(file_path, pattern) for pattern in exclude_patterns):
                continue
            if not any(fnmatch(file_path, pattern) for pattern in patterns):
                continue
            if item.get("size", 0) > MAX_FILE_SIZE:
                logger.warning(f"Skipping large file: {file_path} ({item['size']} bytes)")
                continue
            files.append({
                "path": file_path,
                "sha": item["sha"],
                "size": item.get("size", 0),
                "url": item.get("url", ""),
            })
        return files

    async def get_file_content(self, owner: str, repo: str, path: str, branch: str = "main") -> str:
        """Fetch a file's text via the raw content host (no API quota cost)."""
        if not self.http_client:
            raise RuntimeError("Crawler not initialized. Use 'async with' context.")
        url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{path}"
        response = await self.http_client.get(url)
        response.raise_for_status()
        return response.text

    async def crawl_repository(
        self,
        source: "SourceConfig",
    ) -> "AsyncGenerator[ExtractedDocument, None]":
        """Crawl a repository and yield extracted documents.

        Errors on individual files are logged and skipped; errors at the
        repository level abort the crawl (also logged, never raised).
        """
        if not source.repo_url:
            logger.warning(f"No repo URL for source: {source.name}")
            return
        try:
            owner, repo, host = self._parse_repo_url(source.repo_url)
        except ValueError as e:
            logger.error(f"Failed to parse repo URL for {source.name}: {e}")
            return
        if host == "gitlab":
            logger.info(f"GitLab repos not yet supported: {source.name}")
            return
        logger.info(f"Crawling repository: {owner}/{repo}")
        try:
            # Pin the crawl to a concrete commit for reproducibility.
            branch = await self.get_default_branch(owner, repo)
            commit_sha = await self.get_latest_commit(owner, repo, branch)
            await asyncio.sleep(RATE_LIMIT_DELAY)
            files = await self.list_files(
                owner, repo,
                branch=branch,
                patterns=source.file_patterns,
                exclude_patterns=source.exclude_patterns,
            )
            logger.info(f"Found {len(files)} matching files in {source.name}")
            for file_info in files:
                await asyncio.sleep(RATE_LIMIT_DELAY)
                # BUG FIX: bind file_path before the fetch; previously it was
                # assigned after get_file_content, so a fetch failure made the
                # except handlers raise NameError on their log line.
                file_path = file_info["path"]
                try:
                    content = await self.get_file_content(
                        owner, repo, file_path, branch
                    )
                    source_url = f"https://github.com/{owner}/{repo}/blob/{branch}/{file_path}"
                    # Dispatch to the right parser by extension.
                    if file_path.endswith('.md'):
                        doc = MarkdownParser.parse(content, file_path)
                        doc.source_url = source_url
                        doc.source_commit = commit_sha
                        yield doc
                    elif file_path.endswith('.html') or file_path.endswith('.htm'):
                        doc = HTMLParser.parse(content, file_path)
                        doc.source_url = source_url
                        doc.source_commit = commit_sha
                        yield doc
                    elif file_path.endswith('.json'):
                        docs = JSONParser.parse(content, file_path)
                        for doc in docs:
                            doc.source_url = source_url
                            doc.source_commit = commit_sha
                            yield doc
                    elif file_path.endswith('.txt'):
                        # Plain text needs no structural parsing.
                        yield ExtractedDocument(
                            text=content,
                            title=Path(file_path).stem,
                            file_path=file_path,
                            file_type="text",
                            source_url=source_url,
                            source_commit=commit_sha,
                            language=MarkdownParser._detect_language(content),
                            placeholders=MarkdownParser._find_placeholders(content),
                        )
                except httpx.HTTPError as e:
                    logger.warning(f"Failed to fetch {file_path}: {e}")
                    continue
                except Exception as e:
                    logger.error(f"Error processing {file_path}: {e}")
                    continue
        except httpx.HTTPError as e:
            logger.error(f"HTTP error crawling {source.name}: {e}")
        except Exception as e:
            logger.error(f"Error crawling {source.name}: {e}")
class RepositoryDownloader:
    """Download repository ZIP archives and crawl them on the local disk."""

    def __init__(self):
        self.http_client: Optional["httpx.AsyncClient"] = None

    async def __aenter__(self):
        self.http_client = httpx.AsyncClient(
            timeout=120.0,
            follow_redirects=True,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.http_client:
            await self.http_client.aclose()

    async def download_zip(self, repo_url: str, branch: str = "main") -> Path:
        """Download a repository as ZIP and extract it to a temp directory.

        Returns the path of the extracted source tree; pass the returned
        path's parent-owning directory to cleanup() when done.
        """
        if not self.http_client:
            raise RuntimeError("Downloader not initialized. Use 'async with' context.")
        parsed = urlparse(repo_url)
        path_parts = parsed.path.strip('/').split('/')
        owner = path_parts[0]
        repo = path_parts[1]
        # BUG FIX: strip only a trailing ".git" (mirrors GitHubCrawler);
        # str.replace would mangle names containing ".git" elsewhere.
        if repo.endswith('.git'):
            repo = repo[:-4]
        zip_url = f"https://github.com/{owner}/{repo}/archive/refs/heads/{branch}.zip"
        logger.info(f"Downloading ZIP from {zip_url}")
        response = await self.http_client.get(zip_url)
        response.raise_for_status()
        # Save the archive into a fresh temp directory.
        temp_dir = Path(tempfile.mkdtemp())
        zip_path = temp_dir / f"{repo}.zip"
        with open(zip_path, 'wb') as f:
            f.write(response.content)
        extract_dir = temp_dir / repo
        # NOTE(security): extractall trusts archive member paths; a malicious
        # archive could escape temp_dir ("zip slip"). GitHub-generated
        # archives are well-formed — revisit if arbitrary URLs are allowed.
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
        # GitHub archives unpack to "<repo>-<branch>".
        extracted_dirs = list(temp_dir.glob(f"{repo}-*"))
        if extracted_dirs:
            return extracted_dirs[0]
        return extract_dir

    async def crawl_local_directory(
        self,
        directory: Path,
        source: "SourceConfig",
        base_url: str,
    ) -> "AsyncGenerator[ExtractedDocument, None]":
        """Crawl an extracted local directory and yield documents.

        Applies the source's include/exclude patterns, skips oversized and
        undecodable files, and dispatches by file extension.
        """
        patterns = source.file_patterns or ["*.md", "*.txt", "*.html"]
        exclude_patterns = source.exclude_patterns or []
        # BUG FIX: remember yielded paths so overlapping glob patterns
        # (e.g. "*.md" and "**/*.md") do not produce duplicate documents.
        seen = set()
        for pattern in patterns:
            # rglob already recurses, so a leading "**/" is redundant.
            for file_path in directory.rglob(pattern.replace("**/", "")):
                if not file_path.is_file():
                    continue
                rel_path = str(file_path.relative_to(directory))
                if rel_path in seen:
                    continue
                if any(fnmatch(rel_path, ep) for ep in exclude_patterns):
                    continue
                if file_path.stat().st_size > MAX_FILE_SIZE:
                    continue
                seen.add(rel_path)
                # UTF-8 first, latin-1 as a lossless fallback, else skip.
                try:
                    content = file_path.read_text(encoding='utf-8')
                except UnicodeDecodeError:
                    try:
                        content = file_path.read_text(encoding='latin-1')
                    except Exception:
                        continue
                source_url = f"{base_url}/{rel_path}"
                if file_path.suffix == '.md':
                    doc = MarkdownParser.parse(content, rel_path)
                    doc.source_url = source_url
                    yield doc
                elif file_path.suffix in ['.html', '.htm']:
                    doc = HTMLParser.parse(content, rel_path)
                    doc.source_url = source_url
                    yield doc
                elif file_path.suffix == '.json':
                    docs = JSONParser.parse(content, rel_path)
                    for doc in docs:
                        doc.source_url = source_url
                        yield doc
                elif file_path.suffix == '.txt':
                    yield ExtractedDocument(
                        text=content,
                        title=file_path.stem,
                        file_path=rel_path,
                        file_type="text",
                        source_url=source_url,
                        language=MarkdownParser._detect_language(content),
                        placeholders=MarkdownParser._find_placeholders(content),
                    )

    def cleanup(self, directory: Path):
        """Remove a temporary directory created by download_zip()."""
        if directory.exists():
            shutil.rmtree(directory, ignore_errors=True)
async def crawl_source(source: SourceConfig) -> List[ExtractedDocument]:
    """Crawl a source configuration and return all extracted documents.

    Sources without a repository URL yield an empty list.
    """
    if not source.repo_url:
        return []
    collected = []
    async with GitHubCrawler() as crawler:
        async for document in crawler.crawl_repository(source):
            collected.append(document)
    return collected
# CLI for testing
async def main():
    """Smoke-test the crawler against one known source, printing a summary."""
    from template_sources import TEMPLATE_SOURCES

    # Exercise the crawler with the github-site-policy source.
    source = next(s for s in TEMPLATE_SOURCES if s.name == "github-site-policy")
    divider = '=' * 60
    async with GitHubCrawler() as crawler:
        count = 0
        async for document in crawler.crawl_repository(source):
            count += 1
            placeholder_preview = document.placeholders[:5] if document.placeholders else 'None'
            print(f"\n{divider}")
            print(f"Title: {document.title}")
            print(f"Path: {document.file_path}")
            print(f"Type: {document.file_type}")
            print(f"Language: {document.language}")
            print(f"URL: {document.source_url}")
            print(f"Placeholders: {placeholder_preview}")
            print(f"Text preview: {document.text[:200]}...")
        print(f"\n\nTotal documents: {count}")
# Entry point: run the sample crawl defined in main() when executed directly.
if __name__ == "__main__":
    asyncio.run(main())