Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website, Klausur-Service, School-Service, Voice-Service, Geo-Service, BreakPilot Drive, Agent-Core Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
338
geo-service/services/dem_service.py
Normal file
338
geo-service/services/dem_service.py
Normal file
@@ -0,0 +1,338 @@
|
||||
"""
|
||||
DEM (Digital Elevation Model) Service
|
||||
Serves terrain data from Copernicus DEM GLO-30
|
||||
"""
|
||||
import os
|
||||
import math
|
||||
from typing import Optional, Tuple
|
||||
import struct
|
||||
import structlog
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from io import BytesIO
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# Germany bounding box
|
||||
GERMANY_BOUNDS = {
|
||||
"west": 5.87,
|
||||
"south": 47.27,
|
||||
"east": 15.04,
|
||||
"north": 55.06,
|
||||
}
|
||||
|
||||
|
||||
def lat_lon_to_tile(lat: float, lon: float, zoom: int) -> Tuple[int, int]:
|
||||
"""Convert latitude/longitude to tile coordinates."""
|
||||
n = 2 ** zoom
|
||||
x = int((lon + 180.0) / 360.0 * n)
|
||||
lat_rad = math.radians(lat)
|
||||
y = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
|
||||
return x, y
|
||||
|
||||
|
||||
def tile_to_bounds(z: int, x: int, y: int) -> Tuple[float, float, float, float]:
|
||||
"""Convert tile coordinates to bounding box (west, south, east, north)."""
|
||||
n = 2 ** z
|
||||
west = x / n * 360.0 - 180.0
|
||||
east = (x + 1) / n * 360.0 - 180.0
|
||||
north_rad = math.atan(math.sinh(math.pi * (1 - 2 * y / n)))
|
||||
south_rad = math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n)))
|
||||
north = math.degrees(north_rad)
|
||||
south = math.degrees(south_rad)
|
||||
return west, south, east, north
|
||||
|
||||
|
||||
class DEMService:
|
||||
"""
|
||||
Service for handling Digital Elevation Model data.
|
||||
|
||||
Uses Copernicus DEM GLO-30 (30m resolution) as the data source.
|
||||
Generates terrain tiles in Mapbox Terrain-RGB format for MapLibre/Unity.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.dem_dir = settings.dem_data_dir
|
||||
self.tile_size = settings.terrain_tile_size
|
||||
self._dem_cache = {} # Cache loaded DEM files
|
||||
|
||||
def _get_dem_file_path(self, lat: int, lon: int) -> str:
|
||||
"""
|
||||
Get the path to a Copernicus DEM file for the given coordinates.
|
||||
|
||||
Copernicus DEM files are named like: Copernicus_DSM_COG_10_N47_00_E008_00_DEM.tif
|
||||
"""
|
||||
lat_prefix = "N" if lat >= 0 else "S"
|
||||
lon_prefix = "E" if lon >= 0 else "W"
|
||||
|
||||
# Format: N47E008
|
||||
filename = f"{lat_prefix}{abs(lat):02d}{lon_prefix}{abs(lon):03d}.tif"
|
||||
return os.path.join(self.dem_dir, filename)
|
||||
|
||||
def _load_dem_tile(self, lat: int, lon: int) -> Optional[np.ndarray]:
|
||||
"""Load a DEM tile from disk."""
|
||||
cache_key = f"{lat}_{lon}"
|
||||
if cache_key in self._dem_cache:
|
||||
return self._dem_cache[cache_key]
|
||||
|
||||
filepath = self._get_dem_file_path(lat, lon)
|
||||
|
||||
if not os.path.exists(filepath):
|
||||
logger.debug("DEM file not found", path=filepath)
|
||||
return None
|
||||
|
||||
try:
|
||||
import rasterio
|
||||
|
||||
with rasterio.open(filepath) as src:
|
||||
data = src.read(1) # Read first band
|
||||
self._dem_cache[cache_key] = data
|
||||
return data
|
||||
|
||||
except ImportError:
|
||||
logger.warning("rasterio not available, using fallback")
|
||||
return self._load_dem_fallback(filepath)
|
||||
except Exception as e:
|
||||
logger.error("Error loading DEM", path=filepath, error=str(e))
|
||||
return None
|
||||
|
||||
def _load_dem_fallback(self, filepath: str) -> Optional[np.ndarray]:
|
||||
"""Fallback DEM loader without rasterio (for development)."""
|
||||
# In development, return synthetic terrain
|
||||
return None
|
||||
|
||||
async def get_heightmap_tile(self, z: int, x: int, y: int) -> Optional[bytes]:
|
||||
"""
|
||||
Generate a heightmap tile in Mapbox Terrain-RGB format.
|
||||
|
||||
The encoding stores elevation as: height = -10000 + ((R * 256 * 256 + G * 256 + B) * 0.1)
|
||||
This allows for -10000m to +1677721.6m range with 0.1m precision.
|
||||
"""
|
||||
# Get tile bounds
|
||||
west, south, east, north = tile_to_bounds(z, x, y)
|
||||
|
||||
# Check if tile is within Germany
|
||||
if east < GERMANY_BOUNDS["west"] or west > GERMANY_BOUNDS["east"]:
|
||||
return None
|
||||
if north < GERMANY_BOUNDS["south"] or south > GERMANY_BOUNDS["north"]:
|
||||
return None
|
||||
|
||||
# Try to load elevation data for this tile
|
||||
elevations = await self._get_elevations_for_bounds(west, south, east, north)
|
||||
|
||||
if elevations is None:
|
||||
# No DEM data available - return placeholder or None
|
||||
return self._generate_placeholder_heightmap()
|
||||
|
||||
# Convert to Terrain-RGB format
|
||||
return self._encode_terrain_rgb(elevations)
|
||||
|
||||
async def _get_elevations_for_bounds(
|
||||
self, west: float, south: float, east: float, north: float
|
||||
) -> Optional[np.ndarray]:
|
||||
"""Get elevation data for a bounding box."""
|
||||
# Determine which DEM tiles we need
|
||||
lat_min = int(math.floor(south))
|
||||
lat_max = int(math.ceil(north))
|
||||
lon_min = int(math.floor(west))
|
||||
lon_max = int(math.ceil(east))
|
||||
|
||||
# Load all required DEM tiles
|
||||
dem_tiles = []
|
||||
for lat in range(lat_min, lat_max + 1):
|
||||
for lon in range(lon_min, lon_max + 1):
|
||||
tile = self._load_dem_tile(lat, lon)
|
||||
if tile is not None:
|
||||
dem_tiles.append((lat, lon, tile))
|
||||
|
||||
if not dem_tiles:
|
||||
return None
|
||||
|
||||
# Interpolate elevations for our tile grid
|
||||
elevations = np.zeros((self.tile_size, self.tile_size), dtype=np.float32)
|
||||
|
||||
for py in range(self.tile_size):
|
||||
for px in range(self.tile_size):
|
||||
# Calculate lat/lon for this pixel
|
||||
lon = west + (east - west) * px / self.tile_size
|
||||
lat = north - (north - south) * py / self.tile_size
|
||||
|
||||
# Get elevation (simplified - would need proper interpolation)
|
||||
elevation = self._sample_elevation(lat, lon, dem_tiles)
|
||||
elevations[py, px] = elevation if elevation is not None else 0
|
||||
|
||||
return elevations
|
||||
|
||||
def _sample_elevation(
|
||||
self, lat: float, lon: float, dem_tiles: list
|
||||
) -> Optional[float]:
|
||||
"""Sample elevation at a specific point from loaded DEM tiles."""
|
||||
tile_lat = int(math.floor(lat))
|
||||
tile_lon = int(math.floor(lon))
|
||||
|
||||
for t_lat, t_lon, data in dem_tiles:
|
||||
if t_lat == tile_lat and t_lon == tile_lon:
|
||||
# Calculate pixel position within tile
|
||||
# Assuming 1 degree = 3600 pixels (1 arcsecond for 30m DEM)
|
||||
rows, cols = data.shape
|
||||
px = int((lon - tile_lon) * cols)
|
||||
py = int((t_lat + 1 - lat) * rows)
|
||||
|
||||
px = max(0, min(cols - 1, px))
|
||||
py = max(0, min(rows - 1, py))
|
||||
|
||||
return float(data[py, px])
|
||||
|
||||
return None
|
||||
|
||||
def _encode_terrain_rgb(self, elevations: np.ndarray) -> bytes:
|
||||
"""
|
||||
Encode elevation data as Mapbox Terrain-RGB.
|
||||
|
||||
Format: height = -10000 + ((R * 256 * 256 + G * 256 + B) * 0.1)
|
||||
"""
|
||||
# Convert elevation to RGB values
|
||||
# encoded = (elevation + 10000) / 0.1
|
||||
encoded = ((elevations + 10000) / 0.1).astype(np.uint32)
|
||||
|
||||
r = (encoded // (256 * 256)) % 256
|
||||
g = (encoded // 256) % 256
|
||||
b = encoded % 256
|
||||
|
||||
# Create RGB image
|
||||
rgb = np.stack([r, g, b], axis=-1).astype(np.uint8)
|
||||
img = Image.fromarray(rgb, mode="RGB")
|
||||
|
||||
# Save to bytes
|
||||
buffer = BytesIO()
|
||||
img.save(buffer, format="PNG")
|
||||
return buffer.getvalue()
|
||||
|
||||
def _generate_placeholder_heightmap(self) -> bytes:
|
||||
"""Generate a flat placeholder heightmap (sea level)."""
|
||||
# Sea level = 0m encoded as RGB
|
||||
# encoded = (0 + 10000) / 0.1 = 100000
|
||||
# R = 100000 // 65536 = 1
|
||||
# G = (100000 // 256) % 256 = 134
|
||||
# B = 100000 % 256 = 160
|
||||
|
||||
img = Image.new("RGB", (self.tile_size, self.tile_size), (1, 134, 160))
|
||||
buffer = BytesIO()
|
||||
img.save(buffer, format="PNG")
|
||||
return buffer.getvalue()
|
||||
|
||||
async def get_hillshade_tile(
|
||||
self, z: int, x: int, y: int, azimuth: float = 315, altitude: float = 45
|
||||
) -> Optional[bytes]:
|
||||
"""
|
||||
Generate a hillshade tile for terrain visualization.
|
||||
|
||||
Args:
|
||||
z, x, y: Tile coordinates
|
||||
azimuth: Light direction in degrees (0=N, 90=E, 180=S, 270=W)
|
||||
altitude: Light altitude in degrees above horizon
|
||||
"""
|
||||
west, south, east, north = tile_to_bounds(z, x, y)
|
||||
elevations = await self._get_elevations_for_bounds(west, south, east, north)
|
||||
|
||||
if elevations is None:
|
||||
return None
|
||||
|
||||
# Calculate hillshade using numpy
|
||||
hillshade = self._calculate_hillshade(elevations, azimuth, altitude)
|
||||
|
||||
# Create grayscale image
|
||||
img = Image.fromarray((hillshade * 255).astype(np.uint8), mode="L")
|
||||
|
||||
buffer = BytesIO()
|
||||
img.save(buffer, format="PNG")
|
||||
return buffer.getvalue()
|
||||
|
||||
def _calculate_hillshade(
|
||||
self, dem: np.ndarray, azimuth: float, altitude: float
|
||||
) -> np.ndarray:
|
||||
"""Calculate hillshade from DEM array."""
|
||||
# Convert angles to radians
|
||||
azimuth_rad = math.radians(360 - azimuth + 90)
|
||||
altitude_rad = math.radians(altitude)
|
||||
|
||||
# Calculate gradient
|
||||
dy, dx = np.gradient(dem)
|
||||
|
||||
# Calculate slope and aspect
|
||||
slope = np.arctan(np.sqrt(dx**2 + dy**2))
|
||||
aspect = np.arctan2(-dy, dx)
|
||||
|
||||
# Calculate hillshade
|
||||
hillshade = (
|
||||
np.sin(altitude_rad) * np.cos(slope)
|
||||
+ np.cos(altitude_rad) * np.sin(slope) * np.cos(azimuth_rad - aspect)
|
||||
)
|
||||
|
||||
# Normalize to 0-1
|
||||
hillshade = np.clip(hillshade, 0, 1)
|
||||
|
||||
return hillshade
|
||||
|
||||
async def get_contour_tile(
|
||||
self, z: int, x: int, y: int, interval: int = 20
|
||||
) -> Optional[bytes]:
|
||||
"""Generate contour lines as vector tile (stub - requires more complex implementation)."""
|
||||
# This would require generating contour lines from DEM and encoding as MVT
|
||||
# For now, return None
|
||||
logger.warning("Contour tiles not yet implemented")
|
||||
return None
|
||||
|
||||
async def get_elevation(self, lat: float, lon: float) -> Optional[float]:
|
||||
"""Get elevation at a specific point."""
|
||||
tile_lat = int(math.floor(lat))
|
||||
tile_lon = int(math.floor(lon))
|
||||
|
||||
dem_data = self._load_dem_tile(tile_lat, tile_lon)
|
||||
if dem_data is None:
|
||||
return None
|
||||
|
||||
return self._sample_elevation(lat, lon, [(tile_lat, tile_lon, dem_data)])
|
||||
|
||||
async def get_elevation_profile(
|
||||
self, coordinates: list[list[float]], samples: int = 100
|
||||
) -> list[dict]:
|
||||
"""Get elevation profile along a path."""
|
||||
from shapely.geometry import LineString
|
||||
from shapely.ops import substring
|
||||
|
||||
# Create line from coordinates
|
||||
line = LineString(coordinates)
|
||||
total_length = line.length
|
||||
|
||||
# Sample points along line
|
||||
profile = []
|
||||
for i in range(samples):
|
||||
fraction = i / (samples - 1)
|
||||
point = line.interpolate(fraction, normalized=True)
|
||||
|
||||
elevation = await self.get_elevation(point.y, point.x)
|
||||
|
||||
profile.append({
|
||||
"distance_m": fraction * total_length * 111320, # Approximate meters
|
||||
"longitude": point.x,
|
||||
"latitude": point.y,
|
||||
"elevation_m": elevation,
|
||||
})
|
||||
|
||||
return profile
|
||||
|
||||
async def get_metadata(self) -> dict:
|
||||
"""Get metadata about available DEM data."""
|
||||
dem_files = []
|
||||
if os.path.exists(self.dem_dir):
|
||||
dem_files = [f for f in os.listdir(self.dem_dir) if f.endswith(".tif")]
|
||||
|
||||
return {
|
||||
"data_available": len(dem_files) > 0,
|
||||
"tiles_generated": len(dem_files),
|
||||
"resolution_m": 30,
|
||||
"source": "Copernicus DEM GLO-30",
|
||||
}
|
||||
Reference in New Issue
Block a user