feat: LLM-agnostic Compliance Agent with tool calling

New agent architecture for intelligent MC evaluation:

agent_tools.py (367 LOC):
- 5 tools in OpenAI function-calling format
- query_controls: async DB query for MCs by doc_type
- evaluate_controls_batch: deterministic keyword matching
- search_document: text search with context
- get_document_stats: word count, sections, language
- submit_results: finalize check results

compliance_agent.py (398 LOC):
- ComplianceAgent class with agent loop
- 3 LLM providers: Ollama, OpenAI-compatible (OVH), Anthropic
- Tool call dispatch + result collection
- System prompt for systematic compliance analysis
- run_compliance_check() convenience function

Hybrid mode:
- COMPLIANCE_USE_AGENT=false (default): deterministic regex
- COMPLIANCE_USE_AGENT=true: LLM agent with tool calling
- Agent fallback to regex if LLM unavailable

Works with Qwen 35B (Ollama), Qwen 120B (OVH vLLM), Claude.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-10 22:56:09 +02:00
parent bdbc30e47b
commit 58f370f4ff
4 changed files with 778 additions and 2 deletions
@@ -287,8 +287,10 @@ async def _check_single_document(entry: DocCheckEntry) -> list[DocCheckResult]:
# binary pass/fail criteria verified by LLM (Qwen)
try:
from compliance.services.rag_document_checker import check_document_with_controls
use_agent = os.getenv("COMPLIANCE_USE_AGENT", "false").lower() == "true"
mc_results = await check_document_with_controls(
doc_text, entry.doc_type, entry.label, max_controls=0,
doc_text, entry.doc_type, entry.label,
max_controls=0, use_agent=use_agent,
)
if mc_results:
# Add MC results as additional checks to the main result
@@ -0,0 +1,367 @@
"""
Agent Tools — LLM-agnostic tool definitions for the Compliance Agent.
Provides 5 tools in OpenAI function-calling format:
1. query_controls — load Master Controls from DB
2. evaluate_controls_batch — deterministic keyword check against doc text
3. search_document — find keyword in document with context
4. get_document_stats — word count, sections, language
5. submit_results — final submission of pass/fail results
All implementations are deterministic (no LLM). The agent decides
which tools to call and in what order.
"""
import asyncpg
import json
import logging
import os
import re
logger = logging.getLogger(__name__)

