feat: Vorbereitung-Module auf 100% — Persistenz, Backend-Services, UCCA Frontend
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 37s
CI / test-python-backend-compliance (push) Successful in 32s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 18s

Phase A: PostgreSQL State Store (sdk_states Tabelle, InMemory-Fallback)
Phase B: Modules dynamisch vom Backend, Scope DB-Persistenz, Source Policy State
Phase C: UCCA Frontend (3 Seiten, Wizard, RiskScoreGauge), Obligations Live-Daten
Phase D: Document Import (PDF/LLM/Gap-Analyse), System Screening (SBOM/OSV.dev)
Phase E: Company Profile CRUD mit Audit-Logging
Phase F: Tests (Python + TypeScript), flow-data.ts DB-Tabellen aktualisiert

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-02 11:04:31 +01:00
parent cd15ab0932
commit e6d666b89b
38 changed files with 4195 additions and 420 deletions

View File

@@ -0,0 +1,134 @@
"""Tests for Company Profile routes (company_profile_routes.py)."""
import json
import pytest
from unittest.mock import MagicMock, patch
from compliance.api.company_profile_routes import (
CompanyProfileRequest,
row_to_response,
log_audit,
)
class TestCompanyProfileRequest:
"""Tests for request model defaults."""
def test_default_values(self):
req = CompanyProfileRequest()
assert req.company_name == ""
assert req.legal_form == "GmbH"
assert req.business_model == "B2B"
assert req.company_size == "small"
assert req.headquarters_country == "DE"
assert req.is_data_controller is True
assert req.is_data_processor is False
assert req.uses_ai is False
assert req.is_complete is False
def test_custom_values(self):
req = CompanyProfileRequest(
company_name="Test GmbH",
industry="Software",
uses_ai=True,
ai_use_cases=["Chatbot", "Analytics"],
offerings=["app_web", "software_saas"],
)
assert req.company_name == "Test GmbH"
assert req.uses_ai is True
assert len(req.ai_use_cases) == 2
assert len(req.offerings) == 2
def test_serialization(self):
req = CompanyProfileRequest(company_name="Test")
data = req.model_dump()
assert data["company_name"] == "Test"
assert isinstance(data["target_markets"], list)
class TestRowToResponse:
"""Tests for DB row to response conversion."""
def _make_row(self, **overrides):
"""Create a mock DB row with 30 fields."""
defaults = [
"uuid-123", # 0: id
"default", # 1: tenant_id
"Test GmbH", # 2: company_name
"GmbH", # 3: legal_form
"IT", # 4: industry
2020, # 5: founded_year
"B2B", # 6: business_model
["app_web"], # 7: offerings
"small", # 8: company_size
"10-49", # 9: employee_count
"2-10 Mio", # 10: annual_revenue
"DE", # 11: headquarters_country
"Berlin", # 12: headquarters_city
False, # 13: has_international_locations
[], # 14: international_countries
["DE", "AT"], # 15: target_markets
"DE", # 16: primary_jurisdiction
True, # 17: is_data_controller
False, # 18: is_data_processor
False, # 19: uses_ai
[], # 20: ai_use_cases
"Max Muster", # 21: dpo_name
"dpo@test.de", # 22: dpo_email
None, # 23: legal_contact_name
None, # 24: legal_contact_email
None, # 25: machine_builder
True, # 26: is_complete
"2026-01-01", # 27: completed_at
"2026-01-01", # 28: created_at
"2026-01-01", # 29: updated_at
]
return tuple(defaults)
def test_basic_conversion(self):
row = self._make_row()
response = row_to_response(row)
assert response.id == "uuid-123"
assert response.tenant_id == "default"
assert response.company_name == "Test GmbH"
assert response.is_complete is True
def test_none_values_handled(self):
row = list(self._make_row())
row[5] = None # founded_year
row[21] = None # dpo_name
row[25] = None # machine_builder
row[27] = None # completed_at
response = row_to_response(tuple(row))
assert response.founded_year is None
assert response.dpo_name is None
assert response.machine_builder is None
assert response.completed_at is None
def test_non_list_jsonb_handled(self):
row = list(self._make_row())
row[7] = None # offerings (JSONB could be None)
row[14] = None # international_countries
response = row_to_response(tuple(row))
assert response.offerings == []
assert response.international_countries == []
class TestLogAudit:
"""Tests for audit logging helper."""
def test_log_audit_success(self):
db = MagicMock()
log_audit(db, "tenant-1", "create", {"company_name": "Test"}, "admin")
db.execute.assert_called_once()
def test_log_audit_with_none_fields(self):
db = MagicMock()
log_audit(db, "tenant-1", "update", None, None)
db.execute.assert_called_once()
def test_log_audit_db_error_handled(self):
db = MagicMock()
db.execute.side_effect = Exception("DB error")
# Should not raise
log_audit(db, "tenant-1", "create", {}, "admin")

View File

