"""Local/NAS directory scanning with SHA-256 delta detection."""

import hashlib
import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timezone

from config import settings

logger = logging.getLogger(__name__)

# Number of bytes read per iteration while hashing. The resulting digest does
# not depend on this value; a larger buffer just means fewer read calls.
_HASH_CHUNK_SIZE = 1024 * 1024


@dataclass
class CrawledFile:
    """Metadata recorded for one file discovered by ``FilesystemCrawler``."""

    file_path: str  # walked directory joined with the file name
    file_name: str  # base name, extension included
    file_extension: str  # lower-cased os.path.splitext suffix ("" if none)
    file_size_bytes: int  # st_size reported by os.stat
    file_hash: str  # hex SHA-256 digest of the file contents
    relative_path: str  # file_path expressed relative to the crawl base path


class FilesystemCrawler:
    """Walks a directory tree and discovers document files.

    Files are selected by extension, filtered by substring exclude patterns
    and by ``settings.MAX_FILE_SIZE_BYTES``, and hashed with SHA-256.
    """

    def __init__(
        self,
        base_path: str,
        file_extensions: list[str] | None = None,
        max_depth: int = 5,
        exclude_patterns: list[str] | None = None,
    ):
        """Store the crawl configuration.

        Args:
            base_path: Directory at which the walk starts.
            file_extensions: Extensions to accept, compared against the
                ``os.path.splitext`` suffix of each file name (so entries
                need the leading dot). Falls back to
                ``settings.SUPPORTED_EXTENSIONS`` when omitted or empty.
            max_depth: Number of directory levels to scan; the base
                directory is level 0 and levels ``>= max_depth`` are skipped.
            exclude_patterns: Plain substrings; any directory or file whose
                path contains one of them is skipped.
        """
        self.base_path = base_path
        self.file_extensions = file_extensions or settings.SUPPORTED_EXTENSIONS
        self.max_depth = max_depth
        self.exclude_patterns = exclude_patterns or []

    def _should_exclude(self, path: str) -> bool:
        """Return True if any exclude pattern occurs as a substring of *path*.

        The check is a plain ``in`` test against the path string it is given;
        ``crawl`` passes paths that start with ``base_path``.
        """
        return any(pattern in path for pattern in self.exclude_patterns)

    def _hash_file(self, path: str) -> str:
        """Return the hex SHA-256 digest of the file at *path*.

        The file is read in fixed-size chunks so it is never held in memory
        as a whole.

        Raises:
            OSError: If the file cannot be opened or read.
        """
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            while chunk := f.read(_HASH_CHUNK_SIZE):
                digest.update(chunk)
        return digest.hexdigest()

    def _inspect_file(
        self, root: str, fname: str, allowed_extensions: set[str]
    ) -> CrawledFile | None:
        """Build a ``CrawledFile`` for one directory entry, or None to skip it.

        A file is skipped when its extension is not allowed, its path matches
        an exclude pattern, it exceeds ``settings.MAX_FILE_SIZE_BYTES``, or it
        cannot be stat'ed or read.
        """
        ext = os.path.splitext(fname)[1].lower()
        if ext not in allowed_extensions:
            return None

        full_path = os.path.join(root, fname)
        if self._should_exclude(full_path):
            return None

        try:
            stat = os.stat(full_path)
        except OSError as exc:
            logger.warning("Skipping %s: cannot stat (%s)", full_path, exc)
            return None

        if stat.st_size > settings.MAX_FILE_SIZE_BYTES:
            return None

        # The file may disappear or be unreadable between stat and open; one
        # bad file must not abort the whole crawl.
        try:
            file_hash = self._hash_file(full_path)
        except OSError as exc:
            logger.warning("Skipping %s: cannot read (%s)", full_path, exc)
            return None

        return CrawledFile(
            file_path=full_path,
            file_name=fname,
            file_extension=ext,
            file_size_bytes=stat.st_size,
            file_hash=file_hash,
            relative_path=os.path.relpath(full_path, self.base_path),
        )

    def crawl(self) -> list[CrawledFile]:
        """Walk the directory tree and return discovered files.

        Returns:
            One ``CrawledFile`` per accepted file, in ``os.walk`` order. The
            list is empty when ``base_path`` is not a directory.
        """
        results: list[CrawledFile] = []
        if not os.path.isdir(self.base_path):
            return results

        # File suffixes are lower-cased before comparison, so the configured
        # extensions are lower-cased too; built once per crawl as a set.
        allowed_extensions = {ext.lower() for ext in self.file_extensions}

        for root, dirs, files in os.walk(self.base_path):
            # Compute depth relative to base
            rel = os.path.relpath(root, self.base_path)
            depth = 0 if rel == "." else rel.count(os.sep) + 1

            if depth >= self.max_depth or self._should_exclude(root):
                # Clearing dirs in place stops os.walk from descending.
                dirs.clear()
                continue

            # Children would sit at depth + 1; if that level is already out
            # of range, prune now instead of listing them just to skip them.
            if depth + 1 >= self.max_depth:
                dirs.clear()

            for fname in files:
                crawled = self._inspect_file(root, fname, allowed_extensions)
                if crawled is not None:
                    results.append(crawled)

        return results