This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/backend/tests/test_integration/test_db_connection.py
Benjamin Admin bfdaf63ba9 fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

187 lines
5.0 KiB
Python

"""
Integration tests for database and cache connectivity.
These tests verify that the CI pipeline can connect to:
- PostgreSQL database
- Valkey/Redis cache
Run with: pytest tests/test_integration/test_db_connection.py -v
"""
import os
import pytest
@pytest.mark.integration
def test_database_connection():
"""Test that we can connect to PostgreSQL."""
import psycopg2
db_url = os.environ.get("DATABASE_URL")
assert db_url is not None, "DATABASE_URL not set"
# Parse connection parameters from URL
# Format: postgresql://user:password@host:port/dbname
conn = psycopg2.connect(db_url)
try:
cur = conn.cursor()
cur.execute("SELECT 1")
result = cur.fetchone()
assert result[0] == 1, "Database query returned unexpected result"
# Test database version
cur.execute("SELECT version()")
version = cur.fetchone()[0]
assert "PostgreSQL" in version, f"Unexpected database: {version}"
print(f"Connected to: {version.split(',')[0]}")
finally:
conn.close()
@pytest.mark.integration
def test_database_can_create_table():
"""Test that we can create and drop tables."""
import psycopg2
db_url = os.environ.get("DATABASE_URL")
assert db_url is not None, "DATABASE_URL not set"
conn = psycopg2.connect(db_url)
conn.autocommit = True
try:
cur = conn.cursor()
# Create test table
cur.execute("""
CREATE TABLE IF NOT EXISTS _ci_test_table (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
created_at TIMESTAMP DEFAULT NOW()
)
""")
# Insert test data
cur.execute(
"INSERT INTO _ci_test_table (name) VALUES (%s) RETURNING id",
("integration_test",)
)
inserted_id = cur.fetchone()[0]
assert inserted_id is not None, "Insert failed"
# Read back
cur.execute("SELECT name FROM _ci_test_table WHERE id = %s", (inserted_id,))
name = cur.fetchone()[0]
assert name == "integration_test", f"Read back failed: {name}"
# Cleanup
cur.execute("DROP TABLE IF EXISTS _ci_test_table")
finally:
conn.close()
@pytest.mark.integration
def test_valkey_connection():
"""Test that we can connect to Valkey/Redis."""
import redis
valkey_url = os.environ.get("VALKEY_URL") or os.environ.get("REDIS_URL")
assert valkey_url is not None, "VALKEY_URL or REDIS_URL not set"
r = redis.from_url(valkey_url)
try:
# Test ping
assert r.ping() is True, "Valkey ping failed"
# Test set/get
test_key = "_ci_test_key"
test_value = "integration_test_value"
r.set(test_key, test_value)
result = r.get(test_key)
assert result == test_value.encode(), f"Get returned: {result}"
# Cleanup
r.delete(test_key)
assert r.get(test_key) is None, "Delete failed"
# Get server info
info = r.info("server")
server_version = info.get("redis_version", "unknown")
print(f"Connected to Valkey/Redis version: {server_version}")
finally:
r.close()
@pytest.mark.integration
def test_valkey_can_store_json():
"""Test that Valkey can store and retrieve JSON data."""
import redis
import json
valkey_url = os.environ.get("VALKEY_URL") or os.environ.get("REDIS_URL")
assert valkey_url is not None, "VALKEY_URL or REDIS_URL not set"
r = redis.from_url(valkey_url)
try:
test_key = "_ci_test_json"
test_data = {
"user_id": "test-123",
"session": {"active": True, "created": "2025-01-01"},
"scores": [85, 90, 78]
}
# Store as JSON
r.set(test_key, json.dumps(test_data))
# Retrieve and parse
result = r.get(test_key)
parsed = json.loads(result)
assert parsed["user_id"] == "test-123"
assert parsed["session"]["active"] is True
assert parsed["scores"] == [85, 90, 78]
# Cleanup
r.delete(test_key)
finally:
r.close()
@pytest.mark.integration
def test_valkey_expiration():
"""Test that Valkey TTL/expiration works."""
import redis
import time
valkey_url = os.environ.get("VALKEY_URL") or os.environ.get("REDIS_URL")
assert valkey_url is not None, "VALKEY_URL or REDIS_URL not set"
r = redis.from_url(valkey_url)
try:
test_key = "_ci_test_expiry"
# Set with 2 second TTL
r.setex(test_key, 2, "temporary_value")
# Should exist immediately
assert r.get(test_key) is not None, "Key should exist"
# Check TTL
ttl = r.ttl(test_key)
assert 0 < ttl <= 2, f"TTL should be 1-2, got {ttl}"
# Wait for expiration
time.sleep(3)
# Should be gone
assert r.get(test_key) is None, "Key should have expired"
finally:
# Cleanup (in case test failed before expiration)
r.delete(test_key)
r.close()