"""Tests for citation_backfill.py — article/paragraph enrichment."""

import hashlib
import json
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from compliance.services.citation_backfill import (
    CitationBackfill,
    MatchResult,
    _parse_concatenated_source,
    _parse_json,
)
from compliance.services.rag_client import RAGSearchResult


# =============================================================================
# Unit tests: _parse_concatenated_source
# =============================================================================


class TestParseConcatenatedSource:
    """Checks how a concatenated "<name> <article>" source string is split."""

    def test_dsgvo_art(self):
        """A short name followed by "Art. N" yields name and article."""
        expected = {"name": "DSGVO", "article": "Art. 35"}
        assert _parse_concatenated_source("DSGVO Art. 35") == expected

    def test_nis2_artikel(self):
        """The spelled-out "Artikel" form keeps its trailing "Abs." part."""
        expected = {"name": "NIS2", "article": "Artikel 21 Abs. 2"}
        assert _parse_concatenated_source("NIS2 Artikel 21 Abs. 2") == expected

    def test_long_name_with_article(self):
        """A multi-word name containing parentheses stays intact."""
        raw_source = "Verordnung (EU) 2024/1689 (KI-Verordnung) Art. 6"
        expected = {
            "name": "Verordnung (EU) 2024/1689 (KI-Verordnung)",
            "article": "Art. 6",
        }
        assert _parse_concatenated_source(raw_source) == expected

    def test_paragraph_sign(self):
        """A "§ N" reference is reported as the article."""
        expected = {"name": "BDSG", "article": "§ 42"}
        assert _parse_concatenated_source("BDSG § 42") == expected

    def test_paragraph_sign_with_abs(self):
        """A "§ N Abs. M" reference is kept together in the article."""
        expected = {"name": "TTDSG", "article": "§ 25 Abs. 1"}
        assert _parse_concatenated_source("TTDSG § 25 Abs. 1") == expected

    def test_no_article(self):
        """A bare regulation name gives None."""
        assert _parse_concatenated_source("DSGVO") is None

    def test_empty_string(self):
        """An empty string gives None."""
        assert _parse_concatenated_source("") is None

    def test_none(self):
        """None as input gives None."""
        assert _parse_concatenated_source(None) is None

    def test_just_name_no_article(self):
        """A multi-word name without any article marker gives None."""
        assert _parse_concatenated_source("Cyber Resilience Act") is None


# =============================================================================
# Unit tests: _parse_json
# =============================================================================


class TestParseJson:
    """Checks JSON extraction from plain, fenced and embedded payloads."""

    def test_direct_json(self):
        """A plain JSON object string is decoded as-is."""
        decoded = _parse_json('{"article": "Art. 35", "paragraph": "Abs. 1"}')
        assert decoded == {"article": "Art. 35", "paragraph": "Abs. 1"}

    def test_markdown_code_block(self):
        """JSON wrapped in a markdown ```json fence is decoded."""
        fenced = '```json\n{"article": "§ 42", "paragraph": ""}\n```'
        assert _parse_json(fenced) == {"article": "§ 42", "paragraph": ""}

    def test_text_with_json(self):
        """A JSON object embedded in surrounding prose is decoded."""
        embedded = 'Der Artikel ist {"article": "Art. 6", "paragraph": "Abs. 2"} wie beschrieben.'
        assert _parse_json(embedded) == {"article": "Art. 6", "paragraph": "Abs. 2"}

    def test_empty(self):
        """Both the empty string and None give None."""
        for blank in ("", None):
            assert _parse_json(blank) is None

    def test_no_json(self):
        """Text without any JSON object gives None."""
        assert _parse_json("Das ist kein JSON.") is None


# =============================================================================
# Integration tests: CitationBackfill matching
# =============================================================================


def _make_rag_chunk(
    text="Test text",
    article="Art. 35",
    paragraph="Abs. 1",
    regulation_code="eu_2016_679",
    regulation_name="DSGVO",
):
    """Build a RAGSearchResult fixture; fields not exposed as arguments are fixed."""
    fixed_fields = {
        "regulation_short": "DSGVO",
        "category": "datenschutz",
        "source_url": "https://example.com",
        "score": 0.0,
        "collection": "bp_compliance_gesetze",
    }
    return RAGSearchResult(
        text=text,
        regulation_code=regulation_code,
        regulation_name=regulation_name,
        article=article,
        paragraph=paragraph,
        **fixed_fields,
    )


