feat: Control Library check via SQL (canonical_controls) instead of Qdrant

Complete rewrite of rag_document_checker.py:
- Queries canonical_controls table (294K controls, 10K data_protection)
- Filters by category + title keywords per document type
- Uses test_procedure field as actual check instructions
- Regex pre-check extracts key terms from procedure → fast match
- LLM fallback only for regex misses (saves tokens)
- /no_think prefix for direct JSON output

SQL approach advantages:
- Structured data with test_procedure, pass_criteria, fail_criteria
- Category filtering (data_protection, compliance, governance)
- No Qdrant API key issues
- Controls are actual check criteria, not general legal texts

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-06 20:26:56 +02:00
parent 7e7f31c344
commit fa45b5793c
2 changed files with 185 additions and 200 deletions
@@ -199,17 +199,28 @@ async def _check_single_document(entry: DocCheckEntry) -> list[DocCheckResult]:
# Main document check (full text against primary type)
main_result = _run_checklist(doc_text, entry.doc_type, entry.label, entry.url, word_count)
# RAG-based deep check — DISABLED until Master Controls (G1 Decision Trace) are ready.
# The current 144K controls are general legal texts, not specific check criteria.
# Enable via rag_check=true in request when Master Controls are available.
# try:
# from compliance.services.rag_document_checker import check_document_with_rag
# rag_checks = await check_document_with_rag(doc_text, entry.doc_type, entry.label, entry.url)
# if rag_checks:
# for rc in rag_checks:
# main_result.checks.append(CheckItem(...))
# except Exception as e:
# logger.warning("RAG check failed: %s", e)
# Control Library deep check — verifies against canonical_controls (SQL)
try:
from compliance.services.rag_document_checker import check_document_with_controls
from classroom_engine.database import SessionLocal
db = SessionLocal()
try:
ctrl_checks = await check_document_with_controls(
doc_text, entry.doc_type, entry.label, db,
)
logger.info("Control check: %d results for '%s'", len(ctrl_checks) if ctrl_checks else 0, entry.label)
if ctrl_checks:
for rc in ctrl_checks:
main_result.checks.append(CheckItem(
id=rc["id"], label=rc["label"], passed=rc["passed"],
severity=rc["severity"], matched_text=rc.get("matched_text", ""),
))
if not rc["passed"]:
main_result.findings_count += 1
finally:
db.close()
except Exception as e:
logger.warning("Control check failed for %s: %s %s", entry.label, type(e).__name__, e)
all_results.append(main_result)