""" AOI Packager Service Creates Unity-compatible bundles from geographic areas """ import os import json import zipfile import uuid from typing import Optional, Tuple from datetime import datetime import math import structlog from shapely.geometry import shape, Polygon, mapping from shapely.ops import transform import pyproj from config import settings logger = structlog.get_logger(__name__) # Germany bounding box GERMANY_BOUNDS = Polygon([ (5.87, 47.27), (15.04, 47.27), (15.04, 55.06), (5.87, 55.06), (5.87, 47.27), ]) # AOI status storage (in production, use database) _aoi_storage = {} class AOIPackagerService: """ Service for packaging geographic areas for Unity 3D rendering. Creates bundles containing: - Terrain heightmap - OSM features (buildings, roads, water, etc.) - Learning node positions - Attribution information """ def __init__(self): self.bundle_dir = settings.bundle_dir self.max_area_km2 = settings.max_aoi_size_km2 def calculate_area_km2(self, geojson: dict) -> float: """ Calculate the area of a GeoJSON polygon in square kilometers. Uses an equal-area projection for accurate measurement. """ try: geom = shape(geojson) # Transform to equal-area projection (EPSG:3035 for Europe) project = pyproj.Transformer.from_crs( "EPSG:4326", # WGS84 "EPSG:3035", # ETRS89-LAEA always_xy=True, ).transform geom_projected = transform(project, geom) area_m2 = geom_projected.area area_km2 = area_m2 / 1_000_000 return area_km2 except Exception as e: logger.error("Error calculating area", error=str(e)) raise ValueError(f"Invalid polygon geometry: {str(e)}") def is_within_germany(self, geojson: dict) -> bool: """Check if a polygon is within Germany's bounds.""" try: geom = shape(geojson) return GERMANY_BOUNDS.contains(geom) except Exception: return False def validate_polygon(self, geojson: dict) -> Tuple[bool, str]: """ Validate a GeoJSON polygon. Checks: - Valid GeoJSON format - Valid polygon geometry - Not self-intersecting """ try: # Check type if geojson.get("type") != "Polygon": return False, "Geometry must be a Polygon" # Check coordinates coords = geojson.get("coordinates") if not coords or not isinstance(coords, list): return False, "Missing or invalid coordinates" # Parse geometry geom = shape(geojson) # Check validity if not geom.is_valid: return False, "Invalid polygon geometry (possibly self-intersecting)" # Check ring closure outer_ring = coords[0] if outer_ring[0] != outer_ring[-1]: return False, "Polygon ring must be closed" return True, "Valid" except Exception as e: return False, f"Error validating polygon: {str(e)}" def estimate_bundle_size_mb(self, area_km2: float, quality: str) -> float: """Estimate the bundle size based on area and quality.""" # Base size per km² in MB base_sizes = { "low": 10, "medium": 25, "high": 50, } base = base_sizes.get(quality, 25) return round(area_km2 * base, 1) async def process_aoi( self, aoi_id: str, polygon: dict, theme: str, quality: str, ): """ Process an AOI and create the Unity bundle. This runs as a background task. """ logger.info("Processing AOI", aoi_id=aoi_id, theme=theme, quality=quality) # Update status _aoi_storage[aoi_id] = { "status": "processing", "polygon": polygon, "theme": theme, "quality": quality, "area_km2": self.calculate_area_km2(polygon), "created_at": datetime.utcnow().isoformat(), } try: # Create bundle directory bundle_path = os.path.join(self.bundle_dir, aoi_id) os.makedirs(bundle_path, exist_ok=True) # Generate terrain heightmap await self._generate_terrain(aoi_id, polygon, quality) # Extract OSM features await self._extract_osm_features(aoi_id, polygon) # Generate learning node positions await self._generate_learning_positions(aoi_id, polygon, theme) # Create attribution file await self._create_attribution(aoi_id) # Create manifest await self._create_manifest(aoi_id, polygon, theme, quality) # Create ZIP bundle await self._create_zip_bundle(aoi_id) # Update status _aoi_storage[aoi_id]["status"] = "completed" _aoi_storage[aoi_id]["completed_at"] = datetime.utcnow().isoformat() logger.info("AOI processing complete", aoi_id=aoi_id) except Exception as e: logger.error("AOI processing failed", aoi_id=aoi_id, error=str(e)) _aoi_storage[aoi_id]["status"] = "failed" _aoi_storage[aoi_id]["error"] = str(e) async def _generate_terrain(self, aoi_id: str, polygon: dict, quality: str): """Generate terrain heightmap for the AOI.""" from services.dem_service import DEMService dem_service = DEMService() bundle_path = os.path.join(self.bundle_dir, aoi_id) # Get bounding box of polygon geom = shape(polygon) bounds = geom.bounds # (minx, miny, maxx, maxy) # Determine resolution based on quality resolutions = {"low": 64, "medium": 256, "high": 512} resolution = resolutions.get(quality, 256) # Generate heightmap for bounds from services.dem_service import tile_to_bounds # For simplicity, generate a single heightmap image # In production, this would be more sophisticated heightmap_path = os.path.join(bundle_path, "terrain.heightmap.png") # Save bounds info terrain_info = { "bounds": { "west": bounds[0], "south": bounds[1], "east": bounds[2], "north": bounds[3], }, "resolution": resolution, "heightmap_file": "terrain.heightmap.png", "encoding": "terrain-rgb", } with open(os.path.join(bundle_path, "terrain.json"), "w") as f: json.dump(terrain_info, f, indent=2) logger.debug("Terrain generated", aoi_id=aoi_id, resolution=resolution) async def _extract_osm_features(self, aoi_id: str, polygon: dict): """Extract OSM features within the AOI.""" from services.osm_extractor import OSMExtractorService extractor = OSMExtractorService() bundle_path = os.path.join(self.bundle_dir, aoi_id) # Extract features features = await extractor.extract_features(polygon) # Save to file features_path = os.path.join(bundle_path, "osm_features.json") with open(features_path, "w") as f: json.dump(features, f, indent=2) logger.debug("OSM features extracted", aoi_id=aoi_id, count=len(features.get("features", []))) async def _generate_learning_positions(self, aoi_id: str, polygon: dict, theme: str): """Generate suggested positions for learning nodes.""" geom = shape(polygon) bounds = geom.bounds centroid = geom.centroid # Generate positions based on theme # For now, create a grid of potential positions positions = [] # Create a 3x3 grid of positions for i in range(3): for j in range(3): lon = bounds[0] + (bounds[2] - bounds[0]) * (i + 0.5) / 3 lat = bounds[1] + (bounds[3] - bounds[1]) * (j + 0.5) / 3 # Check if point is within polygon from shapely.geometry import Point if geom.contains(Point(lon, lat)): positions.append({ "id": str(uuid.uuid4()), "position": {"longitude": lon, "latitude": lat}, "suggested_theme": theme, "status": "pending", }) bundle_path = os.path.join(self.bundle_dir, aoi_id) positions_path = os.path.join(bundle_path, "learning_positions.json") with open(positions_path, "w") as f: json.dump({"positions": positions}, f, indent=2) logger.debug("Learning positions generated", aoi_id=aoi_id, count=len(positions)) async def _create_attribution(self, aoi_id: str): """Create attribution file with required license notices.""" attribution = { "sources": [ { "name": "OpenStreetMap", "license": "Open Database License (ODbL) v1.0", "url": "https://www.openstreetmap.org/copyright", "attribution": "© OpenStreetMap contributors", "required": True, }, { "name": "Copernicus DEM", "license": "Copernicus Data License", "url": "https://spacedata.copernicus.eu/", "attribution": "© Copernicus Service Information 2024", "required": True, }, ], "generated_at": datetime.utcnow().isoformat(), "notice": "This data must be attributed according to the licenses above when used publicly.", } bundle_path = os.path.join(self.bundle_dir, aoi_id) attribution_path = os.path.join(bundle_path, "attribution.json") with open(attribution_path, "w") as f: json.dump(attribution, f, indent=2) async def _create_manifest(self, aoi_id: str, polygon: dict, theme: str, quality: str): """Create Unity bundle manifest.""" geom = shape(polygon) bounds = geom.bounds centroid = geom.centroid manifest = { "version": "1.0.0", "aoi_id": aoi_id, "created_at": datetime.utcnow().isoformat(), "bounds": { "west": bounds[0], "south": bounds[1], "east": bounds[2], "north": bounds[3], }, "center": { "longitude": centroid.x, "latitude": centroid.y, }, "area_km2": self.calculate_area_km2(polygon), "theme": theme, "quality": quality, "assets": { "terrain": { "file": "terrain.heightmap.png", "config": "terrain.json", }, "osm_features": { "file": "osm_features.json", }, "learning_positions": { "file": "learning_positions.json", }, "attribution": { "file": "attribution.json", }, }, "unity": { "coordinate_system": "Unity (Y-up, left-handed)", "scale": 1.0, # 1 Unity unit = 1 meter "terrain_resolution": {"low": 64, "medium": 256, "high": 512}[quality], }, } bundle_path = os.path.join(self.bundle_dir, aoi_id) manifest_path = os.path.join(bundle_path, "manifest.json") with open(manifest_path, "w") as f: json.dump(manifest, f, indent=2) async def _create_zip_bundle(self, aoi_id: str): """Create ZIP archive of all bundle files.""" bundle_path = os.path.join(self.bundle_dir, aoi_id) zip_path = os.path.join(bundle_path, "bundle.zip") with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for filename in os.listdir(bundle_path): if filename != "bundle.zip": filepath = os.path.join(bundle_path, filename) zf.write(filepath, filename) logger.debug("Bundle ZIP created", aoi_id=aoi_id, path=zip_path) async def get_aoi_status(self, aoi_id: str) -> Optional[dict]: """Get the status of an AOI.""" return _aoi_storage.get(aoi_id) async def get_manifest(self, aoi_id: str) -> Optional[dict]: """Get the manifest for a completed AOI.""" aoi_data = _aoi_storage.get(aoi_id) if aoi_data is None or aoi_data.get("status") != "completed": return None manifest_path = os.path.join(self.bundle_dir, aoi_id, "manifest.json") if not os.path.exists(manifest_path): return None with open(manifest_path) as f: return json.load(f) async def get_bundle_path(self, aoi_id: str) -> Optional[str]: """Get the path to a completed bundle ZIP.""" aoi_data = _aoi_storage.get(aoi_id) if aoi_data is None or aoi_data.get("status") != "completed": return None zip_path = os.path.join(self.bundle_dir, aoi_id, "bundle.zip") if not os.path.exists(zip_path): return None return zip_path async def delete_aoi(self, aoi_id: str) -> bool: """Delete an AOI and its files.""" if aoi_id not in _aoi_storage: return False import shutil bundle_path = os.path.join(self.bundle_dir, aoi_id) if os.path.exists(bundle_path): shutil.rmtree(bundle_path) del _aoi_storage[aoi_id] return True async def generate_preview(self, aoi_id: str, width: int, height: int) -> Optional[bytes]: """Generate a preview image of the AOI (stub).""" # Would generate a preview combining terrain and OSM features return None