class TestCitationBackfillMatching:
    """Exercises CitationBackfill matching and update logic against mocks."""

    def setup_method(self):
        """Create a CitationBackfill wired to mocked DB and RAG client."""
        self.db = MagicMock()
        self.rag = MagicMock()
        self.backfill = CitationBackfill(db=self.db, rag_client=self.rag)

    @pytest.mark.asyncio
    async def test_hash_match(self):
        """Tier 1: exact text hash matches a RAG chunk."""
        law_text = "Dies ist ein Gesetzestext mit spezifischen Anforderungen an die Datensicherheit."
        indexed_chunk = _make_rag_chunk(text=law_text, article="Art. 32", paragraph="Abs. 1")
        # The index is keyed by the SHA-256 hex digest of the chunk text.
        digest = hashlib.sha256(law_text.encode()).hexdigest()
        self.backfill._rag_index = {digest: indexed_chunk}

        control = {
            "control_id": "DATA-001",
            "source_original_text": law_text,
            "source_citation": {"source": "DSGVO Art. 32"},
            "generation_metadata": {"source_regulation": "eu_2016_679"},
        }

        outcome = await self.backfill._match_control(control)

        assert outcome is not None
        assert outcome.method == "hash"
        assert outcome.article == "Art. 32"
        assert outcome.paragraph == "Abs. 1"

    @pytest.mark.asyncio
    async def test_regex_match(self):
        """Tier 2: regex parses concatenated source when no hash match."""
        self.backfill._rag_index = {}
        control = {
            "control_id": "NET-010",
            "source_original_text": None,  # No original text available
            "source_citation": {"source": "NIS2 Artikel 21"},
            "generation_metadata": {},
        }

        outcome = await self.backfill._match_control(control)

        assert outcome is not None
        assert outcome.method == "regex"
        assert outcome.article == "Artikel 21"

    @pytest.mark.asyncio
    async def test_llm_match(self):
        """Tier 3: Ollama LLM identifies article/paragraph."""
        self.backfill._rag_index = {}
        control = {
            "control_id": "AUTH-005",
            "source_original_text": "Verantwortliche muessen geeignete technische Massnahmen treffen...",
            "source_citation": {"source": "DSGVO"},  # No article in source
            "generation_metadata": {"source_regulation": "eu_2016_679"},
        }

        llm_patch = patch(
            "compliance.services.citation_backfill._llm_ollama",
            new_callable=AsyncMock,
        )
        with llm_patch as llm_stub:
            llm_stub.return_value = '{"article": "Art. 25", "paragraph": "Abs. 1"}'
            outcome = await self.backfill._match_control(control)

        assert outcome is not None
        assert outcome.method == "llm"
        assert outcome.article == "Art. 25"
        assert outcome.paragraph == "Abs. 1"

    @pytest.mark.asyncio
    async def test_no_match(self):
        """No match when no source text and no parseable source."""
        self.backfill._rag_index = {}
        control = {
            "control_id": "SEC-001",
            "source_original_text": None,
            "source_citation": {"source": "Unknown Source"},
            "generation_metadata": {},
        }

        assert await self.backfill._match_control(control) is None

    def test_update_control_cleans_source(self):
        """_update_control splits concatenated source and adds article/paragraph."""
        control = {
            "id": "test-uuid-123",
            "control_id": "DATA-001",
            "source_citation": {"source": "DSGVO Art. 32", "license": "EU_LAW"},
            "generation_metadata": {"processing_path": "structured"},
        }
        found = MatchResult(article="Art. 32", paragraph="Abs. 1", method="hash")

        self.backfill._update_control(control, found)

        # Bind parameters may arrive as keyword arguments or as the second
        # positional argument of db.execute; accept either.
        positional, keyword = self.db.execute.call_args
        bound = keyword or positional[1]
        citation = json.loads(bound["citation"])
        metadata = json.loads(bound["metadata"])

        assert citation["source"] == "DSGVO"  # Cleaned: article removed
        assert citation["article"] == "Art. 32"
        assert citation["paragraph"] == "Abs. 1"
        assert metadata["source_paragraph"] == "Abs. 1"
        assert metadata["backfill_method"] == "hash"
        assert "backfill_at" in metadata

    def test_rule3_not_loaded(self):
        """Verify the SQL query only loads Rule 1+2 controls."""
        # Give db.execute an empty result set: no column keys, no rows.
        empty_result = MagicMock(keys=lambda: [], __iter__=lambda s: iter([]))
        self.db.execute.return_value = empty_result

        self.backfill._load_controls_needing_backfill()

        statement = self.db.execute.call_args[0][0]
        sql_text = str(statement.text)
        for fragment in ("license_rule IN (1, 2)", "source_citation IS NOT NULL"):
            assert fragment in sql_text


# =============================================================================
# Tests: Ollama JSON-Mode
# =============================================================================


class TestOllamaJsonMode:
    """Verify that citation_backfill Ollama payloads include format=json."""

    @pytest.mark.asyncio
    async def test_ollama_payload_contains_format_json(self):
        """_llm_ollama must send format='json' in the request payload."""
        from compliance.services.citation_backfill import _llm_ollama

        canned_reply = MagicMock()
        canned_reply.status_code = 200
        canned_reply.json.return_value = {
            "message": {"content": '{"article": "Art. 1"}'}
        }

        with patch("compliance.services.citation_backfill.httpx.AsyncClient") as client_cls:
            http_client = AsyncMock()
            http_client.post.return_value = canned_reply
            # `async with httpx.AsyncClient(...)` must hand back the stub client.
            client_ctx = client_cls.return_value
            client_ctx.__aenter__ = AsyncMock(return_value=http_client)
            client_ctx.__aexit__ = AsyncMock(return_value=False)

            await _llm_ollama("test prompt", "system prompt")

            http_client.post.assert_called_once()
            post_call = http_client.post.call_args
            payload = post_call.kwargs.get("json") or post_call[1].get("json")
            assert payload["format"] == "json"