@@ -0,0 +1,123 @@
"""Tests for Document Import routes (import_routes.py)."""
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from compliance.api.import_routes import (
detect_document_type,
analyze_gaps,
extract_text_from_pdf,
)
class TestDetectDocumentType:
"""Tests for keyword-based document type detection."""
def test_dsfa_detection(self):
text = "Dies ist eine Datenschutz-Folgenabschaetzung (DSFA) nach Art. 35 DSGVO"
doc_type, confidence = detect_document_type(text)
assert doc_type == "DSFA"
assert confidence >= 0.5
def test_tom_detection(self):
text = "Technisch-organisatorische Massnahmen (TOM) zum Schutz personenbezogener Daten"
doc_type, confidence = detect_document_type(text)
assert doc_type == "TOM"
assert confidence >= 0.5
def test_vvt_detection(self):
text = "Verarbeitungsverzeichnis nach Art. 30 DSGVO - VVT processing activities"
doc_type, confidence = detect_document_type(text)
assert doc_type == "VVT"
assert confidence >= 0.5
def test_privacy_policy_detection(self):
text = "Datenschutzerklaerung - Privacy Policy fuer unsere Nutzer"
doc_type, confidence = detect_document_type(text)
assert doc_type == "PRIVACY_POLICY"
assert confidence >= 0.5
def test_unknown_document(self):
text = "Lorem ipsum dolor sit amet"
doc_type, confidence = detect_document_type(text)
assert doc_type == "OTHER"
assert confidence == 0.3
def test_empty_text(self):
doc_type, confidence = detect_document_type("")
assert doc_type == "OTHER"
assert confidence == 0.3
def test_confidence_increases_with_more_keywords(self):
text_single = "dsfa"
text_multi = "dsfa dpia datenschutz-folgenabschaetzung privacy impact"
_, conf_single = detect_document_type(text_single)
_, conf_multi = detect_document_type(text_multi)
assert conf_multi > conf_single
def test_confidence_capped_at_095(self):
text = "dsfa dpia datenschutz-folgenabschaetzung privacy impact assessment report analysis"
_, confidence = detect_document_type(text)
assert confidence <= 0.95
class TestAnalyzeGaps:
"""Tests for gap analysis rules."""
def test_ai_gap_detected(self):
text = "Wir setzen KI und AI in unserer Anwendung ein"
gaps = analyze_gaps(text, "OTHER")
# Should detect AI Act gap (missing risk classification)
ai_gaps = [g for g in gaps if g["category"] == "AI Act Compliance"]
assert len(ai_gaps) > 0
assert ai_gaps[0]["severity"] == "CRITICAL"
def test_no_gap_when_requirement_present(self):
text = "KI-System mit Risikoklassifizierung nach EU AI Act"
gaps = analyze_gaps(text, "OTHER")
ai_gaps = [g for g in gaps if g["category"] == "AI Act Compliance"]
assert len(ai_gaps) == 0
def test_tom_gap_detected(self):
text = "Cloud-basiertes SaaS-System mit KI-Funktionen"
gaps = analyze_gaps(text, "OTHER")
tom_gaps = [g for g in gaps if g["category"] == "TOMs"]
assert len(tom_gaps) > 0
def test_no_gaps_for_irrelevant_text(self):
text = "Ein einfacher Flyer ohne Datenbezug"
gaps = analyze_gaps(text, "OTHER")
assert len(gaps) == 0
def test_gap_has_required_fields(self):
text = "KI-System mit automatisierten Entscheidungen"
gaps = analyze_gaps(text, "OTHER")
assert len(gaps) > 0
for gap in gaps:
assert "id" in gap
assert "category" in gap
assert "severity" in gap
assert "regulation" in gap
assert "required_action" in gap
class TestExtractTextFromPdf:
"""Tests for PDF text extraction."""
def test_empty_bytes_returns_empty(self):
result = extract_text_from_pdf(b"")
assert result == ""
def test_invalid_pdf_returns_empty(self):
result = extract_text_from_pdf(b"not a pdf")
assert result == ""
@patch("compliance.api.import_routes.fitz")
def test_fitz_import_error(self, mock_fitz):
"""When fitz is not available, returns empty string."""
mock_fitz.open.side_effect = ImportError("No module")
# The actual function catches ImportError internally
result = extract_text_from_pdf(b"test")
# Since we mocked fitz at module level it will raise differently,
# but the function should handle it gracefully
assert isinstance(result, str)

View File

