Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
768 lines
27 KiB
Python
768 lines
27 KiB
Python
"""
|
|
GitHub Repository Crawler for Legal Templates.
|
|
|
|
Crawls GitHub and GitLab repositories to extract legal template documents
|
|
(Markdown, HTML, JSON, etc.) for ingestion into the RAG system.
|
|
|
|
Features:
|
|
- Clone repositories via Git or download as ZIP
|
|
- Parse Markdown, HTML, JSON, and plain text files
|
|
- Extract structured content with metadata
|
|
- Track git commit hashes for reproducibility
|
|
- Handle rate limiting and errors gracefully
|
|
"""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
import tempfile
|
|
import zipfile
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from fnmatch import fnmatch
|
|
from pathlib import Path
|
|
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
|
|
from urllib.parse import urlparse
|
|
|
|
import httpx
|
|
|
|
from template_sources import LicenseType, SourceConfig, LICENSES
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Configuration
|
|
GITHUB_API_URL = "https://api.github.com"
|
|
GITLAB_API_URL = "https://gitlab.com/api/v4"
|
|
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN", "") # Optional for higher rate limits
|
|
MAX_FILE_SIZE = 1024 * 1024 # 1 MB max file size
|
|
REQUEST_TIMEOUT = 60.0
|
|
RATE_LIMIT_DELAY = 1.0 # Delay between requests to avoid rate limiting
|
|
|
|
|
|
@dataclass
class ExtractedDocument:
    """A single document extracted from a crawled repository.

    The crawler fills in source_url/source_commit after parsing; the
    content hash is derived automatically when not supplied.
    """
    text: str
    title: str
    file_path: str
    file_type: str  # one of: "markdown", "html", "json", "text"
    source_url: str
    source_commit: Optional[str] = None
    source_hash: str = ""  # SHA256 hex digest of the original content
    sections: List[Dict[str, Any]] = field(default_factory=list)
    placeholders: List[str] = field(default_factory=list)
    language: str = "en"
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Derive the content hash only when the caller did not supply one
        # and there is text to hash.
        if self.source_hash or not self.text:
            return
        self.source_hash = hashlib.sha256(self.text.encode()).hexdigest()
