This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/geo-service/services/dem_service.py
Benjamin Admin 21a844cb8a fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

339 lines
11 KiB
Python

"""
DEM (Digital Elevation Model) Service
Serves terrain data from Copernicus DEM GLO-30
"""
import os
import math
from typing import Optional, Tuple
import struct
import structlog
import numpy as np
from PIL import Image
from io import BytesIO
from config import settings
logger = structlog.get_logger(__name__)
# Germany bounding box
GERMANY_BOUNDS = {
"west": 5.87,
"south": 47.27,
"east": 15.04,
"north": 55.06,
}
def lat_lon_to_tile(lat: float, lon: float, zoom: int) -> Tuple[int, int]:
"""Convert latitude/longitude to tile coordinates."""
n = 2 ** zoom
x = int((lon + 180.0) / 360.0 * n)
lat_rad = math.radians(lat)
y = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
return x, y
def tile_to_bounds(z: int, x: int, y: int) -> Tuple[float, float, float, float]:
"""Convert tile coordinates to bounding box (west, south, east, north)."""
n = 2 ** z
west = x / n * 360.0 - 180.0
east = (x + 1) / n * 360.0 - 180.0
north_rad = math.atan(math.sinh(math.pi * (1 - 2 * y / n)))
south_rad = math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n)))
north = math.degrees(north_rad)
south = math.degrees(south_rad)
return west, south, east, north
class DEMService:
"""
Service for handling Digital Elevation Model data.
Uses Copernicus DEM GLO-30 (30m resolution) as the data source.
Generates terrain tiles in Mapbox Terrain-RGB format for MapLibre/Unity.
"""
def __init__(self):
self.dem_dir = settings.dem_data_dir
self.tile_size = settings.terrain_tile_size
self._dem_cache = {} # Cache loaded DEM files
def _get_dem_file_path(self, lat: int, lon: int) -> str:
"""
Get the path to a Copernicus DEM file for the given coordinates.
Copernicus DEM files are named like: Copernicus_DSM_COG_10_N47_00_E008_00_DEM.tif
"""
lat_prefix = "N" if lat >= 0 else "S"
lon_prefix = "E" if lon >= 0 else "W"
# Format: N47E008
filename = f"{lat_prefix}{abs(lat):02d}{lon_prefix}{abs(lon):03d}.tif"
return os.path.join(self.dem_dir, filename)
def _load_dem_tile(self, lat: int, lon: int) -> Optional[np.ndarray]:
"""Load a DEM tile from disk."""
cache_key = f"{lat}_{lon}"
if cache_key in self._dem_cache:
return self._dem_cache[cache_key]
filepath = self._get_dem_file_path(lat, lon)
if not os.path.exists(filepath):
logger.debug("DEM file not found", path=filepath)
return None
try:
import rasterio
with rasterio.open(filepath) as src:
data = src.read(1) # Read first band
self._dem_cache[cache_key] = data
return data
except ImportError:
logger.warning("rasterio not available, using fallback")
return self._load_dem_fallback(filepath)
except Exception as e:
logger.error("Error loading DEM", path=filepath, error=str(e))
return None
def _load_dem_fallback(self, filepath: str) -> Optional[np.ndarray]:
"""Fallback DEM loader without rasterio (for development)."""
# In development, return synthetic terrain
return None
async def get_heightmap_tile(self, z: int, x: int, y: int) -> Optional[bytes]:
"""
Generate a heightmap tile in Mapbox Terrain-RGB format.
The encoding stores elevation as: height = -10000 + ((R * 256 * 256 + G * 256 + B) * 0.1)
This allows for -10000m to +1677721.6m range with 0.1m precision.
"""
# Get tile bounds
west, south, east, north = tile_to_bounds(z, x, y)
# Check if tile is within Germany
if east < GERMANY_BOUNDS["west"] or west > GERMANY_BOUNDS["east"]:
return None
if north < GERMANY_BOUNDS["south"] or south > GERMANY_BOUNDS["north"]:
return None
# Try to load elevation data for this tile
elevations = await self._get_elevations_for_bounds(west, south, east, north)
if elevations is None:
# No DEM data available - return placeholder or None
return self._generate_placeholder_heightmap()
# Convert to Terrain-RGB format
return self._encode_terrain_rgb(elevations)
async def _get_elevations_for_bounds(
self, west: float, south: float, east: float, north: float
) -> Optional[np.ndarray]:
"""Get elevation data for a bounding box."""
# Determine which DEM tiles we need
lat_min = int(math.floor(south))
lat_max = int(math.ceil(north))
lon_min = int(math.floor(west))
lon_max = int(math.ceil(east))
# Load all required DEM tiles
dem_tiles = []
for lat in range(lat_min, lat_max + 1):
for lon in range(lon_min, lon_max + 1):
tile = self._load_dem_tile(lat, lon)
if tile is not None:
dem_tiles.append((lat, lon, tile))
if not dem_tiles:
return None
# Interpolate elevations for our tile grid
elevations = np.zeros((self.tile_size, self.tile_size), dtype=np.float32)
for py in range(self.tile_size):
for px in range(self.tile_size):
# Calculate lat/lon for this pixel
lon = west + (east - west) * px / self.tile_size
lat = north - (north - south) * py / self.tile_size
# Get elevation (simplified - would need proper interpolation)
elevation = self._sample_elevation(lat, lon, dem_tiles)
elevations[py, px] = elevation if elevation is not None else 0
return elevations
def _sample_elevation(
self, lat: float, lon: float, dem_tiles: list
) -> Optional[float]:
"""Sample elevation at a specific point from loaded DEM tiles."""
tile_lat = int(math.floor(lat))
tile_lon = int(math.floor(lon))
for t_lat, t_lon, data in dem_tiles:
if t_lat == tile_lat and t_lon == tile_lon:
# Calculate pixel position within tile
# Assuming 1 degree = 3600 pixels (1 arcsecond for 30m DEM)
rows, cols = data.shape
px = int((lon - tile_lon) * cols)
py = int((t_lat + 1 - lat) * rows)
px = max(0, min(cols - 1, px))
py = max(0, min(rows - 1, py))
return float(data[py, px])
return None
def _encode_terrain_rgb(self, elevations: np.ndarray) -> bytes:
"""
Encode elevation data as Mapbox Terrain-RGB.
Format: height = -10000 + ((R * 256 * 256 + G * 256 + B) * 0.1)
"""
# Convert elevation to RGB values
# encoded = (elevation + 10000) / 0.1
encoded = ((elevations + 10000) / 0.1).astype(np.uint32)
r = (encoded // (256 * 256)) % 256
g = (encoded // 256) % 256
b = encoded % 256
# Create RGB image
rgb = np.stack([r, g, b], axis=-1).astype(np.uint8)
img = Image.fromarray(rgb, mode="RGB")
# Save to bytes
buffer = BytesIO()
img.save(buffer, format="PNG")
return buffer.getvalue()
def _generate_placeholder_heightmap(self) -> bytes:
"""Generate a flat placeholder heightmap (sea level)."""
# Sea level = 0m encoded as RGB
# encoded = (0 + 10000) / 0.1 = 100000
# R = 100000 // 65536 = 1
# G = (100000 // 256) % 256 = 134
# B = 100000 % 256 = 160
img = Image.new("RGB", (self.tile_size, self.tile_size), (1, 134, 160))
buffer = BytesIO()
img.save(buffer, format="PNG")
return buffer.getvalue()
async def get_hillshade_tile(
self, z: int, x: int, y: int, azimuth: float = 315, altitude: float = 45
) -> Optional[bytes]:
"""
Generate a hillshade tile for terrain visualization.
Args:
z, x, y: Tile coordinates
azimuth: Light direction in degrees (0=N, 90=E, 180=S, 270=W)
altitude: Light altitude in degrees above horizon
"""
west, south, east, north = tile_to_bounds(z, x, y)
elevations = await self._get_elevations_for_bounds(west, south, east, north)
if elevations is None:
return None
# Calculate hillshade using numpy
hillshade = self._calculate_hillshade(elevations, azimuth, altitude)
# Create grayscale image
img = Image.fromarray((hillshade * 255).astype(np.uint8), mode="L")
buffer = BytesIO()
img.save(buffer, format="PNG")
return buffer.getvalue()
def _calculate_hillshade(
self, dem: np.ndarray, azimuth: float, altitude: float
) -> np.ndarray:
"""Calculate hillshade from DEM array."""
# Convert angles to radians
azimuth_rad = math.radians(360 - azimuth + 90)
altitude_rad = math.radians(altitude)
# Calculate gradient
dy, dx = np.gradient(dem)
# Calculate slope and aspect
slope = np.arctan(np.sqrt(dx**2 + dy**2))
aspect = np.arctan2(-dy, dx)
# Calculate hillshade
hillshade = (
np.sin(altitude_rad) * np.cos(slope)
+ np.cos(altitude_rad) * np.sin(slope) * np.cos(azimuth_rad - aspect)
)
# Normalize to 0-1
hillshade = np.clip(hillshade, 0, 1)
return hillshade
async def get_contour_tile(
self, z: int, x: int, y: int, interval: int = 20
) -> Optional[bytes]:
"""Generate contour lines as vector tile (stub - requires more complex implementation)."""
# This would require generating contour lines from DEM and encoding as MVT
# For now, return None
logger.warning("Contour tiles not yet implemented")
return None
async def get_elevation(self, lat: float, lon: float) -> Optional[float]:
"""Get elevation at a specific point."""
tile_lat = int(math.floor(lat))
tile_lon = int(math.floor(lon))
dem_data = self._load_dem_tile(tile_lat, tile_lon)
if dem_data is None:
return None
return self._sample_elevation(lat, lon, [(tile_lat, tile_lon, dem_data)])
async def get_elevation_profile(
self, coordinates: list[list[float]], samples: int = 100
) -> list[dict]:
"""Get elevation profile along a path."""
from shapely.geometry import LineString
from shapely.ops import substring
# Create line from coordinates
line = LineString(coordinates)
total_length = line.length
# Sample points along line
profile = []
for i in range(samples):
fraction = i / (samples - 1)
point = line.interpolate(fraction, normalized=True)
elevation = await self.get_elevation(point.y, point.x)
profile.append({
"distance_m": fraction * total_length * 111320, # Approximate meters
"longitude": point.x,
"latitude": point.y,
"elevation_m": elevation,
})
return profile
async def get_metadata(self) -> dict:
"""Get metadata about available DEM data."""
dem_files = []
if os.path.exists(self.dem_dir):
dem_files = [f for f in os.listdir(self.dem_dir) if f.endswith(".tif")]
return {
"data_available": len(dem_files) > 0,
"tiles_generated": len(dem_files),
"resolution_m": 30,
"source": "Copernicus DEM GLO-30",
}