@@ -0,0 +1,191 @@
"""Tests for System Screening routes (screening_routes.py)."""
import json
import pytest
from unittest.mock import AsyncMock, patch
from compliance.api.screening_routes import (
parse_package_lock,
parse_requirements_txt,
parse_yarn_lock,
detect_and_parse,
generate_sbom,
map_osv_severity,
extract_fix_version,
)
class TestParsePackageLock:
"""Tests for package-lock.json parsing."""
def test_v2_format(self):
data = json.dumps({
"packages": {
"": {"name": "my-app", "version": "1.0.0"},
"node_modules/react": {"version": "18.3.0", "license": "MIT"},
"node_modules/lodash": {"version": "4.17.21", "license": "MIT"},
}
})
components = parse_package_lock(data)
assert len(components) == 2
names = [c["name"] for c in components]
assert "react" in names
assert "lodash" in names
def test_v1_format(self):
data = json.dumps({
"dependencies": {
"express": {"version": "4.18.2"},
"cors": {"version": "2.8.5"},
}
})
components = parse_package_lock(data)
assert len(components) == 2
def test_empty_json(self):
assert parse_package_lock("{}") == []
def test_invalid_json(self):
assert parse_package_lock("not json") == []
def test_root_package_skipped(self):
data = json.dumps({
"packages": {
"": {"name": "root", "version": "1.0.0"},
}
})
components = parse_package_lock(data)
assert len(components) == 0
class TestParseRequirementsTxt:
"""Tests for requirements.txt parsing."""
def test_pinned_versions(self):
content = "fastapi==0.123.9\nuvicorn==0.38.0\npydantic==2.12.5"
components = parse_requirements_txt(content)
assert len(components) == 3
assert components[0]["name"] == "fastapi"
assert components[0]["version"] == "0.123.9"
assert components[0]["ecosystem"] == "PyPI"
def test_minimum_versions(self):
content = "idna>=3.7\ncryptography>=42.0.0"
components = parse_requirements_txt(content)
assert len(components) == 2
assert components[0]["version"] == "3.7"
def test_comments_and_blanks_ignored(self):
content = "# Comment\n\nfastapi==1.0.0\n# Another comment\n-r base.txt"
components = parse_requirements_txt(content)
assert len(components) == 1
def test_bare_package_name(self):
content = "requests"
components = parse_requirements_txt(content)
assert len(components) == 1
assert components[0]["version"] == "latest"
def test_empty_content(self):
assert parse_requirements_txt("") == []
class TestParseYarnLock:
"""Tests for yarn.lock parsing (basic)."""
def test_basic_format(self):
content = '"react@^18.0.0":\n version "18.3.0"\n"lodash@^4.17.0":\n version "4.17.21"'
components = parse_yarn_lock(content)
assert len(components) == 2
class TestDetectAndParse:
"""Tests for file type detection and parsing."""
def test_package_lock_detection(self):
data = json.dumps({"packages": {"node_modules/x": {"version": "1.0"}}})
components, ecosystem = detect_and_parse("package-lock.json", data)
assert ecosystem == "npm"
assert len(components) == 1
def test_requirements_detection(self):
components, ecosystem = detect_and_parse("requirements.txt", "flask==2.0.0")
assert ecosystem == "PyPI"
assert len(components) == 1
def test_unknown_format(self):
components, ecosystem = detect_and_parse("readme.md", "Hello World")
assert len(components) == 0
class TestGenerateSbom:
"""Tests for CycloneDX SBOM generation."""
def test_sbom_structure(self):
components = [
{"name": "react", "version": "18.3.0", "type": "library", "ecosystem": "npm", "license": "MIT"},
]
sbom = generate_sbom(components, "npm")
assert sbom["bomFormat"] == "CycloneDX"
assert sbom["specVersion"] == "1.5"
assert len(sbom["components"]) == 1
assert sbom["components"][0]["purl"] == "pkg:npm/react@18.3.0"
def test_sbom_empty_components(self):
sbom = generate_sbom([], "npm")
assert sbom["components"] == []
def test_sbom_unknown_license_excluded(self):
components = [
{"name": "x", "version": "1.0", "type": "library", "ecosystem": "npm", "license": "unknown"},
]
sbom = generate_sbom(components, "npm")
assert sbom["components"][0]["licenses"] == []
class TestMapOsvSeverity:
"""Tests for OSV severity mapping."""
def test_critical_severity(self):
vuln = {"database_specific": {"severity": "CRITICAL"}}
severity, cvss = map_osv_severity(vuln)
assert severity == "CRITICAL"
assert cvss == 9.5
def test_medium_default(self):
vuln = {}
severity, cvss = map_osv_severity(vuln)
assert severity == "MEDIUM"
assert cvss == 5.0
def test_low_severity(self):
vuln = {"database_specific": {"severity": "LOW"}}
severity, cvss = map_osv_severity(vuln)
assert severity == "LOW"
assert cvss == 2.5
class TestExtractFixVersion:
"""Tests for extracting fix version from OSV data."""
def test_fix_version_found(self):
vuln = {
"affected": [{
"package": {"name": "lodash"},
"ranges": [{"events": [{"introduced": "0"}, {"fixed": "4.17.21"}]}],
}]
}
assert extract_fix_version(vuln, "lodash") == "4.17.21"
def test_no_fix_version(self):
vuln = {"affected": [{"package": {"name": "x"}, "ranges": [{"events": [{"introduced": "0"}]}]}]}
assert extract_fix_version(vuln, "x") is None
def test_wrong_package_name(self):
vuln = {
"affected": [{
"package": {"name": "other"},
"ranges": [{"events": [{"fixed": "1.0"}]}],
}]
}
assert extract_fix_version(vuln, "lodash") is None