|
|
|
|
|
|
class MarkdownParser:
    """Parse Markdown files into structured content."""

    # Common placeholder patterns found in template documents.
    PLACEHOLDER_PATTERNS = [
        r'\[([A-Z_]+)\]',      # [COMPANY_NAME]
        r'\{([a-z_]+)\}',      # {company_name}
        r'\{\{([a-z_]+)\}\}',  # {{company_name}}
        r'__([A-Z_]+)__',      # __COMPANY_NAME__
        r'<([A-Z_]+)>',        # <COMPANY_NAME>
    ]

    @classmethod
    def parse(cls, content: str, filename: str = "") -> ExtractedDocument:
        """Parse markdown content into an ExtractedDocument.

        The caller is expected to fill in source_url (and source_commit)
        afterwards.
        """
        title = cls._extract_title(content, filename)
        sections = cls._extract_sections(content)
        placeholders = cls._find_placeholders(content)
        language = cls._detect_language(content)
        clean_text = cls._clean_for_indexing(content)

        return ExtractedDocument(
            text=clean_text,
            title=title,
            file_path=filename,
            file_type="markdown",
            source_url="",  # Will be set by caller
            sections=sections,
            placeholders=placeholders,
            language=language,
        )

    @classmethod
    def _extract_title(cls, content: str, filename: str) -> str:
        """Extract a title from the first H1, YAML frontmatter, or filename."""
        # First h1 heading wins.
        h1_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        if h1_match:
            return h1_match.group(1).strip()

        # Then a YAML frontmatter `title:` entry.
        frontmatter_match = re.search(
            r'^---\s*\n.*?title:\s*["\']?(.+?)["\']?\s*\n.*?---',
            content, re.DOTALL
        )
        if frontmatter_match:
            return frontmatter_match.group(1).strip()

        # Fall back to the filename (kebab-case / snake_case -> Title Case).
        if filename:
            name = Path(filename).stem
            return name.replace('-', ' ').replace('_', ' ').title()

        return "Untitled"

    @classmethod
    def _extract_sections(cls, content: str) -> List[Dict[str, Any]]:
        """Extract heading-delimited sections from markdown content.

        Each section dict has keys: heading, level, content, start.
        Fix: the text between two headings is now attached to the earlier
        section (previously only the final section ever received a body),
        and preamble text before the first heading is kept as a
        heading-less section instead of being dropped.
        """
        sections: List[Dict[str, Any]] = []
        current: Dict[str, Any] = {"heading": "", "level": 0, "content": "", "start": 0}

        for match in re.finditer(r'^(#{1,6})\s+(.+)$', content, re.MULTILINE):
            # Close out the previous section with the text up to this heading.
            current["content"] = content[current["start"]:match.start()].strip()
            if current["heading"] or current["content"]:
                sections.append(current.copy())

            current = {
                "heading": match.group(2).strip(),
                "level": len(match.group(1)),
                "content": "",
                "start": match.end(),
            }

        # The final section runs to the end of the document.
        current["content"] = content[current["start"]:].strip()
        if current["heading"] or current["content"]:
            sections.append(current)

        return sections

    @classmethod
    def _find_placeholders(cls, content: str) -> List[str]:
        """Return the sorted, de-duplicated placeholder tokens in content."""
        placeholders = {
            match.group(0)
            for pattern in cls.PLACEHOLDER_PATTERNS
            for match in re.finditer(pattern, content)
        }
        return sorted(placeholders)

    @classmethod
    def _detect_language(cls, content: str) -> str:
        """Heuristically classify content as German ("de") or English ("en").

        NOTE(review): this is a plain substring check, so short indicators
        such as 'und'/'der' also match inside English words ("under") —
        acceptable for a coarse heuristic, but not exact.
        """
        german_indicators = [
            'Datenschutz', 'Impressum', 'Nutzungsbedingungen', 'Haftung',
            'Widerruf', 'Verantwortlicher', 'personenbezogene', 'Verarbeitung',
            'und', 'der', 'die', 'das', 'ist', 'wird', 'werden', 'sind',
        ]

        lower_content = content.lower()
        german_count = sum(1 for word in german_indicators if word.lower() in lower_content)

        return "de" if german_count >= 3 else "en"

    @classmethod
    def _clean_for_indexing(cls, content: str) -> str:
        """Strip markdown/HTML syntax so only indexable text remains."""
        # Remove YAML frontmatter.
        content = re.sub(r'^---\s*\n.*?---\s*\n', '', content, flags=re.DOTALL)

        # Remove HTML comments, then inline tags (tag contents are kept).
        content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
        content = re.sub(r'<[^>]+>', '', content)

        # Unwrap emphasis/code markers.
        content = re.sub(r'\*\*(.+?)\*\*', r'\1', content)  # Bold
        content = re.sub(r'\*(.+?)\*', r'\1', content)      # Italic
        content = re.sub(r'`(.+?)`', r'\1', content)        # Inline code
        content = re.sub(r'~~(.+?)~~', r'\1', content)      # Strikethrough

        # Keep link text / image alt text, drop the URLs.
        content = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', content)
        content = re.sub(r'!\[([^\]]*)\]\([^)]+\)', r'\1', content)

        # Collapse whitespace.
        content = re.sub(r'\n{3,}', '\n\n', content)
        content = re.sub(r' +', ' ', content)

        return content.strip()
|
|
|
|
|
|
class HTMLParser:
    """Parse HTML files into structured content."""

    @classmethod
    def parse(cls, content: str, filename: str = "") -> ExtractedDocument:
        """Parse HTML content into an ExtractedDocument."""
        # Document title from <title>, falling back to the filename stem.
        title_match = re.search(r'<title>(.+?)</title>', content, re.IGNORECASE)
        title = title_match.group(1) if title_match else Path(filename).stem

        text = cls._html_to_text(content)
        placeholders = MarkdownParser._find_placeholders(text)

        # Prefer the declared lang attribute; fall back to heuristic detection.
        lang_match = re.search(r'<html[^>]*lang=["\']([a-z]{2})["\']', content, re.IGNORECASE)
        language = lang_match.group(1) if lang_match else MarkdownParser._detect_language(text)

        return ExtractedDocument(
            text=text,
            title=title,
            file_path=filename,
            file_type="html",
            source_url="",  # Will be set by caller
            placeholders=placeholders,
            language=language,
        )

    @classmethod
    def _html_to_text(cls, html: str) -> str:
        """Convert HTML to clean plain text.

        Fixes over the previous version: HTML entities are now actually
        decoded (the old replace() calls were no-ops — e.g.
        replace('&', '&') — because the entity names had themselves been
        decoded), and decoding happens *after* tag removal so that text
        such as '&lt;b&gt;' is not stripped as markup.
        """
        from html import unescape  # local import: the parameter shadows the module name

        # Drop script/style bodies and comments entirely.
        html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL | re.IGNORECASE)
        html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL | re.IGNORECASE)
        html = re.sub(r'<!--.*?-->', '', html, flags=re.DOTALL)

        # Turn block-level closers into line breaks before removing tags.
        html = re.sub(r'<br\s*/?>', '\n', html, flags=re.IGNORECASE)
        html = re.sub(r'</p>', '\n\n', html, flags=re.IGNORECASE)
        html = re.sub(r'</div>', '\n', html, flags=re.IGNORECASE)
        html = re.sub(r'</h[1-6]>', '\n\n', html, flags=re.IGNORECASE)
        html = re.sub(r'</li>', '\n', html, flags=re.IGNORECASE)

        # Remove all remaining tags.
        html = re.sub(r'<[^>]+>', '', html)

        # Decode entities (&amp;, &lt;, &#39;, ...) and normalize nbsp.
        html = unescape(html).replace('\xa0', ' ')

        # Collapse whitespace.
        html = re.sub(r'[ \t]+', ' ', html)
        html = re.sub(r'\n[ \t]+', '\n', html)
        html = re.sub(r'[ \t]+\n', '\n', html)
        html = re.sub(r'\n{3,}', '\n\n', html)

        return html.strip()