# Fallback connection string for the compliance database; normally overridden
# by the DATABASE_URL environment variable.
# NOTE(review): dev credentials are hard-coded as the default — confirm
# production deployments always set DATABASE_URL.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://breakpilot:breakpilot@bp-core-postgres:5432/breakpilot",
)
# ═══════════════════════════════════════════════════════════════
# Tool definitions (OpenAI function-calling format)
# ═══════════════════════════════════════════════════════════════
# Each entry uses the OpenAI "function" tool schema. Providers with a
# different wire format (e.g. Anthropic) convert these in the agent layer.
# Descriptions are in German because the agent is prompted in German.
TOOLS = [
    # 1. query_controls — load Master Controls for a doc type from the DB.
    {
        "type": "function",
        "function": {
            "name": "query_controls",
            "description": (
                "Lade Master Controls aus der Datenbank fuer einen Dokumenttyp. "
                "Gibt eine Liste von Pruefpunkten mit check_question, "
                "pass_criteria und fail_criteria zurueck."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "doc_type": {
                        "type": "string",
                        "description": "Dokumenttyp: dse, agb, impressum, cookie, widerruf, avv, dsfa",
                    },
                    "severity": {
                        "type": "string",
                        "description": "Optional: nur Controls mit dieser Severity (HIGH, MEDIUM, LOW)",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Max Anzahl Controls (default: alle)",
                    },
                },
                "required": ["doc_type"],
            },
        },
    },
    # 2. evaluate_controls_batch — deterministic keyword check of up to 20
    #    controls per call against the document text.
    {
        "type": "function",
        "function": {
            "name": "evaluate_controls_batch",
            "description": (
                "Pruefe mehrere Master Controls gegen den Dokumenttext. "
                "Deterministisch: Keyword-Matching der pass_criteria. "
                "Gibt fuer jeden Control pass/fail mit Evidence zurueck."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "controls": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "id": {"type": "string"},
                                "check_question": {"type": "string"},
                                "pass_criteria": {
                                    "type": "array",
                                    "items": {"type": "string"},
                                },
                            },
                            "required": ["id", "check_question", "pass_criteria"],
                        },
                        "description": "Liste von Controls zum Pruefen (max 20 pro Batch)",
                    },
                },
                "required": ["controls"],
            },
        },
    },
    # 3. search_document — case-insensitive keyword search with context.
    {
        "type": "function",
        "function": {
            "name": "search_document",
            "description": (
                "Suche ein Schluesselwort im Dokument und gib die "
                "Fundstelle mit Kontext zurueck."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "keyword": {
                        "type": "string",
                        "description": "Suchbegriff (case-insensitive)",
                    },
                    "context_chars": {
                        "type": "integer",
                        "description": "Zeichen Kontext um die Fundstelle (default: 200)",
                    },
                },
                "required": ["keyword"],
            },
        },
    },
    # 4. get_document_stats — document metrics; takes no parameters.
    {
        "type": "function",
        "function": {
            "name": "get_document_stats",
            "description": (
                "Statistiken zum Dokument: Wortanzahl, erkannte Abschnitte, "
                "Sprache, Laenge."
            ),
            "parameters": {
                "type": "object",
                "properties": {},
            },
        },
    },
    # 5. submit_results — final pass/fail submission; ends the agent's work.
    {
        "type": "function",
        "function": {
            "name": "submit_results",
            "description": (
                "Reiche die finalen Pruefergebnisse ein. Jedes Ergebnis "
                "hat id, label, passed, severity und eine optionale Empfehlung."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "results": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "id": {"type": "string"},
                                "label": {"type": "string"},
                                "passed": {"type": "boolean"},
                                "severity": {"type": "string"},
                                "hint": {"type": "string"},
                                "matched_text": {"type": "string"},
                            },
                            "required": ["id", "label", "passed", "severity"],
                        },
                    },
                },
                "required": ["results"],
            },
        },
    },
]
# ═══════════════════════════════════════════════════════════════
# Tool dispatcher
# ═══════════════════════════════════════════════════════════════
async def execute_tool(name: str, args: dict, context: dict) -> dict:
    """Route a tool call to its implementation and run it.

    Args:
        name: Tool function name as announced in TOOLS.
        args: Parsed arguments dict for the tool.
        context: Shared mutable state (doc_text, doc_type, db_url, results).

    Returns:
        The tool's result dict, or ``{"error": ...}`` when the tool is
        unknown or its implementation raised.
    """
    handlers = {
        "query_controls": _query_controls,
        "evaluate_controls_batch": _evaluate_controls_batch,
        "search_document": _search_document,
        "get_document_stats": _get_document_stats,
        "submit_results": _submit_results,
    }
    handler = handlers.get(name)
    if handler is None:
        return {"error": f"Unbekanntes Tool: {name}"}
    try:
        result = await handler(args, context)
    except Exception as exc:
        # Tools must never crash the agent loop; report the failure instead.
        logger.exception("Tool %s failed", name)
        return {"error": str(exc)}
    return result
