feat(control-library): document-grouped batching, generation strategy tracking, sort by source
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 31s
CI/CD / test-python-backend-compliance (push) Successful in 31s
CI/CD / test-python-document-crawler (push) Successful in 21s
CI/CD / test-python-dsms-gateway (push) Successful in 18s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Successful in 2s

- Group chunks by regulation_code before batching for better LLM context
- Add generation_strategy column (ungrouped=v1, document_grouped=v2)
- Add v1/v2 badge to control cards in frontend
- Add sort-by-source option with visual group headers
- Add frontend page tests (18 tests)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-15 15:10:52 +01:00
parent 0d95c3bb44
commit c8fd9cc780
9 changed files with 1000 additions and 137 deletions

View File

@@ -1,17 +1,36 @@
"""Tests for Canonical Control Library routes (canonical_control_routes.py)."""
"""Tests for Canonical Control Library routes (canonical_control_routes.py).
Includes:
- Model validation tests (FrameworkResponse, ControlResponse, etc.)
- _control_row conversion tests
- Server-side pagination, sorting, search, source filter tests
- /controls-count and /controls-meta endpoint tests
"""
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime, timezone
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.canonical_control_routes import (
FrameworkResponse,
ControlResponse,
SimilarityCheckRequest,
SimilarityCheckResponse,
_control_row,
router,
)
# ---------------------------------------------------------------------------
# TestClient setup for endpoint tests
# ---------------------------------------------------------------------------
_app = FastAPI()
_app.include_router(router, prefix="/api/compliance")
_client = TestClient(_app)
class TestFrameworkResponse:
"""Tests for FrameworkResponse model."""
@@ -175,6 +194,7 @@ class TestControlRowConversion:
],
"release_state": "draft",
"tags": ["mfa"],
"generation_strategy": "ungrouped",
"created_at": now,
"updated_at": now,
}
@@ -223,3 +243,213 @@ class TestControlRowConversion:
result = _control_row(row)
assert result["created_at"] is None
assert result["updated_at"] is None
def test_generation_strategy_default(self):
row = self._make_row()
result = _control_row(row)
assert result["generation_strategy"] == "ungrouped"
def test_generation_strategy_document_grouped(self):
row = self._make_row(generation_strategy="document_grouped")
result = _control_row(row)
assert result["generation_strategy"] == "document_grouped"
# =============================================================================
# ENDPOINT TESTS — Server-Side Pagination, Sort, Search, Source Filter
# =============================================================================
def _make_mock_row(**overrides):
"""Build a mock Row with all canonical_controls columns."""
now = datetime.now(timezone.utc)
defaults = {
"id": "uuid-ctrl-1",
"framework_id": "uuid-fw-1",
"control_id": "AUTH-001",
"title": "Test Control",
"objective": "Test obj",
"rationale": "Test rat",
"scope": {},
"requirements": ["Req 1"],
"test_procedure": ["Test 1"],
"evidence": [],
"severity": "high",
"risk_score": 3.0,
"implementation_effort": "m",
"evidence_confidence": None,
"open_anchors": [],
"release_state": "draft",
"tags": [],
"license_rule": 1,
"source_original_text": None,
"source_citation": None,
"customer_visible": True,
"verification_method": "automated",
"category": "authentication",
"target_audience": "developer",
"generation_metadata": {},
"generation_strategy": "ungrouped",
"created_at": now,
"updated_at": now,
}
defaults.update(overrides)
mock = MagicMock()
for k, v in defaults.items():
setattr(mock, k, v)
return mock
def _session_returning(rows=None, scalar=None):
"""Create a mock SessionLocal that returns rows or scalar."""
db = MagicMock()
result = MagicMock()
if rows is not None:
result.fetchall.return_value = rows
if scalar is not None:
result.scalar.return_value = scalar
db.execute.return_value = result
db.__enter__ = MagicMock(return_value=db)
db.__exit__ = MagicMock(return_value=False)
return db
class TestListControlsPagination:
"""GET /controls with limit/offset."""
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_limit_param_in_sql(self, mock_cls):
mock_cls.return_value = _session_returning(rows=[_make_mock_row()])
resp = _client.get("/api/compliance/v1/canonical/controls?limit=10&offset=20")
assert resp.status_code == 200
sql = str(mock_cls.return_value.__enter__().execute.call_args[0][0].text)
assert "LIMIT" in sql
assert "OFFSET" in sql
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_no_limit_by_default(self, mock_cls):
mock_cls.return_value = _session_returning(rows=[])
resp = _client.get("/api/compliance/v1/canonical/controls")
assert resp.status_code == 200
sql = str(mock_cls.return_value.__enter__().execute.call_args[0][0].text)
assert "LIMIT" not in sql
class TestListControlsSorting:
"""GET /controls with sort/order."""
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_sort_created_at_desc(self, mock_cls):
mock_cls.return_value = _session_returning(rows=[])
resp = _client.get("/api/compliance/v1/canonical/controls?sort=created_at&order=desc")
assert resp.status_code == 200
sql = str(mock_cls.return_value.__enter__().execute.call_args[0][0].text)
assert "created_at DESC" in sql
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_default_sort_control_id_asc(self, mock_cls):
mock_cls.return_value = _session_returning(rows=[])
resp = _client.get("/api/compliance/v1/canonical/controls")
assert resp.status_code == 200
sql = str(mock_cls.return_value.__enter__().execute.call_args[0][0].text)
assert "control_id ASC" in sql
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_sql_injection_in_sort_blocked(self, mock_cls):
mock_cls.return_value = _session_returning(rows=[])
resp = _client.get("/api/compliance/v1/canonical/controls?sort=1;DROP+TABLE")
assert resp.status_code == 200
sql = str(mock_cls.return_value.__enter__().execute.call_args[0][0].text)
assert "DROP" not in sql
assert "control_id" in sql
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_sort_by_source(self, mock_cls):
mock_cls.return_value = _session_returning(rows=[])
resp = _client.get("/api/compliance/v1/canonical/controls?sort=source&order=asc")
assert resp.status_code == 200
sql = str(mock_cls.return_value.__enter__().execute.call_args[0][0].text)
assert "source_citation" in sql
assert "control_id ASC" in sql # secondary sort within source group
class TestListControlsSearch:
"""GET /controls with search."""
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_search_uses_ilike(self, mock_cls):
mock_cls.return_value = _session_returning(rows=[])
resp = _client.get("/api/compliance/v1/canonical/controls?search=encryption")
assert resp.status_code == 200
sql = str(mock_cls.return_value.__enter__().execute.call_args[0][0].text)
assert "ILIKE" in sql
params = mock_cls.return_value.__enter__().execute.call_args[0][1]
assert params["q"] == "%encryption%"
class TestListControlsSourceFilter:
"""GET /controls with source filter."""
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_specific_source(self, mock_cls):
mock_cls.return_value = _session_returning(rows=[])
resp = _client.get("/api/compliance/v1/canonical/controls?source=DSGVO")
assert resp.status_code == 200
sql = str(mock_cls.return_value.__enter__().execute.call_args[0][0].text)
assert "source_citation" in sql
params = mock_cls.return_value.__enter__().execute.call_args[0][1]
assert params["src"] == "DSGVO"
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_no_source_filter(self, mock_cls):
mock_cls.return_value = _session_returning(rows=[])
resp = _client.get("/api/compliance/v1/canonical/controls?source=__none__")
assert resp.status_code == 200
sql = str(mock_cls.return_value.__enter__().execute.call_args[0][0].text)
assert "IS NULL" in sql
class TestControlsCount:
"""GET /controls-count."""
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_returns_total(self, mock_cls):
mock_cls.return_value = _session_returning(scalar=42)
resp = _client.get("/api/compliance/v1/canonical/controls-count")
assert resp.status_code == 200
assert resp.json() == {"total": 42}
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_with_filters(self, mock_cls):
mock_cls.return_value = _session_returning(scalar=5)
resp = _client.get("/api/compliance/v1/canonical/controls-count?severity=critical&search=mfa")
assert resp.status_code == 200
assert resp.json() == {"total": 5}
sql = str(mock_cls.return_value.__enter__().execute.call_args[0][0].text)
assert "severity" in sql
assert "ILIKE" in sql
class TestControlsMeta:
"""GET /controls-meta."""
@patch("compliance.api.canonical_control_routes.SessionLocal")
def test_returns_structure(self, mock_cls):
db = MagicMock()
db.__enter__ = MagicMock(return_value=db)
db.__exit__ = MagicMock(return_value=False)
# 4 sequential execute() calls
total_r = MagicMock(); total_r.scalar.return_value = 100
domain_r = MagicMock(); domain_r.fetchall.return_value = []
source_r = MagicMock(); source_r.fetchall.return_value = []
nosrc_r = MagicMock(); nosrc_r.scalar.return_value = 20
db.execute.side_effect = [total_r, domain_r, source_r, nosrc_r]
mock_cls.return_value = db
resp = _client.get("/api/compliance/v1/canonical/controls-meta")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 100
assert data["no_source_count"] == 20
assert isinstance(data["domains"], list)
assert isinstance(data["sources"], list)