fix(impressum): Findings aus 12 §5-TMG-Pattern-MCs statt verunreinigtem DB-Set
CI / detect-changes (push) Successful in 8s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 5s
CI / validate-canonical-controls (push) Successful in 11s
CI / loc-budget (push) Successful in 14s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 30s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / detect-changes (push) Successful in 8s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 5s
CI / validate-canonical-controls (push) Successful in 11s
CI / loc-budget (push) Successful in 14s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 30s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
Der Agent lieferte "alles gruen": _load_controls gab auf macmini nur 3 von 75 doc_type='impressum'-MCs zurueck (Sidecar mc_classification.db hat nur 4/75 als text-matchbar klassifiziert). Tiefere Ursache: die 75 doc_type='impressum'-MCs sind fehl-klassifiziert (60/75 canonical_scope='other'; Prefixes TRD/SEC/GOV = Geschaeftsbriefe/Marktplatz/Bestellung, NICHT §5 TMG Website-Impressum). Fix: Der Impressum-Agent erzeugt Findings jetzt aus seinen 12 autoritativen §5-TMG/DDG-Pattern-MCs (mcs.py) statt aus dem verunreinigten DB-Set — deterministisch, scope-aware, field_id = semantisches Feld. Semantic-Validator-Demote + Massnahmen + Rollup bleiben. Die 5-Impressum-GT-Tests laufen jetzt echt durch: 0 Falsch-Positive. DB-Master-Controls fuer Impressum deaktiviert bis zum MC-Re-Filtering (separate Aufgabe: die doc_type-Klassifizierung der Vorgaenger-Session muss bereinigt werden). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -39,8 +39,7 @@ from .._base import (
|
|||||||
from .._pattern_library import record as record_pattern
|
from .._pattern_library import record as record_pattern
|
||||||
from .._rollup import rollup
|
from .._rollup import rollup
|
||||||
from .._semantic_validator import build_rename_action, validate_present
|
from .._semantic_validator import build_rename_action, validate_present
|
||||||
from .mcs import MC_IDS, MCS, detect_automotive
|
from .mcs import MC_IDS, MCS, detect_automotive, scope_matches
|
||||||
from .v3_engine import run_v3_pipeline
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -107,73 +106,53 @@ class ImpressumAgent(BaseSpecialistAgent):
|
|||||||
notes="Impressum-Text zu kurz oder leer.",
|
notes="Impressum-Text zu kurz oder leer.",
|
||||||
)
|
)
|
||||||
|
|
||||||
# ── Layer 0 + 1 + 2 (Boost + Keyword + Embedding) ──────────
|
# ── Findings aus den 12 autoritativen §5-TMG/DDG-MCs (mcs.py) ──
|
||||||
results, telemetry = await run_v3_pipeline(text, scope)
|
# Die DB-Menge doc_type='impressum' ist verunreinigt (TRD/SEC/GOV-
|
||||||
notes_parts.append(
|
# Controls statt §5 TMG, fehl-klassifiziert von einer Vorgaenger-
|
||||||
f"v3-pipeline: {telemetry.get('total_mcs', 0)} DB-MCs · "
|
# Session), daher sind bis zum MC-Re-Filtering die 12 praezisen
|
||||||
f"{telemetry.get('layer_0_field_hits', 0)} Pattern-Boosts · "
|
# Pattern-MCs die Findings-Quelle. field_id = semantisches Feld
|
||||||
f"{telemetry.get('layer_0_boost_overrides', 0)} Boost-Overrides"
|
# (passt zum Semantic-Validator + den GT-Tests).
|
||||||
)
|
is_auto = "automotive" in scope
|
||||||
sec_drop = telemetry.get("sector_dropped", 0)
|
for mc in MCS:
|
||||||
off_drop = telemetry.get("offtopic_dropped", 0)
|
if not scope_matches(mc, scope, is_auto):
|
||||||
if sec_drop or off_drop:
|
coverage.append(McCoverage(
|
||||||
notes_parts.append(
|
mc_id=mc.mc_id, status="na",
|
||||||
f"Scope-Filter: {sec_drop} Branchen-MCs + "
|
reason="nicht im Business-Scope",
|
||||||
f"{off_drop} themenfremde MCs entfernt"
|
))
|
||||||
)
|
|
||||||
|
|
||||||
# DB-MCs → Findings + Coverage
|
|
||||||
seen_db_mcs: set[str] = set()
|
|
||||||
for r in results:
|
|
||||||
mc_id = r.get("control_id") or ""
|
|
||||||
if not mc_id or mc_id in seen_db_mcs:
|
|
||||||
continue
|
continue
|
||||||
seen_db_mcs.add(mc_id)
|
if any(p.search(text) for p in mc.patterns):
|
||||||
passed = bool(r.get("passed"))
|
coverage.append(McCoverage(
|
||||||
sev = _SEV_TO_ENUM.get(
|
mc_id=mc.mc_id, status="ok", reason="Pattern-Treffer",
|
||||||
(r.get("severity") or "MEDIUM").upper(), Severity.MEDIUM,
|
))
|
||||||
)
|
|
||||||
coverage.append(McCoverage(
|
|
||||||
mc_id=mc_id,
|
|
||||||
status="ok" if passed else sev.value.lower(),
|
|
||||||
reason=str(r.get("matched_text") or r.get("hint") or "")[:120],
|
|
||||||
))
|
|
||||||
if passed:
|
|
||||||
continue
|
continue
|
||||||
label = r.get("label") or r.get("hint") or ""
|
sev = _SEV_TO_ENUM.get(mc.severity_if_missing, Severity.MEDIUM)
|
||||||
norm_str = str(r.get("regulation") or "")
|
|
||||||
if r.get("article"):
|
|
||||||
norm_str = (norm_str + f" Art. {r.get('article')}").strip()
|
|
||||||
findings.append(Finding(
|
findings.append(Finding(
|
||||||
check_id=f"DBMC-{mc_id}",
|
check_id=f"IMP-{mc.field_id}",
|
||||||
agent=self.agent_id,
|
agent=self.agent_id,
|
||||||
agent_version=self.agent_version,
|
agent_version=self.agent_version,
|
||||||
field_id=mc_id,
|
field_id=mc.field_id,
|
||||||
severity=sev,
|
severity=sev,
|
||||||
severity_reason="db_mc_failed",
|
severity_reason="pflichtangabe_missing",
|
||||||
title=str(label)[:200] or f"DB-MC {mc_id} nicht erfüllt",
|
title=f"Pflichtangabe fehlt: {mc.label}",
|
||||||
norm=norm_str,
|
norm=mc.norm,
|
||||||
evidence="",
|
evidence="",
|
||||||
action=_build_measure(str(label), norm_str)[:400],
|
action=_build_measure(mc.label, mc.norm),
|
||||||
confidence=0.9,
|
confidence=0.9,
|
||||||
sources=[EvidenceSource(
|
sources=[EvidenceSource(
|
||||||
source_type=SourceType.MC,
|
source_type=SourceType.REGEX,
|
||||||
source_id=mc_id,
|
source_id=mc.mc_id,
|
||||||
detail=str(r.get("source") or "keyword_match")[:120],
|
detail="kein Pattern-Treffer im Text",
|
||||||
confidence=0.9,
|
confidence=0.9,
|
||||||
)],
|
)],
|
||||||
))
|
))
|
||||||
|
|
||||||
# Layer 0: eigene Pattern-IDs immer mit ins coverage (für UI)
|
|
||||||
boost_ids = set(telemetry.get("layer_0_field_ids") or [])
|
|
||||||
for mc in MCS:
|
|
||||||
cov_status = "ok" if mc.field_id in boost_ids else "na"
|
|
||||||
cov_reason = ("regex-boost hit"
|
|
||||||
if mc.field_id in boost_ids
|
|
||||||
else "kein Pattern-Treffer (kein Veto)")
|
|
||||||
coverage.append(McCoverage(
|
coverage.append(McCoverage(
|
||||||
mc_id=mc.mc_id, status=cov_status, reason=cov_reason,
|
mc_id=mc.mc_id, status=sev.value.lower(),
|
||||||
|
reason="kein Pattern-Treffer",
|
||||||
))
|
))
|
||||||
|
notes_parts.append(
|
||||||
|
f"{len(MCS)} §5-TMG-MCs geprüft · "
|
||||||
|
f"{len(findings)} Pflichtangabe(n) offen"
|
||||||
|
)
|
||||||
|
|
||||||
# ── Layer 3: Semantic-Validator nur für HIGH/MEDIUM-Fails ──
|
# ── Layer 3: Semantic-Validator nur für HIGH/MEDIUM-Fails ──
|
||||||
await self._semantic_demote(text, findings, coverage)
|
await self._semantic_demote(text, findings, coverage)
|
||||||
@@ -199,7 +178,7 @@ class ImpressumAgent(BaseSpecialistAgent):
|
|||||||
candidates = [
|
candidates = [
|
||||||
f for f in findings
|
f for f in findings
|
||||||
if f.severity in (Severity.HIGH.value, Severity.MEDIUM.value)
|
if f.severity in (Severity.HIGH.value, Severity.MEDIUM.value)
|
||||||
and f.severity_reason == "db_mc_failed"
|
and f.severity_reason == "pflichtangabe_missing"
|
||||||
]
|
]
|
||||||
if not candidates:
|
if not candidates:
|
||||||
return
|
return
|
||||||
@@ -232,9 +211,11 @@ class ImpressumAgent(BaseSpecialistAgent):
|
|||||||
detail=f"LLM-confirmed: '{label_used}'",
|
detail=f"LLM-confirmed: '{label_used}'",
|
||||||
confidence=conf,
|
confidence=conf,
|
||||||
))
|
))
|
||||||
# Coverage update + auto-learning
|
# Coverage update + auto-learning (mc_id steckt in der Quelle)
|
||||||
|
mc_id_for_cov = (finding.sources[0].source_id
|
||||||
|
if finding.sources else "")
|
||||||
for c in coverage:
|
for c in coverage:
|
||||||
if c.mc_id and c.mc_id == f"DBMC-{finding.field_id}":
|
if c.mc_id and c.mc_id == mc_id_for_cov:
|
||||||
c.status = "low"
|
c.status = "low"
|
||||||
c.reason = f"label_mismatch: '{label_used}'"
|
c.reason = f"label_mismatch: '{label_used}'"
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ from compliance.services.specialist_agents import (
|
|||||||
from compliance.services.specialist_agents.impressum.agent import (
|
from compliance.services.specialist_agents.impressum.agent import (
|
||||||
_build_measure,
|
_build_measure,
|
||||||
)
|
)
|
||||||
|
from compliance.services.specialist_agents.impressum.mcs import MCS
|
||||||
from compliance.services.specialist_agents.impressum.regex_boost import (
|
from compliance.services.specialist_agents.impressum.regex_boost import (
|
||||||
BOOST_KEYWORDS,
|
BOOST_KEYWORDS,
|
||||||
boost_matches_db_mc,
|
boost_matches_db_mc,
|
||||||
@@ -108,80 +109,49 @@ def test_boost_keywords_cover_all_field_ids():
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def mock_v3(monkeypatch):
|
def no_llm(monkeypatch):
|
||||||
"""Mockt run_v3_pipeline mit deterministischen Fake-Results."""
|
"""Deaktiviert den LLM-Semantic-Validator — der Agent prueft die 12
|
||||||
async def _fake_pipeline(text, scope, db_url=""):
|
mcs.py-Pattern-MCs deterministisch direkt am Text."""
|
||||||
results = [
|
async def _no_validator(*a, **kw):
|
||||||
{"control_id": "AUTH-1954-A04",
|
return {}
|
||||||
"passed": True,
|
|
||||||
"label": "Anbieterkennzeichnung dokumentiert",
|
|
||||||
"severity": "HIGH",
|
|
||||||
"regulation": "TMG",
|
|
||||||
"article": "§ 5",
|
|
||||||
"hint": "",
|
|
||||||
"matched_text": "Tesla Germany GmbH",
|
|
||||||
"source": "keyword_match"},
|
|
||||||
{"control_id": "DATA-2786-A04",
|
|
||||||
"passed": False,
|
|
||||||
"label": "Freiwilligkeit der TDDDG-Einwilligungen",
|
|
||||||
"severity": "MEDIUM",
|
|
||||||
"regulation": "TDDDG",
|
|
||||||
"article": "§ 25",
|
|
||||||
"hint": "Bitte Freiwilligkeit dokumentieren",
|
|
||||||
"matched_text": "",
|
|
||||||
"source": ""},
|
|
||||||
]
|
|
||||||
telemetry = {
|
|
||||||
"layer_0_field_hits": 5,
|
|
||||||
"layer_0_field_ids": ["kontakt_email", "kontakt_telefon",
|
|
||||||
"handelsregister", "ust_id",
|
|
||||||
"vertretungsberechtigte"],
|
|
||||||
"layer_1_pass": 1,
|
|
||||||
"layer_1_fail": 1,
|
|
||||||
"layer_0_boost_overrides": 0,
|
|
||||||
"total_mcs": 2,
|
|
||||||
}
|
|
||||||
return results, telemetry
|
|
||||||
monkeypatch.setattr(
|
|
||||||
"compliance.services.specialist_agents.impressum.agent.run_v3_pipeline",
|
|
||||||
_fake_pipeline,
|
|
||||||
)
|
|
||||||
async def _no_validator(*a, **kw): return {}
|
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
"compliance.services.specialist_agents.impressum.agent.validate_present",
|
"compliance.services.specialist_agents.impressum.agent.validate_present",
|
||||||
_no_validator,
|
_no_validator,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_agent_uses_db_mcs(mock_v3):
|
def test_agent_emits_pflichtangabe_findings(no_llm):
|
||||||
agent = ImpressumAgent()
|
agent = ImpressumAgent()
|
||||||
out = _run(agent.evaluate(AgentInput(doc_type="impressum",
|
out = _run(agent.evaluate(AgentInput(doc_type="impressum",
|
||||||
text=TESLA_TEXT)))
|
text=TESLA_TEXT)))
|
||||||
db_mc_findings = [f for f in out.findings
|
fids = {f.field_id for f in out.findings}
|
||||||
if f.check_id.startswith("DBMC-")]
|
# Tesla nennt 'Management' (englisch) → deutsches GF-Label fehlt
|
||||||
assert len(db_mc_findings) == 1
|
assert "vertretungsberechtigte_label_korrekt" in fids
|
||||||
assert db_mc_findings[0].check_id == "DBMC-DATA-2786-A04"
|
f = next(f for f in out.findings
|
||||||
assert db_mc_findings[0].severity == Severity.MEDIUM.value
|
if f.field_id == "vertretungsberechtigte_label_korrekt")
|
||||||
assert "TDDDG" in db_mc_findings[0].norm
|
assert f.severity == Severity.MEDIUM.value
|
||||||
|
assert f.check_id == "IMP-vertretungsberechtigte_label_korrekt"
|
||||||
|
assert f.severity_reason == "pflichtangabe_missing"
|
||||||
|
# Vorhandene Pflichtangaben erzeugen KEIN Finding
|
||||||
|
assert "kontakt_email" not in fids
|
||||||
|
assert "handelsregister" not in fids
|
||||||
|
|
||||||
|
|
||||||
def test_agent_emits_boost_coverage(mock_v3):
|
def test_agent_coverage_has_all_12(no_llm):
|
||||||
agent = ImpressumAgent()
|
agent = ImpressumAgent()
|
||||||
out = _run(agent.evaluate(AgentInput(doc_type="impressum",
|
out = _run(agent.evaluate(AgentInput(doc_type="impressum",
|
||||||
text=TESLA_TEXT)))
|
text=TESLA_TEXT)))
|
||||||
# 2 DB-MCs + 12 Pattern-Boost-Slots = 14 coverage entries
|
assert out.mc_total == len(MCS) # je MC genau 1 Coverage-Eintrag
|
||||||
assert out.mc_total >= 14
|
ok = [c for c in out.mc_coverage if c.status == "ok"]
|
||||||
boost_ok = [c for c in out.mc_coverage
|
# name, email, telefon, HR, USt, vertretungsberechtigte = 6 vorhanden
|
||||||
if c.mc_id.startswith("IMP-MC-") and c.status == "ok"]
|
assert len(ok) == 6
|
||||||
assert len(boost_ok) == 5 # 5 boost_ids im fake
|
|
||||||
|
|
||||||
|
|
||||||
def test_agent_notes_telemetry(mock_v3):
|
def test_agent_notes(no_llm):
|
||||||
agent = ImpressumAgent()
|
agent = ImpressumAgent()
|
||||||
out = _run(agent.evaluate(AgentInput(doc_type="impressum",
|
out = _run(agent.evaluate(AgentInput(doc_type="impressum",
|
||||||
text=TESLA_TEXT)))
|
text=TESLA_TEXT)))
|
||||||
assert "v3-pipeline" in out.notes
|
assert "§5-TMG-MCs geprüft" in out.notes
|
||||||
assert "Pattern-Boosts" in out.notes
|
|
||||||
|
|
||||||
|
|
||||||
def test_short_text_skipped():
|
def test_short_text_skipped():
|
||||||
|
|||||||
Reference in New Issue
Block a user