feat(agents): Test-Harness nutzt volle Compliance-Pipeline für Fetch
CI / detect-changes (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 10s
CI / loc-budget (push) Successful in 12s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / test-python-backend (push) Successful in 28s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped

Statt der simplen dsi-discovery-Wrapper-Funktion ruft der Test-Harness
jetzt _fetch_text() aus agent_check/_fetch.py auf — die VOLLE Pipeline,
die auch der produktive Compliance-Check verwendet:
  - consent-tester dsi-discovery mit 240s Timeout (statt 120s)
  - doc_type-aware max_documents (1 für cookie/dse, 3 für impressum)
  - CMP-Payload-Capture (ePaaS, OneTrust …)
  - HTTP-Fallback mit Browser-User-Agent + DomainRateLimiter
  - HTML-Tag-Strip, wenn Playwright fehlschlägt

Damit funktionieren Cloudflare-/Anti-Bot-geschützte Sites wie BMW
und Elli auch im Test-Harness — vorher Timeout nach 90s.

Plus: bei leerem Fetch klare Fehlermeldung im Slot
('Cloudflare-/Anti-Bot-geschützt — Tipp: Text manuell einfügen')
statt silent-fail. cmp_payloads landen jetzt auch im Vault.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-08 18:38:59 +02:00
parent 702e7a6333
commit 361a5e7605
@@ -16,16 +16,15 @@ from __future__ import annotations
import asyncio
import json
import logging
import os
import uuid
from collections.abc import AsyncGenerator
from typing import Any
import httpx
from fastapi import APIRouter, HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel, Field
from compliance.api.agent_check._fetch import _fetch_text as full_fetch_text
from compliance.services.specialist_agents import REGISTRY, AgentInput
from compliance.services.specialist_agents._evidence_vault import (
EvidenceVault,
@@ -35,11 +34,6 @@ from compliance.services.specialist_agents._evidence_vault import (
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base URL of the consent-tester service. Read from the CONSENT_TESTER_URL
# environment variable, falling back to the hard-coded default below.
CONSENT_TESTER_URL = os.environ.get(
"CONSENT_TESTER_URL",
"http://bp-compliance-consent-tester:8094",
)
# Router that groups this module's endpoints under the /specialist-agent prefix.
router = APIRouter(prefix="/specialist-agent", tags=["specialist-agent"])
@@ -214,16 +208,35 @@ async def _process_slot(
req: TestStartRequest,
vault: EvidenceVault,
) -> None:
"""Holt den Text (URL oder raw), ruft Agent, vault-speichert Output."""
"""Holt den Text (URL oder raw), ruft Agent, vault-speichert Output.
Nutzt für den URL-Fetch die VOLLE Compliance-Check-Pipeline
(_fetch_text aus _fetch.py): 240s Playwright-Discovery + HTTP-
Fallback mit Browser-UA + Multi-Page-Merge + CMP-Capture.
"""
label = url or f"text-slot-{slot}"
await _emit(run_id, {"type": "slot_started", "slot": slot,
"label": label})
text = raw_text
fetch_err = ""
cmp_payloads: list[dict] = []
if url and not raw_text:
await _emit(run_id, {"type": "slot_fetching",
"slot": slot, "url": url})
text, fetch_err = await _fetch_text(url)
"slot": slot, "url": url,
"doc_type": agent.doc_type})
try:
text, cmp_payloads = await full_fetch_text(
url, doc_type=agent.doc_type,
)
except Exception as e:
fetch_err = f"{type(e).__name__}: {str(e)[:160]}"
text = ""
if not text and not fetch_err:
fetch_err = (
"Fetch lieferte 0 Zeichen — Site möglicherweise "
"Cloudflare-/Anti-Bot-geschützt oder JS-only-Rendering. "
"Tipp: Text manuell ins raw_text-Feld einfügen."
)
if fetch_err:
await _emit(run_id, {
"type": "slot_fetch_error",
@@ -234,10 +247,14 @@ async def _process_slot(
vault.put_bytes("raw", slot, "source.txt",
text.encode("utf-8"),
mime="text/plain")
if cmp_payloads:
vault.put_json("raw", slot, "cmp_payloads.json", cmp_payloads)
await _emit(run_id, {
"type": "slot_text_ready",
"slot": slot,
"char_count": len(text),
"word_count": len(text.split()) if text else 0,
"cmp_payloads": len(cmp_payloads),
})
agent_input = AgentInput(
doc_type=agent.doc_type,
@@ -246,6 +263,7 @@ async def _process_slot(
business_scope=req.business_scope,
company_name=req.company_name,
origin_domain=req.origin_domain,
context={"cmp_payloads": cmp_payloads} if cmp_payloads else {},
)
await _emit(run_id, {"type": "slot_agent_running", "slot": slot})
try:
@@ -257,6 +275,12 @@ async def _process_slot(
"error": f"{type(e).__name__}: {str(e)[:160]}",
})
return
# Wenn Fetch fail war: füge die Fehlermeldung an die notes des Output
if fetch_err and not text:
output.notes = (
(output.notes + " · " if output.notes else "")
+ f"fetch_error: {fetch_err}"
)
# Persist findings as JSON in vault
vault.put_json("finding", slot, "output.json",
json.loads(output.model_dump_json()))
@@ -299,32 +323,6 @@ async def _process_slot(
})
async def _fetch_text(url: str) -> tuple[str, str]:
    """Fetch the full text for *url* via the consent-tester DSI discovery.

    Posts ``{"url": url, "max_documents": 5}`` to
    ``{CONSENT_TESTER_URL}/dsi-discovery`` and joins the text of every
    returned document (``full_text``, falling back to ``text_preview``)
    that is longer than 50 characters, separated by blank lines.

    Args:
        url: Page URL handed to the discovery endpoint.

    Returns:
        A ``(text, error)`` tuple. On success ``error`` is ``""``. On any
        failure ``text`` is ``""`` and ``error`` is a short description:
        a non-200 status, no documents, no usable text, or the exception
        type and the first 160 characters of its message.
    """
    try:
        # The client-level timeout already applies to every request, so no
        # separate per-request timeout is passed to post().
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": url, "max_documents": 5},
            )
        if resp.status_code != 200:
            return "", f"HTTP {resp.status_code}"

        docs = resp.json().get("documents") or []
        if not docs:
            return "", "no documents discovered"

        texts: list[str] = []
        for doc in docs:
            candidate = doc.get("full_text") or doc.get("text_preview") or ""
            # Skip stubs: anything of 50 characters or fewer is not treated
            # as document text.
            if len(candidate) > 50:
                texts.append(candidate)

        if not texts:
            # Previously this case returned ("", ""): empty text without
            # any error, which callers could not tell apart from success.
            return "", "no usable text in discovered documents"
        return "\n\n".join(texts), ""
    except Exception as e:
        # Best-effort boundary: callers expect an error string, never a raise.
        return "", f"{type(e).__name__}: {str(e)[:160]}"
# ── Run / Vault Queries ──────────────────────────────────────────────