Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website, Klausur-Service, School-Service, Voice-Service, Geo-Service, BreakPilot Drive, Agent-Core Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
339 lines
11 KiB
Python
339 lines
11 KiB
Python
"""
|
|
DEM (Digital Elevation Model) Service
|
|
Serves terrain data from Copernicus DEM GLO-30
|
|
"""
|
|
import os
|
|
import math
|
|
from typing import Optional, Tuple
|
|
import struct
|
|
import structlog
|
|
import numpy as np
|
|
from PIL import Image
|
|
from io import BytesIO
|
|
|
|
from config import settings
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
# Germany bounding box
|
|
GERMANY_BOUNDS = {
|
|
"west": 5.87,
|
|
"south": 47.27,
|
|
"east": 15.04,
|
|
"north": 55.06,
|
|
}
|
|
|
|
|
|
def lat_lon_to_tile(lat: float, lon: float, zoom: int) -> Tuple[int, int]:
|
|
"""Convert latitude/longitude to tile coordinates."""
|
|
n = 2 ** zoom
|
|
x = int((lon + 180.0) / 360.0 * n)
|
|
lat_rad = math.radians(lat)
|
|
y = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
|
|
return x, y
|
|
|
|
|
|
def tile_to_bounds(z: int, x: int, y: int) -> Tuple[float, float, float, float]:
|
|
"""Convert tile coordinates to bounding box (west, south, east, north)."""
|
|
n = 2 ** z
|
|
west = x / n * 360.0 - 180.0
|
|
east = (x + 1) / n * 360.0 - 180.0
|
|
north_rad = math.atan(math.sinh(math.pi * (1 - 2 * y / n)))
|
|
south_rad = math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n)))
|
|
north = math.degrees(north_rad)
|
|
south = math.degrees(south_rad)
|
|
return west, south, east, north
|
|
|
|
|
|
class DEMService:
|
|
"""
|
|
Service for handling Digital Elevation Model data.
|
|
|
|
Uses Copernicus DEM GLO-30 (30m resolution) as the data source.
|
|
Generates terrain tiles in Mapbox Terrain-RGB format for MapLibre/Unity.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.dem_dir = settings.dem_data_dir
|
|
self.tile_size = settings.terrain_tile_size
|
|
self._dem_cache = {} # Cache loaded DEM files
|
|
|
|
def _get_dem_file_path(self, lat: int, lon: int) -> str:
|
|
"""
|
|
Get the path to a Copernicus DEM file for the given coordinates.
|
|
|
|
Copernicus DEM files are named like: Copernicus_DSM_COG_10_N47_00_E008_00_DEM.tif
|
|
"""
|
|
lat_prefix = "N" if lat >= 0 else "S"
|
|
lon_prefix = "E" if lon >= 0 else "W"
|
|
|
|
# Format: N47E008
|
|
filename = f"{lat_prefix}{abs(lat):02d}{lon_prefix}{abs(lon):03d}.tif"
|
|
return os.path.join(self.dem_dir, filename)
|
|
|
|
def _load_dem_tile(self, lat: int, lon: int) -> Optional[np.ndarray]:
|
|
"""Load a DEM tile from disk."""
|
|
cache_key = f"{lat}_{lon}"
|
|
if cache_key in self._dem_cache:
|
|
return self._dem_cache[cache_key]
|
|
|
|
filepath = self._get_dem_file_path(lat, lon)
|
|
|
|
if not os.path.exists(filepath):
|
|
logger.debug("DEM file not found", path=filepath)
|
|
return None
|
|
|
|
try:
|
|
import rasterio
|
|
|
|
with rasterio.open(filepath) as src:
|
|
data = src.read(1) # Read first band
|
|
self._dem_cache[cache_key] = data
|
|
return data
|
|
|
|
except ImportError:
|
|
logger.warning("rasterio not available, using fallback")
|
|
return self._load_dem_fallback(filepath)
|
|
except Exception as e:
|
|
logger.error("Error loading DEM", path=filepath, error=str(e))
|
|
return None
|
|
|
|
def _load_dem_fallback(self, filepath: str) -> Optional[np.ndarray]:
|
|
"""Fallback DEM loader without rasterio (for development)."""
|
|
# In development, return synthetic terrain
|
|
return None
|
|
|
|
async def get_heightmap_tile(self, z: int, x: int, y: int) -> Optional[bytes]:
|
|
"""
|
|
Generate a heightmap tile in Mapbox Terrain-RGB format.
|
|
|
|
The encoding stores elevation as: height = -10000 + ((R * 256 * 256 + G * 256 + B) * 0.1)
|
|
This allows for -10000m to +1677721.6m range with 0.1m precision.
|
|
"""
|
|
# Get tile bounds
|
|
west, south, east, north = tile_to_bounds(z, x, y)
|
|
|
|
# Check if tile is within Germany
|
|
if east < GERMANY_BOUNDS["west"] or west > GERMANY_BOUNDS["east"]:
|
|
return None
|
|
if north < GERMANY_BOUNDS["south"] or south > GERMANY_BOUNDS["north"]:
|
|
return None
|
|
|
|
# Try to load elevation data for this tile
|
|
elevations = await self._get_elevations_for_bounds(west, south, east, north)
|
|
|
|
if elevations is None:
|
|
# No DEM data available - return placeholder or None
|
|
return self._generate_placeholder_heightmap()
|
|
|
|
# Convert to Terrain-RGB format
|
|
return self._encode_terrain_rgb(elevations)
|
|
|
|
async def _get_elevations_for_bounds(
|
|
self, west: float, south: float, east: float, north: float
|
|
) -> Optional[np.ndarray]:
|
|
"""Get elevation data for a bounding box."""
|
|
# Determine which DEM tiles we need
|
|
lat_min = int(math.floor(south))
|
|
lat_max = int(math.ceil(north))
|
|
lon_min = int(math.floor(west))
|
|
lon_max = int(math.ceil(east))
|
|
|
|
# Load all required DEM tiles
|
|
dem_tiles = []
|
|
for lat in range(lat_min, lat_max + 1):
|
|
for lon in range(lon_min, lon_max + 1):
|
|
tile = self._load_dem_tile(lat, lon)
|
|
if tile is not None:
|
|
dem_tiles.append((lat, lon, tile))
|
|
|
|
if not dem_tiles:
|
|
return None
|
|
|
|
# Interpolate elevations for our tile grid
|
|
elevations = np.zeros((self.tile_size, self.tile_size), dtype=np.float32)
|
|
|
|
for py in range(self.tile_size):
|
|
for px in range(self.tile_size):
|
|
# Calculate lat/lon for this pixel
|
|
lon = west + (east - west) * px / self.tile_size
|
|
lat = north - (north - south) * py / self.tile_size
|
|
|
|
# Get elevation (simplified - would need proper interpolation)
|
|
elevation = self._sample_elevation(lat, lon, dem_tiles)
|
|
elevations[py, px] = elevation if elevation is not None else 0
|
|
|
|
return elevations
|
|
|
|
def _sample_elevation(
|
|
self, lat: float, lon: float, dem_tiles: list
|
|
) -> Optional[float]:
|
|
"""Sample elevation at a specific point from loaded DEM tiles."""
|
|
tile_lat = int(math.floor(lat))
|
|
tile_lon = int(math.floor(lon))
|
|
|
|
for t_lat, t_lon, data in dem_tiles:
|
|
if t_lat == tile_lat and t_lon == tile_lon:
|
|
# Calculate pixel position within tile
|
|
# Assuming 1 degree = 3600 pixels (1 arcsecond for 30m DEM)
|
|
rows, cols = data.shape
|
|
px = int((lon - tile_lon) * cols)
|
|
py = int((t_lat + 1 - lat) * rows)
|
|
|
|
px = max(0, min(cols - 1, px))
|
|
py = max(0, min(rows - 1, py))
|
|
|
|
return float(data[py, px])
|
|
|
|
return None
|
|
|
|
def _encode_terrain_rgb(self, elevations: np.ndarray) -> bytes:
|
|
"""
|
|
Encode elevation data as Mapbox Terrain-RGB.
|
|
|
|
Format: height = -10000 + ((R * 256 * 256 + G * 256 + B) * 0.1)
|
|
"""
|
|
# Convert elevation to RGB values
|
|
# encoded = (elevation + 10000) / 0.1
|
|
encoded = ((elevations + 10000) / 0.1).astype(np.uint32)
|
|
|
|
r = (encoded // (256 * 256)) % 256
|
|
g = (encoded // 256) % 256
|
|
b = encoded % 256
|
|
|
|
# Create RGB image
|
|
rgb = np.stack([r, g, b], axis=-1).astype(np.uint8)
|
|
img = Image.fromarray(rgb, mode="RGB")
|
|
|
|
# Save to bytes
|
|
buffer = BytesIO()
|
|
img.save(buffer, format="PNG")
|
|
return buffer.getvalue()
|
|
|
|
def _generate_placeholder_heightmap(self) -> bytes:
|
|
"""Generate a flat placeholder heightmap (sea level)."""
|
|
# Sea level = 0m encoded as RGB
|
|
# encoded = (0 + 10000) / 0.1 = 100000
|
|
# R = 100000 // 65536 = 1
|
|
# G = (100000 // 256) % 256 = 134
|
|
# B = 100000 % 256 = 160
|
|
|
|
img = Image.new("RGB", (self.tile_size, self.tile_size), (1, 134, 160))
|
|
buffer = BytesIO()
|
|
img.save(buffer, format="PNG")
|
|
return buffer.getvalue()
|
|
|
|
async def get_hillshade_tile(
|
|
self, z: int, x: int, y: int, azimuth: float = 315, altitude: float = 45
|
|
) -> Optional[bytes]:
|
|
"""
|
|
Generate a hillshade tile for terrain visualization.
|
|
|
|
Args:
|
|
z, x, y: Tile coordinates
|
|
azimuth: Light direction in degrees (0=N, 90=E, 180=S, 270=W)
|
|
altitude: Light altitude in degrees above horizon
|
|
"""
|
|
west, south, east, north = tile_to_bounds(z, x, y)
|
|
elevations = await self._get_elevations_for_bounds(west, south, east, north)
|
|
|
|
if elevations is None:
|
|
return None
|
|
|
|
# Calculate hillshade using numpy
|
|
hillshade = self._calculate_hillshade(elevations, azimuth, altitude)
|
|
|
|
# Create grayscale image
|
|
img = Image.fromarray((hillshade * 255).astype(np.uint8), mode="L")
|
|
|
|
buffer = BytesIO()
|
|
img.save(buffer, format="PNG")
|
|
return buffer.getvalue()
|
|
|
|
def _calculate_hillshade(
|
|
self, dem: np.ndarray, azimuth: float, altitude: float
|
|
) -> np.ndarray:
|
|
"""Calculate hillshade from DEM array."""
|
|
# Convert angles to radians
|
|
azimuth_rad = math.radians(360 - azimuth + 90)
|
|
altitude_rad = math.radians(altitude)
|
|
|
|
# Calculate gradient
|
|
dy, dx = np.gradient(dem)
|
|
|
|
# Calculate slope and aspect
|
|
slope = np.arctan(np.sqrt(dx**2 + dy**2))
|
|
aspect = np.arctan2(-dy, dx)
|
|
|
|
# Calculate hillshade
|
|
hillshade = (
|
|
np.sin(altitude_rad) * np.cos(slope)
|
|
+ np.cos(altitude_rad) * np.sin(slope) * np.cos(azimuth_rad - aspect)
|
|
)
|
|
|
|
# Normalize to 0-1
|
|
hillshade = np.clip(hillshade, 0, 1)
|
|
|
|
return hillshade
|
|
|
|
async def get_contour_tile(
|
|
self, z: int, x: int, y: int, interval: int = 20
|
|
) -> Optional[bytes]:
|
|
"""Generate contour lines as vector tile (stub - requires more complex implementation)."""
|
|
# This would require generating contour lines from DEM and encoding as MVT
|
|
# For now, return None
|
|
logger.warning("Contour tiles not yet implemented")
|
|
return None
|
|
|
|
async def get_elevation(self, lat: float, lon: float) -> Optional[float]:
|
|
"""Get elevation at a specific point."""
|
|
tile_lat = int(math.floor(lat))
|
|
tile_lon = int(math.floor(lon))
|
|
|
|
dem_data = self._load_dem_tile(tile_lat, tile_lon)
|
|
if dem_data is None:
|
|
return None
|
|
|
|
return self._sample_elevation(lat, lon, [(tile_lat, tile_lon, dem_data)])
|
|
|
|
async def get_elevation_profile(
|
|
self, coordinates: list[list[float]], samples: int = 100
|
|
) -> list[dict]:
|
|
"""Get elevation profile along a path."""
|
|
from shapely.geometry import LineString
|
|
from shapely.ops import substring
|
|
|
|
# Create line from coordinates
|
|
line = LineString(coordinates)
|
|
total_length = line.length
|
|
|
|
# Sample points along line
|
|
profile = []
|
|
for i in range(samples):
|
|
fraction = i / (samples - 1)
|
|
point = line.interpolate(fraction, normalized=True)
|
|
|
|
elevation = await self.get_elevation(point.y, point.x)
|
|
|
|
profile.append({
|
|
"distance_m": fraction * total_length * 111320, # Approximate meters
|
|
"longitude": point.x,
|
|
"latitude": point.y,
|
|
"elevation_m": elevation,
|
|
})
|
|
|
|
return profile
|
|
|
|
async def get_metadata(self) -> dict:
|
|
"""Get metadata about available DEM data."""
|
|
dem_files = []
|
|
if os.path.exists(self.dem_dir):
|
|
dem_files = [f for f in os.listdir(self.dem_dir) if f.endswith(".tif")]
|
|
|
|
return {
|
|
"data_available": len(dem_files) > 0,
|
|
"tiles_generated": len(dem_files),
|
|
"resolution_m": 30,
|
|
"source": "Copernicus DEM GLO-30",
|
|
}
|