# ═══════════════════════════════════════════════════════════════
# Tool implementations
# ═══════════════════════════════════════════════════════════════
async def _query_controls(args: dict, ctx: dict) -> dict:
    """Load Master Controls from compliance.doc_check_controls.

    Args:
        args: Tool arguments — ``doc_type`` (falls back to the context's
            doc_type, then "dse"), optional ``severity`` filter and
            ``limit`` (0 or missing = unlimited).
        ctx: Shared agent context; reads ``db_url``.

    Returns:
        ``{"count": n, "controls": [...]}`` on success, or a dict with an
        ``error`` message and an empty ``controls`` list on failure.
    """
    doc_type = args.get("doc_type", ctx.get("doc_type", "dse"))
    severity = args.get("severity")
    limit = args.get("limit", 0)
    db_url = ctx.get("db_url") or DATABASE_URL
    try:
        conn = await asyncpg.connect(db_url)
    except Exception as e:
        # No connection — report instead of raising so the agent can react.
        return {"error": f"DB-Verbindung fehlgeschlagen: {e}", "controls": []}
    try:
        # NOTE(review): fail_criteria is fetched but never included in the
        # returned control dicts — confirm whether that is intentional.
        query = (
            "SELECT id, control_id, title, regulation, check_question, "
            "  pass_criteria, fail_criteria, severity "
            "FROM compliance.doc_check_controls "
            "WHERE doc_type = $1"
        )
        params: list = [doc_type]
        if severity:
            # $2 only exists when a severity filter is requested.
            query += " AND UPPER(severity) = $2"
            params.append(severity.upper())
        query += " ORDER BY severity DESC, title"
        if limit and limit > 0:
            # int() cast keeps the inlined LIMIT safe from injection.
            query += f" LIMIT {int(limit)}"
        rows = await conn.fetch(query, *params)
        controls = []
        for r in rows:
            # pass_criteria may be stored as JSON text; normalise to a list.
            pc = r["pass_criteria"]
            if isinstance(pc, str):
                try:
                    pc = json.loads(pc)
                except Exception:
                    pc = [pc] if pc else []
            # NOTE(review): Record.get assumes a recent asyncpg — confirm the
            # pinned version supports it.
            controls.append({
                "id": str(r["id"]),
                "control_id": r.get("control_id", ""),
                "title": r.get("title", ""),
                "regulation": r.get("regulation", ""),
                "check_question": r.get("check_question", ""),
                "pass_criteria": pc if isinstance(pc, list) else [pc],
                "severity": r.get("severity", "MEDIUM"),
            })
        return {"count": len(controls), "controls": controls}
    except Exception as e:
        return {"error": f"Abfrage fehlgeschlagen: {e}", "controls": []}
    finally:
        await conn.close()