|
|
|
|
|
|
class JSONParser:
    """Parse JSON files containing legal template data."""

    @classmethod
    def parse(cls, content: str, filename: str = "") -> List[ExtractedDocument]:
        """Parse JSON content into ExtractedDocuments.

        Supports a top-level object (parsed directly) or a top-level array
        of objects (each element parsed individually). Returns an empty
        list for invalid JSON or content with no extractable text.
        """
        try:
            data = json.loads(content)
        except json.JSONDecodeError as e:
            # Fix: log the actual filename instead of a literal "(unknown)".
            logger.warning(f"Failed to parse JSON from {filename}: {e}")
            return []

        documents: List[ExtractedDocument] = []

        if isinstance(data, dict):
            # Handle different JSON structures
            documents.extend(cls._parse_dict(data, filename))
        elif isinstance(data, list):
            for i, item in enumerate(data):
                if isinstance(item, dict):
                    # Fix: keep the parent filename in the element path.
                    documents.extend(cls._parse_dict(item, f"{filename}[{i}]"))

        return documents

    @classmethod
    def _parse_dict(cls, data: dict, filename: str) -> List[ExtractedDocument]:
        """Parse a dictionary into documents.

        Looks for text under common content keys; when none is found,
        recurses into nested dicts/lists (e.g. the webflorist template
        format). Long strings (>50 chars) inside lists are treated as
        standalone documents.
        """
        documents: List[ExtractedDocument] = []

        # Keys commonly used for text content and titles.
        text_keys = ['text', 'content', 'body', 'description', 'value']
        title_keys = ['title', 'name', 'heading', 'label', 'key']

        # Try to find the main text content.
        text = ""
        for key in text_keys:
            if key in data and isinstance(data[key], str):
                text = data[key]
                break

        if not text:
            # No direct text: recurse into nested structures.
            for key, value in data.items():
                if isinstance(value, dict):
                    # Fix: nested paths now include the parent filename.
                    documents.extend(cls._parse_dict(value, f"{filename}.{key}"))
                elif isinstance(value, list):
                    for i, item in enumerate(value):
                        if isinstance(item, dict):
                            documents.extend(cls._parse_dict(item, f"{filename}.{key}[{i}]"))
                        elif isinstance(item, str) and len(item) > 50:
                            # Treat long strings as document content.
                            documents.append(ExtractedDocument(
                                text=item,
                                title=f"{key} {i+1}",
                                file_path=filename,
                                file_type="json",
                                source_url="",
                                language=MarkdownParser._detect_language(item),
                            ))
            return documents

        # Found text content; pick a title from the common title keys.
        title = ""
        for key in title_keys:
            if key in data and isinstance(data[key], str):
                title = data[key]
                break
        if not title:
            title = Path(filename).stem

        # Remaining scalar keys become metadata.
        metadata = {
            key: value
            for key, value in data.items()
            if key not in text_keys + title_keys and not isinstance(value, (dict, list))
        }

        placeholders = MarkdownParser._find_placeholders(text)
        language = data.get('lang', data.get('language', MarkdownParser._detect_language(text)))

        documents.append(ExtractedDocument(
            text=text,
            title=title,
            file_path=filename,
            file_type="json",
            source_url="",
            placeholders=placeholders,
            language=language,
            metadata=metadata,
        ))

        return documents
