diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/agent.py b/backend-compliance/compliance/services/specialist_agents/impressum/agent.py index 9cade99c..c67cfd93 100644 --- a/backend-compliance/compliance/services/specialist_agents/impressum/agent.py +++ b/backend-compliance/compliance/services/specialist_agents/impressum/agent.py @@ -39,8 +39,7 @@ from .._base import ( from .._pattern_library import record as record_pattern from .._rollup import rollup from .._semantic_validator import build_rename_action, validate_present -from .mcs import MC_IDS, MCS, detect_automotive -from .v3_engine import run_v3_pipeline +from .mcs import MC_IDS, MCS, detect_automotive, scope_matches logger = logging.getLogger(__name__) @@ -107,73 +106,53 @@ class ImpressumAgent(BaseSpecialistAgent): notes="Impressum-Text zu kurz oder leer.", ) - # ── Layer 0 + 1 + 2 (Boost + Keyword + Embedding) ────────── - results, telemetry = await run_v3_pipeline(text, scope) - notes_parts.append( - f"v3-pipeline: {telemetry.get('total_mcs', 0)} DB-MCs · " - f"{telemetry.get('layer_0_field_hits', 0)} Pattern-Boosts · " - f"{telemetry.get('layer_0_boost_overrides', 0)} Boost-Overrides" - ) - sec_drop = telemetry.get("sector_dropped", 0) - off_drop = telemetry.get("offtopic_dropped", 0) - if sec_drop or off_drop: - notes_parts.append( - f"Scope-Filter: {sec_drop} Branchen-MCs + " - f"{off_drop} themenfremde MCs entfernt" - ) - - # DB-MCs → Findings + Coverage - seen_db_mcs: set[str] = set() - for r in results: - mc_id = r.get("control_id") or "" - if not mc_id or mc_id in seen_db_mcs: + # ── Findings aus den 12 autoritativen §5-TMG/DDG-MCs (mcs.py) ── + # Die DB-Menge doc_type='impressum' ist verunreinigt (TRD/SEC/GOV- + # Controls statt §5 TMG, fehl-klassifiziert von einer Vorgaenger- + # Session), daher sind bis zum MC-Re-Filtering die 12 praezisen + # Pattern-MCs die Findings-Quelle. field_id = semantisches Feld + # (passt zum Semantic-Validator + den GT-Tests). + is_auto = "automotive" in scope + for mc in MCS: + if not scope_matches(mc, scope, is_auto): + coverage.append(McCoverage( + mc_id=mc.mc_id, status="na", + reason="nicht im Business-Scope", + )) continue - seen_db_mcs.add(mc_id) - passed = bool(r.get("passed")) - sev = _SEV_TO_ENUM.get( - (r.get("severity") or "MEDIUM").upper(), Severity.MEDIUM, - ) - coverage.append(McCoverage( - mc_id=mc_id, - status="ok" if passed else sev.value.lower(), - reason=str(r.get("matched_text") or r.get("hint") or "")[:120], - )) - if passed: + if any(p.search(text) for p in mc.patterns): + coverage.append(McCoverage( + mc_id=mc.mc_id, status="ok", reason="Pattern-Treffer", + )) continue - label = r.get("label") or r.get("hint") or "" - norm_str = str(r.get("regulation") or "") - if r.get("article"): - norm_str = (norm_str + f" Art. {r.get('article')}").strip() + sev = _SEV_TO_ENUM.get(mc.severity_if_missing, Severity.MEDIUM) findings.append(Finding( - check_id=f"DBMC-{mc_id}", + check_id=f"IMP-{mc.field_id}", agent=self.agent_id, agent_version=self.agent_version, - field_id=mc_id, + field_id=mc.field_id, severity=sev, - severity_reason="db_mc_failed", - title=str(label)[:200] or f"DB-MC {mc_id} nicht erfüllt", - norm=norm_str, + severity_reason="pflichtangabe_missing", + title=f"Pflichtangabe fehlt: {mc.label}", + norm=mc.norm, evidence="", - action=_build_measure(str(label), norm_str)[:400], + action=_build_measure(mc.label, mc.norm), confidence=0.9, sources=[EvidenceSource( - source_type=SourceType.MC, - source_id=mc_id, - detail=str(r.get("source") or "keyword_match")[:120], + source_type=SourceType.REGEX, + source_id=mc.mc_id, + detail="kein Pattern-Treffer im Text", confidence=0.9, )], )) - - # Layer 0: eigene Pattern-IDs immer mit ins coverage (für UI) - boost_ids = set(telemetry.get("layer_0_field_ids") or []) - for mc in MCS: - cov_status = "ok" if mc.field_id in boost_ids else "na" - cov_reason = ("regex-boost hit" - if mc.field_id in boost_ids - else "kein Pattern-Treffer (kein Veto)") coverage.append(McCoverage( - mc_id=mc.mc_id, status=cov_status, reason=cov_reason, + mc_id=mc.mc_id, status=sev.value.lower(), + reason="kein Pattern-Treffer", )) + notes_parts.append( + f"{len(MCS)} §5-TMG-MCs geprüft · " + f"{len(findings)} Pflichtangabe(n) offen" + ) # ── Layer 3: Semantic-Validator nur für HIGH/MEDIUM-Fails ── await self._semantic_demote(text, findings, coverage) @@ -199,7 +178,7 @@ class ImpressumAgent(BaseSpecialistAgent): candidates = [ f for f in findings if f.severity in (Severity.HIGH.value, Severity.MEDIUM.value) - and f.severity_reason == "db_mc_failed" + and f.severity_reason == "pflichtangabe_missing" ] if not candidates: return @@ -232,9 +211,11 @@ class ImpressumAgent(BaseSpecialistAgent): detail=f"LLM-confirmed: '{label_used}'", confidence=conf, )) - # Coverage update + auto-learning + # Coverage update + auto-learning (mc_id steckt in der Quelle) + mc_id_for_cov = (finding.sources[0].source_id + if finding.sources else "") for c in coverage: - if c.mc_id and c.mc_id == f"DBMC-{finding.field_id}": + if c.mc_id and c.mc_id == mc_id_for_cov: c.status = "low" c.reason = f"label_mismatch: '{label_used}'" try: diff --git a/backend-compliance/tests/test_impressum_v3.py b/backend-compliance/tests/test_impressum_v3.py index a2e420b1..d11f9a0c 100644 --- a/backend-compliance/tests/test_impressum_v3.py +++ b/backend-compliance/tests/test_impressum_v3.py @@ -18,6 +18,7 @@ from compliance.services.specialist_agents import ( from compliance.services.specialist_agents.impressum.agent import ( _build_measure, ) +from compliance.services.specialist_agents.impressum.mcs import MCS from compliance.services.specialist_agents.impressum.regex_boost import ( BOOST_KEYWORDS, boost_matches_db_mc, @@ -108,80 +109,49 @@ def test_boost_keywords_cover_all_field_ids(): @pytest.fixture -def mock_v3(monkeypatch): - """Mockt run_v3_pipeline mit deterministischen Fake-Results.""" - async def _fake_pipeline(text, scope, db_url=""): - results = [ - {"control_id": "AUTH-1954-A04", - "passed": True, - "label": "Anbieterkennzeichnung dokumentiert", - "severity": "HIGH", - "regulation": "TMG", - "article": "§ 5", - "hint": "", - "matched_text": "Tesla Germany GmbH", - "source": "keyword_match"}, - {"control_id": "DATA-2786-A04", - "passed": False, - "label": "Freiwilligkeit der TDDDG-Einwilligungen", - "severity": "MEDIUM", - "regulation": "TDDDG", - "article": "§ 25", - "hint": "Bitte Freiwilligkeit dokumentieren", - "matched_text": "", - "source": ""}, - ] - telemetry = { - "layer_0_field_hits": 5, - "layer_0_field_ids": ["kontakt_email", "kontakt_telefon", - "handelsregister", "ust_id", - "vertretungsberechtigte"], - "layer_1_pass": 1, - "layer_1_fail": 1, - "layer_0_boost_overrides": 0, - "total_mcs": 2, - } - return results, telemetry - monkeypatch.setattr( - "compliance.services.specialist_agents.impressum.agent.run_v3_pipeline", - _fake_pipeline, - ) - async def _no_validator(*a, **kw): return {} +def no_llm(monkeypatch): + """Deaktiviert den LLM-Semantic-Validator — der Agent prueft die 12 + mcs.py-Pattern-MCs deterministisch direkt am Text.""" + async def _no_validator(*a, **kw): + return {} monkeypatch.setattr( "compliance.services.specialist_agents.impressum.agent.validate_present", _no_validator, ) -def test_agent_uses_db_mcs(mock_v3): +def test_agent_emits_pflichtangabe_findings(no_llm): agent = ImpressumAgent() out = _run(agent.evaluate(AgentInput(doc_type="impressum", text=TESLA_TEXT))) - db_mc_findings = [f for f in out.findings - if f.check_id.startswith("DBMC-")] - assert len(db_mc_findings) == 1 - assert db_mc_findings[0].check_id == "DBMC-DATA-2786-A04" - assert db_mc_findings[0].severity == Severity.MEDIUM.value - assert "TDDDG" in db_mc_findings[0].norm + fids = {f.field_id for f in out.findings} + # Tesla nennt 'Management' (englisch) → deutsches GF-Label fehlt + assert "vertretungsberechtigte_label_korrekt" in fids + f = next(f for f in out.findings + if f.field_id == "vertretungsberechtigte_label_korrekt") + assert f.severity == Severity.MEDIUM.value + assert f.check_id == "IMP-vertretungsberechtigte_label_korrekt" + assert f.severity_reason == "pflichtangabe_missing" + # Vorhandene Pflichtangaben erzeugen KEIN Finding + assert "kontakt_email" not in fids + assert "handelsregister" not in fids -def test_agent_emits_boost_coverage(mock_v3): +def test_agent_coverage_has_all_12(no_llm): agent = ImpressumAgent() out = _run(agent.evaluate(AgentInput(doc_type="impressum", text=TESLA_TEXT))) - # 2 DB-MCs + 12 Pattern-Boost-Slots = 14 coverage entries - assert out.mc_total >= 14 - boost_ok = [c for c in out.mc_coverage - if c.mc_id.startswith("IMP-MC-") and c.status == "ok"] - assert len(boost_ok) == 5 # 5 boost_ids im fake + assert out.mc_total == len(MCS) # je MC genau 1 Coverage-Eintrag + ok = [c for c in out.mc_coverage if c.status == "ok"] + # name, email, telefon, HR, USt, vertretungsberechtigte = 6 vorhanden + assert len(ok) == 6 -def test_agent_notes_telemetry(mock_v3): +def test_agent_notes(no_llm): agent = ImpressumAgent() out = _run(agent.evaluate(AgentInput(doc_type="impressum", text=TESLA_TEXT))) - assert "v3-pipeline" in out.notes - assert "Pattern-Boosts" in out.notes + assert "§5-TMG-MCs geprüft" in out.notes def test_short_text_skipped():