_STOP = {
"oder", "und", "der", "die", "das", "ein", "eine", "von", "vom",
"zur", "zum", "mit", "auf", "aus", "bei", "nach", "nicht", "kein",
"wird", "werden", "kann", "muss", "ist", "sind", "hat", "dass",
}
async def _evaluate_controls_batch(args: dict, ctx: dict) -> dict:
"""Keyword-match each control's pass_criteria against doc_text."""
controls = args.get("controls", [])[:20]
text_lower = ctx.get("doc_text", "").lower().replace("\xad", "")
results = []
for ctrl in controls:
criteria = ctrl.get("pass_criteria", [])
met = 0
evidence = ""
for crit in criteria:
words = [w for w in re.findall(r"[a-z\u00e4\u00f6\u00fc\u00df]{4,}", crit.lower()) if w not in _STOP]
if not words:
met += 1
continue
matched = sum(1 for w in words if w in text_lower)
if matched >= len(words) * 0.5:
met += 1
if not evidence:
for w in words:
idx = text_lower.find(w)
if idx >= 0:
s = max(0, idx - 30)
e = min(len(text_lower), idx + len(w) + 30)
evidence = text_lower[s:e].strip()
break
passed = met >= len(criteria) * 0.6 if criteria else False
results.append({
"id": ctrl.get("id", ""),
"passed": passed,
"criteria_met": f"{met}/{len(criteria)}",
"matched_text": evidence[:120],
})
return {"evaluated": len(results), "results": results}
async def _search_document(args: dict, ctx: dict) -> dict:
"""Simple keyword search with context window."""
keyword = args.get("keyword", "").lower()
context_chars = args.get("context_chars", 200)
text = ctx.get("doc_text", "")
text_lower = text.lower()
hits = []
start = 0
while len(hits) < 5:
idx = text_lower.find(keyword, start)
if idx < 0:
break
s = max(0, idx - context_chars)
e = min(len(text), idx + len(keyword) + context_chars)
hits.append({"position": idx, "context": text[s:e].strip()})
start = idx + len(keyword)
return {"keyword": keyword, "found": len(hits) > 0, "hits": hits}
async def _get_document_stats(args: dict, ctx: dict) -> dict:
"""Word count, detected sections, language guess."""
text = ctx.get("doc_text", "")
words = text.split()
# Detect sections (lines that look like headings)
sections = []
for line in text.split("\n"):
stripped = line.strip()
if 3 < len(stripped) < 120 and stripped[0].isupper() and not stripped.endswith(","):
if re.match(r"^(\d+\.?\s+|[IVXLC]+\.?\s+|[a-z]\)\s+)?[A-ZAEOEUE\u00c4\u00d6\u00dc]", stripped):
sections.append(stripped[:80])
# Language guess (DE vs EN heuristic)
de_markers = sum(1 for w in ["der", "die", "das", "und", "ist", "werden", "nicht"] if w in text.lower())
en_markers = sum(1 for w in ["the", "and", "is", "are", "not", "with", "for"] if w in text.lower())
lang = "de" if de_markers > en_markers else "en"
return {
"word_count": len(words),
"char_count": len(text),
"sections_detected": len(sections),
"sections": sections[:20],
"language": lang,
}
async def _submit_results(args: dict, ctx: dict) -> dict:
"""Store final results in context for the caller to retrieve."""
results = args.get("results", [])
# Normalise into CheckItem-compatible format
normalised = []
for r in results:
normalised.append({
"id": r.get("id", ""),
"label": r.get("label", ""),
"passed": r.get("passed", False),
"severity": r.get("severity", "MEDIUM"),
"hint": r.get("hint", ""),
"matched_text": r.get("matched_text", ""),
"level": 2,
"parent": None,
"skipped": False,
"source": "agent",
})
ctx["results"] = normalised
passed = sum(1 for r in normalised if r["passed"])
return {
"submitted": len(normalised),
"passed": passed,
"failed": len(normalised) - passed,
}
@@ -0,0 +1,398 @@
"""
Compliance Agent — LLM-agnostic agent with tool calling.
Supports three LLM providers:
- ollama: Local Ollama instance (default)
- openai: OpenAI-compatible API (OVH vLLM, etc.)
- anthropic: Anthropic Messages API
The agent loop:
1. Send messages + tool definitions to the LLM
2. If the LLM returns tool_calls → execute them → append results
3. Repeat until no more tool_calls or max_iterations reached
4. Return the collected results from submit_results()
All tool execution is deterministic (keyword matching, DB queries).
The LLM only decides WHICH tools to call and HOW to interpret results.
"""
import httpx
import json
import logging
import os
import re
from compliance.services.agent_tools import TOOLS, execute_tool
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════
# Provider configuration
# ═══════════════════════════════════════════════════════════════
# Base URL of the local Ollama server (default targets the Docker host).
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
# OpenAI-compatible endpoint (e.g. OVH vLLM); empty means "not configured".
OVH_URL = os.getenv("OVH_LLM_URL", "")
# Anthropic API key; _chat_anthropic re-reads the env var as a fallback.
ANTHROPIC_KEY = os.getenv("ANTHROPIC_API_KEY", "")
# Default provider/model used unless the caller passes explicit values.
AGENT_PROVIDER = os.getenv("COMPLIANCE_AGENT_PROVIDER", "ollama")
AGENT_MODEL = os.getenv("COMPLIANCE_AGENT_MODEL", "qwen3.5:35b-a3b")

