A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.
This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).
Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
187 lines
5.0 KiB
Python
187 lines
5.0 KiB
Python
"""
|
|
Integration tests for database and cache connectivity.
|
|
|
|
These tests verify that the CI pipeline can connect to:
|
|
- PostgreSQL database
|
|
- Valkey/Redis cache
|
|
|
|
Run with: pytest tests/test_integration/test_db_connection.py -v
|
|
"""
|
|
|
|
import os
|
|
import pytest
|
|
|
|
|
|
@pytest.mark.integration
def test_database_connection():
    """Verify basic PostgreSQL connectivity.

    Runs ``SELECT 1`` and checks that the server identifies itself as
    PostgreSQL. Requires the DATABASE_URL environment variable in the
    form ``postgresql://user:password@host:port/dbname``.
    """
    # Local import so collecting this module doesn't require psycopg2
    # when integration tests are skipped.
    import psycopg2

    db_url = os.environ.get("DATABASE_URL")
    assert db_url is not None, "DATABASE_URL not set"

    # Parse connection parameters from URL
    # Format: postgresql://user:password@host:port/dbname
    conn = psycopg2.connect(db_url)
    try:
        # Use the cursor as a context manager so it is closed even if
        # an assertion fails (the original leaked the cursor).
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
            result = cur.fetchone()
            assert result[0] == 1, "Database query returned unexpected result"

            # Sanity-check the server identity / version string.
            cur.execute("SELECT version()")
            version = cur.fetchone()[0]
            assert "PostgreSQL" in version, f"Unexpected database: {version}"
            print(f"Connected to: {version.split(',')[0]}")
    finally:
        conn.close()
@pytest.mark.integration
def test_database_can_create_table():
    """Verify a full DDL/DML round-trip: create, insert, read back, drop.

    Requires DATABASE_URL. autocommit is enabled so the DDL takes
    effect immediately without explicit transaction management.
    """
    import psycopg2

    db_url = os.environ.get("DATABASE_URL")
    assert db_url is not None, "DATABASE_URL not set"

    conn = psycopg2.connect(db_url)
    conn.autocommit = True
    try:
        with conn.cursor() as cur:
            # Create test table
            cur.execute("""
                CREATE TABLE IF NOT EXISTS _ci_test_table (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100),
                    created_at TIMESTAMP DEFAULT NOW()
                )
            """)

            # Insert test data
            cur.execute(
                "INSERT INTO _ci_test_table (name) VALUES (%s) RETURNING id",
                ("integration_test",)
            )
            inserted_id = cur.fetchone()[0]
            assert inserted_id is not None, "Insert failed"

            # Read back
            cur.execute(
                "SELECT name FROM _ci_test_table WHERE id = %s",
                (inserted_id,),
            )
            name = cur.fetchone()[0]
            assert name == "integration_test", f"Read back failed: {name}"
    finally:
        # Drop in finally so a failed assertion above doesn't leak the
        # test table into the shared CI database.
        try:
            with conn.cursor() as cur:
                cur.execute("DROP TABLE IF EXISTS _ci_test_table")
        finally:
            conn.close()
@pytest.mark.integration
def test_valkey_connection():
    """Verify Valkey/Redis connectivity: ping, set/get, delete, server info.

    Requires VALKEY_URL or REDIS_URL to be set.
    """
    import redis

    valkey_url = os.environ.get("VALKEY_URL") or os.environ.get("REDIS_URL")
    assert valkey_url is not None, "VALKEY_URL or REDIS_URL not set"

    r = redis.from_url(valkey_url)
    test_key = "_ci_test_key"
    try:
        # Test ping
        assert r.ping() is True, "Valkey ping failed"

        # Test set/get
        test_value = "integration_test_value"
        r.set(test_key, test_value)
        result = r.get(test_key)
        # redis-py returns bytes unless decode_responses=True was passed.
        assert result == test_value.encode(), f"Get returned: {result}"

        # Cleanup
        r.delete(test_key)
        assert r.get(test_key) is None, "Delete failed"

        # Get server info
        info = r.info("server")
        server_version = info.get("redis_version", "unknown")
        print(f"Connected to Valkey/Redis version: {server_version}")
    finally:
        # Best-effort cleanup: a failed assertion above would otherwise
        # leak the test key into the shared cache.
        try:
            r.delete(test_key)
        finally:
            r.close()
@pytest.mark.integration
def test_valkey_can_store_json():
    """Verify that Valkey round-trips a JSON-serialized nested structure.

    Requires VALKEY_URL or REDIS_URL to be set.
    """
    import json
    import redis

    valkey_url = os.environ.get("VALKEY_URL") or os.environ.get("REDIS_URL")
    assert valkey_url is not None, "VALKEY_URL or REDIS_URL not set"

    r = redis.from_url(valkey_url)
    test_key = "_ci_test_json"
    try:
        test_data = {
            "user_id": "test-123",
            "session": {"active": True, "created": "2025-01-01"},
            "scores": [85, 90, 78]
        }

        # Store as JSON
        r.set(test_key, json.dumps(test_data))

        # Retrieve and parse
        result = r.get(test_key)
        parsed = json.loads(result)

        assert parsed["user_id"] == "test-123"
        assert parsed["session"]["active"] is True
        assert parsed["scores"] == [85, 90, 78]
    finally:
        # Cleanup moved to finally so a failed assertion doesn't leak
        # the test key into the shared cache.
        try:
            r.delete(test_key)
        finally:
            r.close()
@pytest.mark.integration
def test_valkey_expiration():
    """Verify that a key written with a TTL expires on schedule.

    Requires VALKEY_URL or REDIS_URL to be set.
    """
    import redis
    import time

    cache_url = os.environ.get("VALKEY_URL") or os.environ.get("REDIS_URL")
    assert cache_url is not None, "VALKEY_URL or REDIS_URL not set"

    client = redis.from_url(cache_url)
    key = "_ci_test_expiry"
    try:
        # Write the key with a 2-second time-to-live.
        client.setex(key, 2, "temporary_value")

        # It must be readable right away.
        assert client.get(key) is not None, "Key should exist"

        # Remaining TTL must be positive and no larger than what we set.
        ttl = client.ttl(key)
        assert 0 < ttl <= 2, f"TTL should be 1-2, got {ttl}"

        # Give the server time to expire the key, then confirm it is gone.
        time.sleep(3)
        assert client.get(key) is None, "Key should have expired"
    finally:
        # Cleanup (in case test failed before expiration)
        client.delete(key)
        client.close()