|
|
|
|
|
|
class GitHubCrawler:
    """Crawl GitHub repositories for legal templates via the REST API.

    Use as an async context manager: the HTTP client is created in
    __aenter__ and closed in __aexit__.
    """

    def __init__(self, token: Optional[str] = None):
        """Create a crawler; an optional token raises API rate limits."""
        self.token = token or GITHUB_TOKEN
        self.headers = {
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "LegalTemplatesCrawler/1.0",
        }
        if self.token:
            self.headers["Authorization"] = f"token {self.token}"

        self.http_client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self):
        self.http_client = httpx.AsyncClient(
            timeout=REQUEST_TIMEOUT,
            headers=self.headers,
            follow_redirects=True,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.http_client:
            await self.http_client.aclose()

    def _parse_repo_url(self, url: str) -> Tuple[str, str, str]:
        """Parse a repository URL into (owner, repo, host).

        Raises ValueError when the URL path has fewer than two segments.
        """
        parsed = urlparse(url)
        path_parts = parsed.path.strip('/').split('/')

        if len(path_parts) < 2:
            raise ValueError(f"Invalid repository URL: {url}")

        owner = path_parts[0]
        repo = path_parts[1]
        # Fix: strip '.git' only as a suffix; replace('.git', '') also
        # mangled repo names containing '.git' elsewhere in the name.
        if repo.endswith('.git'):
            repo = repo[:-4]

        host = 'gitlab' if 'gitlab' in parsed.netloc else 'github'
        return owner, repo, host

    async def get_default_branch(self, owner: str, repo: str) -> str:
        """Get the default branch of a repository (falls back to "main")."""
        if not self.http_client:
            raise RuntimeError("Crawler not initialized. Use 'async with' context.")

        url = f"{GITHUB_API_URL}/repos/{owner}/{repo}"
        response = await self.http_client.get(url)
        response.raise_for_status()
        data = response.json()
        return data.get("default_branch", "main")

    async def get_latest_commit(self, owner: str, repo: str, branch: str = "main") -> str:
        """Get the latest commit SHA for a branch ("" if absent)."""
        if not self.http_client:
            raise RuntimeError("Crawler not initialized. Use 'async with' context.")

        url = f"{GITHUB_API_URL}/repos/{owner}/{repo}/commits/{branch}"
        response = await self.http_client.get(url)
        response.raise_for_status()
        data = response.json()
        return data.get("sha", "")

    async def list_files(
        self,
        owner: str,
        repo: str,
        path: str = "",
        branch: str = "main",
        patterns: Optional[List[str]] = None,
        exclude_patterns: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """List repository blobs matching the given glob patterns.

        Uses the recursive git/trees API. Files larger than MAX_FILE_SIZE
        are skipped with a warning. NOTE(review): the `path` parameter is
        currently unused — the whole tree is always listed; kept for
        interface compatibility.
        """
        if not self.http_client:
            raise RuntimeError("Crawler not initialized. Use 'async with' context.")

        patterns = patterns or ["*.md", "*.txt", "*.html"]
        exclude_patterns = exclude_patterns or []

        url = f"{GITHUB_API_URL}/repos/{owner}/{repo}/git/trees/{branch}?recursive=1"
        response = await self.http_client.get(url)
        response.raise_for_status()
        data = response.json()

        files = []
        for item in data.get("tree", []):
            # Only blobs are files; skip trees/submodules.
            if item["type"] != "blob":
                continue

            file_path = item["path"]

            # Exclude patterns take precedence over include patterns.
            if any(fnmatch(file_path, pattern) for pattern in exclude_patterns):
                continue
            if not any(fnmatch(file_path, pattern) for pattern in patterns):
                continue

            # Skip large files.
            if item.get("size", 0) > MAX_FILE_SIZE:
                logger.warning(f"Skipping large file: {file_path} ({item['size']} bytes)")
                continue

            files.append({
                "path": file_path,
                "sha": item["sha"],
                "size": item.get("size", 0),
                "url": item.get("url", ""),
            })

        return files

    async def get_file_content(self, owner: str, repo: str, path: str, branch: str = "main") -> str:
        """Get the text content of a file via raw.githubusercontent.com."""
        if not self.http_client:
            raise RuntimeError("Crawler not initialized. Use 'async with' context.")

        # Raw content URL avoids the base64-encoded contents API.
        url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{path}"
        response = await self.http_client.get(url)
        response.raise_for_status()
        return response.text

    async def crawl_repository(
        self,
        source: SourceConfig,
    ) -> AsyncGenerator[ExtractedDocument, None]:
        """Crawl a repository and yield extracted documents.

        Skips sources without a repo URL and GitLab-hosted repositories
        (not yet supported). Failures on individual files are logged and
        skipped so one bad file does not abort the whole crawl.
        """
        if not source.repo_url:
            logger.warning(f"No repo URL for source: {source.name}")
            return

        try:
            owner, repo, host = self._parse_repo_url(source.repo_url)
        except ValueError as e:
            logger.error(f"Failed to parse repo URL for {source.name}: {e}")
            return

        if host == "gitlab":
            logger.info(f"GitLab repos not yet supported: {source.name}")
            return

        logger.info(f"Crawling repository: {owner}/{repo}")

        try:
            # Resolve the default branch and pin the commit for reproducibility.
            branch = await self.get_default_branch(owner, repo)
            commit_sha = await self.get_latest_commit(owner, repo, branch)

            await asyncio.sleep(RATE_LIMIT_DELAY)

            # List files matching the source's patterns.
            files = await self.list_files(
                owner, repo,
                branch=branch,
                patterns=source.file_patterns,
                exclude_patterns=source.exclude_patterns,
            )

            logger.info(f"Found {len(files)} matching files in {source.name}")

            for file_info in files:
                # Throttle between file fetches to stay under rate limits.
                await asyncio.sleep(RATE_LIMIT_DELAY)

                # Fix: bind file_path BEFORE the fetch so the except
                # handlers below can reference it; previously a failure on
                # the first file raised NameError instead of logging.
                file_path = file_info["path"]

                try:
                    content = await self.get_file_content(
                        owner, repo, file_path, branch
                    )

                    source_url = f"https://github.com/{owner}/{repo}/blob/{branch}/{file_path}"

                    # Dispatch to the parser matching the file extension.
                    if file_path.endswith('.md'):
                        doc = MarkdownParser.parse(content, file_path)
                        doc.source_url = source_url
                        doc.source_commit = commit_sha
                        yield doc

                    elif file_path.endswith(('.html', '.htm')):
                        doc = HTMLParser.parse(content, file_path)
                        doc.source_url = source_url
                        doc.source_commit = commit_sha
                        yield doc

                    elif file_path.endswith('.json'):
                        for doc in JSONParser.parse(content, file_path):
                            doc.source_url = source_url
                            doc.source_commit = commit_sha
                            yield doc

                    elif file_path.endswith('.txt'):
                        # Plain text file: no structural parsing needed.
                        yield ExtractedDocument(
                            text=content,
                            title=Path(file_path).stem,
                            file_path=file_path,
                            file_type="text",
                            source_url=source_url,
                            source_commit=commit_sha,
                            language=MarkdownParser._detect_language(content),
                            placeholders=MarkdownParser._find_placeholders(content),
                        )

                except httpx.HTTPError as e:
                    logger.warning(f"Failed to fetch {file_path}: {e}")
                    continue
                except Exception as e:
                    logger.error(f"Error processing {file_path}: {e}")
                    continue

        except httpx.HTTPError as e:
            logger.error(f"HTTP error crawling {source.name}: {e}")
        except Exception as e:
            logger.error(f"Error crawling {source.name}: {e}")
|
|
|
|
|
|
class RepositoryDownloader:
    """Download and extract repository archives for offline crawling."""

    def __init__(self):
        self.http_client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self):
        # Longer timeout than the API crawler: archive downloads are large.
        self.http_client = httpx.AsyncClient(
            timeout=120.0,
            follow_redirects=True,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.http_client:
            await self.http_client.aclose()

    async def download_zip(self, repo_url: str, branch: str = "main") -> Path:
        """Download a GitHub repository as ZIP and extract it to a temp dir.

        Returns the path of the extracted tree; the caller is responsible
        for removing it via cleanup().
        """
        if not self.http_client:
            raise RuntimeError("Downloader not initialized. Use 'async with' context.")

        parsed = urlparse(repo_url)
        path_parts = parsed.path.strip('/').split('/')
        owner = path_parts[0]
        repo = path_parts[1]
        # Fix: strip '.git' only as a suffix (replace() also hit mid-name
        # occurrences).
        if repo.endswith('.git'):
            repo = repo[:-4]

        zip_url = f"https://github.com/{owner}/{repo}/archive/refs/heads/{branch}.zip"

        logger.info(f"Downloading ZIP from {zip_url}")

        response = await self.http_client.get(zip_url)
        response.raise_for_status()

        # Save the archive to a fresh temp directory.
        temp_dir = Path(tempfile.mkdtemp())
        zip_path = temp_dir / f"{repo}.zip"

        with open(zip_path, 'wb') as f:
            f.write(response.content)

        # Extract the archive alongside it.
        extract_dir = temp_dir / repo
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)

        # GitHub archives extract to a directory named <repo>-<branch>.
        extracted_dirs = list(temp_dir.glob(f"{repo}-*"))
        if extracted_dirs:
            return extracted_dirs[0]

        return extract_dir

    async def crawl_local_directory(
        self,
        directory: Path,
        source: SourceConfig,
        base_url: str,
    ) -> AsyncGenerator[ExtractedDocument, None]:
        """Crawl an extracted local directory and yield documents.

        Applies the source's include/exclude patterns and size limit,
        dispatching each file to the parser for its extension.
        """
        patterns = source.file_patterns or ["*.md", "*.txt", "*.html"]
        exclude_patterns = source.exclude_patterns or []

        # Fix: overlapping patterns (e.g. "*.md" and "**/*.md" after the
        # "**/" strip) previously yielded the same file twice.
        seen: set = set()

        for pattern in patterns:
            for file_path in directory.rglob(pattern.replace("**/", "")):
                if not file_path.is_file() or file_path in seen:
                    continue
                seen.add(file_path)

                rel_path = str(file_path.relative_to(directory))

                # Exclude patterns apply to the repo-relative path.
                if any(fnmatch(rel_path, ep) for ep in exclude_patterns):
                    continue

                # Skip large files.
                if file_path.stat().st_size > MAX_FILE_SIZE:
                    continue

                # Best-effort decoding: UTF-8 first, then Latin-1.
                try:
                    content = file_path.read_text(encoding='utf-8')
                except UnicodeDecodeError:
                    try:
                        content = file_path.read_text(encoding='latin-1')
                    except Exception:
                        continue

                source_url = f"{base_url}/{rel_path}"

                if file_path.suffix == '.md':
                    doc = MarkdownParser.parse(content, rel_path)
                    doc.source_url = source_url
                    yield doc

                elif file_path.suffix in ['.html', '.htm']:
                    doc = HTMLParser.parse(content, rel_path)
                    doc.source_url = source_url
                    yield doc

                elif file_path.suffix == '.json':
                    for doc in JSONParser.parse(content, rel_path):
                        doc.source_url = source_url
                        yield doc

                elif file_path.suffix == '.txt':
                    yield ExtractedDocument(
                        text=content,
                        title=file_path.stem,
                        file_path=rel_path,
                        file_type="text",
                        source_url=source_url,
                        language=MarkdownParser._detect_language(content),
                        placeholders=MarkdownParser._find_placeholders(content),
                    )

    def cleanup(self, directory: Path):
        """Remove a temporary directory created by download_zip()."""
        if directory.exists():
            shutil.rmtree(directory, ignore_errors=True)
|
|
|
|
|
|
async def crawl_source(source: SourceConfig) -> List[ExtractedDocument]:
    """Crawl a source configuration and return all extracted documents.

    Sources without a repository URL yield an empty list.
    """
    # Only repository-backed sources are handled here.
    if not source.repo_url:
        return []

    async with GitHubCrawler() as crawler:
        # Drain the async generator into a list.
        return [doc async for doc in crawler.crawl_repository(source)]
|
|
|
|
|
|
# CLI for testing
|
|
async def main():
    """Test crawler with a sample source."""
    from template_sources import TEMPLATE_SOURCES

    # Exercise the crawler against the github-site-policy source config.
    source = next(s for s in TEMPLATE_SOURCES if s.name == "github-site-policy")

    async with GitHubCrawler() as crawler:
        total = 0
        async for doc in crawler.crawl_repository(source):
            total += 1
            print("\n" + "=" * 60)
            print("Title:", doc.title)
            print("Path:", doc.file_path)
            print("Type:", doc.file_type)
            print("Language:", doc.language)
            print("URL:", doc.source_url)
            shown = doc.placeholders[:5] if doc.placeholders else "None"
            print(f"Placeholders: {shown}")
            print(f"Text preview: {doc.text[:200]}...")

    print(f"\n\nTotal documents: {total}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Ad-hoc CLI entry point: run the sample crawl in main().
    asyncio.run(main())
|