# German system prompt steering the agent through the intended tool order:
# query_controls -> evaluate_controls_batch -> (search_document) -> submit_results.
SYSTEM_PROMPT = (
    "Du bist ein Datenschutz-Compliance-Analyst. "
    "Pruefe das Dokument systematisch:\n"
    "1. Lade alle Master Controls fuer den Dokumenttyp (query_controls)\n"
    "2. Pruefe die Controls in Batches von 20 (evaluate_controls_batch)\n"
    "3. Bei unklaren Ergebnissen: suche im Dokument nach "
    "Schluesselwoertern (search_document)\n"
    "4. Reiche die finalen Ergebnisse ein (submit_results) "
    "mit konkreten Empfehlungen\n"
    "Arbeite gruendlich und pruefe ALLE Controls. "
    "Antworte immer auf Deutsch."
)
# ═══════════════════════════════════════════════════════════════
# Agent class
# ═══════════════════════════════════════════════════════════════
class ComplianceAgent:
    """LLM-agnostic compliance agent with tool calling loop.

    The agent repeatedly sends the conversation plus TOOLS to the selected
    provider, executes requested tool calls locally (all deterministic), and
    feeds results back until the model stops calling tools or the iteration
    budget is exhausted. The final output is whatever submit_results stored
    in ``self.context["results"]``.
    """

    def __init__(
        self,
        provider_type: str,
        model: str,
        doc_text: str,
        doc_type: str,
        doc_title: str,
        db_url: str = "",
    ):
        # Empty strings fall back to the env-configured defaults.
        self.provider = provider_type or AGENT_PROVIDER
        self.model = model or AGENT_MODEL
        self.doc_text = doc_text
        self.doc_type = doc_type
        self.doc_title = doc_title
        # Shared mutable state handed to every tool invocation;
        # _submit_results writes the final list into "results".
        self.context = {
            "doc_text": doc_text,
            "doc_type": doc_type,
            "db_url": db_url,
            "results": [],
        }

    async def run(self, max_iterations: int = 15) -> list[dict]:
        """Run the agent loop until completion or max iterations.

        Returns:
            CheckItem-compatible dicts collected via submit_results; empty
            if the model never called submit_results.
        """
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {
                "role": "user",
                "content": (
                    # Only the first 6000 chars go into the prompt; the
                    # tools themselves operate on the full document text.
                    f"Pruefe das folgende Dokument vom Typ '{self.doc_type}' "
                    f"(Titel: '{self.doc_title}').\n\n"
                    f"Dokumenttext ({len(self.doc_text)} Zeichen):\n"
                    f"{self.doc_text[:6000]}"
                ),
            },
        ]
        tools = TOOLS
        iteration = 0
        while iteration < max_iterations:
            iteration += 1
            logger.info(
                "Agent iteration %d/%d (provider=%s, model=%s)",
                iteration, max_iterations, self.provider, self.model,
            )
            response = await self._chat_with_tools(messages, tools)
            content = response.get("content", "")
            tool_calls = response.get("tool_calls", [])
            if content:
                messages.append({"role": "assistant", "content": content})
            if not tool_calls:
                # No further tool requests: the model considers itself done.
                logger.info("Agent finished after %d iterations", iteration)
                break
            # Execute each tool call and append results
            for tc in tool_calls:
                tool_name = tc.get("name", "")
                tool_args = tc.get("arguments", {})
                if isinstance(tool_args, str):
                    # OpenAI-style providers deliver arguments as a JSON string.
                    try:
                        tool_args = json.loads(tool_args)
                    except json.JSONDecodeError:
                        tool_args = {}
                logger.info("Tool call: %s(%s)", tool_name, list(tool_args.keys()))
                result = await self._execute_tool(tc)
                result_str = json.dumps(result, ensure_ascii=False, default=str)
                # Append tool call + result as messages.
                # NOTE(review): each call becomes its own assistant message, and
                # any text content was already appended separately above. Strict
                # OpenAI-spec backends expect ONE assistant message carrying the
                # content and ALL tool_calls together — confirm the targeted
                # backends tolerate this shape.
                messages.append({
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [
                        {
                            "id": tc.get("id", f"call_{iteration}_{tool_name}"),
                            "type": "function",
                            "function": {"name": tool_name, "arguments": json.dumps(tool_args)},
                        }
                    ],
                })
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.get("id", f"call_{iteration}_{tool_name}"),
                    # Truncate large tool output to keep the context bounded.
                    "content": result_str[:8000],
                })
        return self.context.get("results", [])

    async def _chat_with_tools(self, messages: list, tools: list) -> dict:
        """Send messages+tools to the LLM. Returns {content, tool_calls}.

        Raises:
            ValueError: if ``self.provider`` is not one of the three
                supported providers.
        """
        if self.provider == "ollama":
            return await self._chat_ollama(messages, tools)
        elif self.provider == "openai":
            return await self._chat_openai(messages, tools)
        elif self.provider == "anthropic":
            return await self._chat_anthropic(messages, tools)
        else:
            raise ValueError(f"Unbekannter Provider: {self.provider}")

    # ───────────────────────────────────────────────────────────
    # Ollama
    # ───────────────────────────────────────────────────────────
    async def _chat_ollama(self, messages: list, tools: list) -> dict:
        """Ollama /api/chat with tools support."""
        payload = {
            "model": self.model,
            "messages": messages,
            "tools": tools,
            "stream": False,
            # Low temperature for reproducible tool selection.
            "options": {"temperature": 0.1, "num_predict": 4000},
        }
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(f"{OLLAMA_URL}/api/chat", json=payload)
            resp.raise_for_status()
            data = resp.json()
        msg = data.get("message", {})
        content = msg.get("content", "")
        # Strip think tags (reasoning models emit <think>...</think> blocks).
        content = re.sub(r"<think>.*?</think>", "", content, flags=re.DOTALL).strip()
        raw_calls = msg.get("tool_calls", [])
        tool_calls = []
        for tc in raw_calls:
            fn = tc.get("function", {})
            # NOTE(review): synthesized ids are not unique across iterations or
            # repeated calls to the same tool — confirm nothing downstream
            # requires globally unique tool_call ids.
            tool_calls.append({
                "id": f"ollama_{fn.get('name', '')}",
                "name": fn.get("name", ""),
                "arguments": fn.get("arguments", {}),
            })
        return {"content": content, "tool_calls": tool_calls}

    # ───────────────────────────────────────────────────────────
    # OpenAI-compatible (OVH vLLM, etc.)
    # ───────────────────────────────────────────────────────────
    async def _chat_openai(self, messages: list, tools: list) -> dict:
        """OpenAI-compatible /v1/chat/completions with tools."""
        # NOTE(review): if neither OVH_LLM_URL nor OPENAI_BASE_URL is set,
        # base_url is "" and the request URL is invalid — confirm config
        # always provides one when provider == "openai".
        base_url = OVH_URL or os.getenv("OPENAI_BASE_URL", "")
        api_key = os.getenv("OVH_LLM_KEY", os.getenv("OPENAI_API_KEY", ""))
        headers = {"Content-Type": "application/json"}
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"
        payload = {
            "model": self.model,
            "messages": messages,
            "tools": tools,
            "temperature": 0.1,
            "max_tokens": 4000,
        }
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{base_url.rstrip('/')}/v1/chat/completions",
                json=payload,
                headers=headers,
            )
            resp.raise_for_status()
            data = resp.json()
        choice = data.get("choices", [{}])[0]
        msg = choice.get("message", {})
        tool_calls = []
        for tc in msg.get("tool_calls", []):
            fn = tc.get("function", {})
            # Arguments stay a JSON string here; run() parses them later.
            tool_calls.append({
                "id": tc.get("id", ""),
                "name": fn.get("name", ""),
                "arguments": fn.get("arguments", "{}"),
            })
        return {"content": msg.get("content", "") or "", "tool_calls": tool_calls}

    # ───────────────────────────────────────────────────────────
    # Anthropic
    # ───────────────────────────────────────────────────────────
    async def _chat_anthropic(self, messages: list, tools: list) -> dict:
        """Anthropic Messages API with tools (different format)."""
        api_key = ANTHROPIC_KEY or os.getenv("ANTHROPIC_API_KEY", "")
        # Convert tools to Anthropic format
        anthropic_tools = []
        for t in tools:
            fn = t.get("function", {})
            anthropic_tools.append({
                "name": fn["name"],
                "description": fn.get("description", ""),
                "input_schema": fn.get("parameters", {"type": "object", "properties": {}}),
            })
        # Extract system from messages, convert rest
        system_text = ""
        api_messages = []
        for m in messages:
            if m["role"] == "system":
                # Anthropic takes the system prompt as a top-level field.
                system_text = m["content"]
            elif m["role"] == "tool":
                # Tool results become user messages with a tool_result block.
                api_messages.append({
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": m.get("tool_call_id", ""),
                            "content": m.get("content", ""),
                        }
                    ],
                })
            elif m["role"] == "assistant" and m.get("tool_calls"):
                # Assistant tool calls become tool_use content blocks.
                blocks = []
                if m.get("content"):
                    blocks.append({"type": "text", "text": m["content"]})
                for tc in m["tool_calls"]:
                    fn = tc.get("function", tc)
                    args = fn.get("arguments", {})
                    if isinstance(args, str):
                        try:
                            args = json.loads(args)
                        except Exception:
                            args = {}
                    blocks.append({
                        "type": "tool_use",
                        "id": tc.get("id", ""),
                        "name": fn.get("name", tc.get("name", "")),
                        "input": args,
                    })
                api_messages.append({"role": "assistant", "content": blocks})
            else:
                api_messages.append({"role": m["role"], "content": m.get("content", "")})
        payload = {
            "model": self.model,
            "max_tokens": 4000,
            "system": system_text,
            "messages": api_messages,
            "tools": anthropic_tools,
            "temperature": 0.1,
        }
        headers = {
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        }
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                "https://api.anthropic.com/v1/messages",
                json=payload,
                headers=headers,
            )
            resp.raise_for_status()
            data = resp.json()
        # Response content is a list of text / tool_use blocks.
        content_parts = data.get("content", [])
        text = ""
        tool_calls = []
        for part in content_parts:
            if part["type"] == "text":
                text += part.get("text", "")
            elif part["type"] == "tool_use":
                tool_calls.append({
                    "id": part.get("id", ""),
                    "name": part.get("name", ""),
                    "arguments": part.get("input", {}),
                })
        return {"content": text, "tool_calls": tool_calls}

    # ───────────────────────────────────────────────────────────
    # Tool execution
    # ───────────────────────────────────────────────────────────
    async def _execute_tool(self, tool_call: dict) -> dict:
        """Execute a single tool call via agent_tools.execute_tool()."""
        name = tool_call.get("name", "")
        args = tool_call.get("arguments", {})
        if isinstance(args, str):
            # Some providers deliver arguments as a JSON string.
            try:
                args = json.loads(args)
            except json.JSONDecodeError:
                args = {}
        return await execute_tool(name, args, self.context)
# ═══════════════════════════════════════════════════════════════
# Convenience function
# ═══════════════════════════════════════════════════════════════
async def run_compliance_check(
    text: str,
    doc_type: str,
    doc_title: str,
    provider_type: str = "",
    model: str = "",
    db_url: str = "",
) -> list[dict]:
    """Run a full compliance check with the agent.

    Args:
        text: Document text to check.
        doc_type: Document type (dse, agb, impressum, etc.).
        doc_title: Human-readable document title.
        provider_type: LLM provider (ollama, openai, anthropic);
            empty falls back to the env default.
        model: Model name/ID (provider-specific); empty falls back to
            the env default.
        db_url: PostgreSQL connection string (optional).

    Returns:
        List of CheckItem-compatible dicts with pass/fail results.
    """
    chosen_provider = provider_type or AGENT_PROVIDER
    chosen_model = model or AGENT_MODEL
    logger.info(
        "Starting compliance check: doc_type=%s, title='%s', "
        "provider=%s, model=%s, text_len=%d",
        doc_type, doc_title, chosen_provider, chosen_model, len(text),
    )
    agent = ComplianceAgent(
        provider_type=chosen_provider,
        model=chosen_model,
        doc_text=text,
        doc_type=doc_type,
        doc_title=doc_title,
        db_url=db_url,
    )
    results = await agent.run(max_iterations=15)
    passed_total = sum(1 for item in results if item.get("passed"))
    logger.info(
        "Compliance check complete: %d results (%d passed, %d failed)",
        len(results),
        passed_total,
        len(results) - passed_total,
    )
    return results
@@ -36,11 +36,20 @@ async def check_document_with_controls(
doc_title: str,
db_url: str = "",
max_controls: int = 0, # 0 = no limit, check ALL
use_agent: bool = False, # Use LLM agent for intelligent evaluation
) -> list[dict]:
"""Check document against ALL doc_check_controls for this doc_type.
Deterministic: same text + same MCs = same result. No LLM involved.
Two modes:
- use_agent=False (default): Deterministic keyword matching. Fast, reproducible.
- use_agent=True: LLM agent with tool calling. Intelligent, contextual.
"""
if use_agent:
try:
from compliance.services.compliance_agent import run_compliance_check
return await run_compliance_check(text, doc_type, doc_title)
except Exception as e:
logger.warning("Agent mode failed, falling back to regex: %s", e)
if not text or len(text) < 100:
return []