Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 872145d883 | |||
| 9bdaa28038 | |||
| e2be51b0aa | |||
| bd65b6f318 |
@@ -55,5 +55,9 @@ EXPOSE 3000
|
||||
# Set hostname
|
||||
ENV HOSTNAME="0.0.0.0"
|
||||
|
||||
# P83 — Build-SHA fuer check-rebuild-needed.sh
|
||||
ARG BUILD_SHA="unknown"
|
||||
ENV BUILD_SHA=${BUILD_SHA}
|
||||
|
||||
# Start the application
|
||||
CMD ["node", "server.js"]
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import { calculateAP } from './useFMEA'
|
||||
|
||||
describe('calculateAP — AIAG-VDA 2019 Handbook Action Priority', () => {
|
||||
it('returns H for severity 10 with mid occurrence', () => {
|
||||
expect(calculateAP(10, 5, 5)).toBe('H')
|
||||
})
|
||||
|
||||
it('returns H for severity 9 with low detection', () => {
|
||||
expect(calculateAP(9, 4, 7)).toBe('H')
|
||||
})
|
||||
|
||||
it('returns M for severity 9 with low occurrence and good detection', () => {
|
||||
expect(calculateAP(9, 2, 5)).toBe('M')
|
||||
})
|
||||
|
||||
it('returns L for severity 9 with very low occurrence and detection', () => {
|
||||
expect(calculateAP(9, 1, 4)).toBe('L')
|
||||
})
|
||||
|
||||
it('returns H for severity 7 with high occurrence', () => {
|
||||
expect(calculateAP(7, 5, 1)).toBe('H')
|
||||
})
|
||||
|
||||
it('returns M for severity 7 with mid occurrence', () => {
|
||||
expect(calculateAP(7, 3, 5)).toBe('M')
|
||||
})
|
||||
|
||||
it('returns L for low-severity well-controlled mode', () => {
|
||||
expect(calculateAP(3, 1, 1)).toBe('L')
|
||||
})
|
||||
|
||||
it('returns L for severity 5 with very low occurrence and detection', () => {
|
||||
expect(calculateAP(5, 1, 1)).toBe('L')
|
||||
})
|
||||
})
|
||||
@@ -156,5 +156,52 @@ export function useFMEA(projectId: string) {
|
||||
// Get unique components for the suggest button
|
||||
const components = [...new Map(rows.map((r) => [r.component.id, r.component])).values()]
|
||||
|
||||
return { rows, loading, stats, components, suggestFMs, suggesting, suggestions, suggestSource, setSuggestions }
|
||||
/**
|
||||
* Accept a suggested FM: build an FMEA row from the FM defaults, prepend it
|
||||
* to the table state, and remove the FM from the suggestion list.
|
||||
* Returns false if the (component, fm.id) combo already exists in rows.
|
||||
*/
|
||||
function acceptSuggestion(fm: FailureMode, componentId: string): boolean {
|
||||
const comp = components.find((c) => c.id === componentId)
|
||||
if (!comp) return false
|
||||
const dup = rows.find((r) => r.component.id === componentId && r.failureMode.id === fm.id)
|
||||
if (dup) {
|
||||
// Still drop the suggestion so the UI does not keep offering it.
|
||||
setSuggestions((prev) => prev.filter((s) => s.id !== fm.id))
|
||||
return false
|
||||
}
|
||||
const s = fm.default_severity || 5
|
||||
const o = fm.default_occurrence || 5
|
||||
const d = fm.default_detection || 5
|
||||
const newRow: FMEARow = {
|
||||
component: comp,
|
||||
failureMode: fm,
|
||||
severity: s,
|
||||
occurrence: o,
|
||||
detection: d,
|
||||
rpz: s * o * d,
|
||||
ap: calculateAP(s, o, d),
|
||||
}
|
||||
setRows((prev) => [newRow, ...prev].sort((a, b) => b.rpz - a.rpz))
|
||||
setSuggestions((prev) => prev.filter((sg) => sg.id !== fm.id))
|
||||
return true
|
||||
}
|
||||
|
||||
function rejectSuggestion(fmId: string) {
|
||||
setSuggestions((prev) => prev.filter((sg) => sg.id !== fmId))
|
||||
}
|
||||
|
||||
return {
|
||||
rows,
|
||||
loading,
|
||||
stats,
|
||||
components,
|
||||
suggestFMs,
|
||||
suggesting,
|
||||
suggestions,
|
||||
suggestSource,
|
||||
setSuggestions,
|
||||
acceptSuggestion,
|
||||
rejectSuggestion,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
'use client'
|
||||
|
||||
import { useState } from 'react'
|
||||
import { useEffect, useState } from 'react'
|
||||
import { useParams } from 'next/navigation'
|
||||
import { useFMEA, type FMEARow } from './_hooks/useFMEA'
|
||||
|
||||
@@ -27,8 +27,17 @@ function rpzLabel(rpz: number): string {
|
||||
|
||||
export default function FMEAPage() {
|
||||
const { projectId } = useParams<{ projectId: string }>()
|
||||
const { rows, loading, stats, components, suggestFMs, suggesting, suggestions, suggestSource, setSuggestions } = useFMEA(projectId)
|
||||
const { rows, loading, stats, components, suggestFMs, suggesting, suggestions, suggestSource, setSuggestions, acceptSuggestion, rejectSuggestion } = useFMEA(projectId)
|
||||
const [suggestComp, setSuggestComp] = useState<string | null>(null)
|
||||
const [acceptedCount, setAcceptedCount] = useState(0)
|
||||
|
||||
// Reset accepted-count when a fresh suggestion run is loaded or the panel closes.
|
||||
useEffect(() => {
|
||||
if (suggesting) setAcceptedCount(0)
|
||||
}, [suggesting])
|
||||
useEffect(() => {
|
||||
if (suggestions.length === 0) setAcceptedCount(0)
|
||||
}, [suggestions.length])
|
||||
|
||||
if (loading) {
|
||||
return (
|
||||
@@ -97,26 +106,60 @@ export default function FMEAPage() {
|
||||
{suggestions.length > 0 && (
|
||||
<div className="bg-purple-50 dark:bg-purple-900/20 border border-purple-200 dark:border-purple-800 rounded-xl p-4">
|
||||
<div className="flex items-center justify-between mb-3">
|
||||
<h3 className="text-sm font-semibold text-purple-800 dark:text-purple-300">
|
||||
KI-Vorschlaege ({suggestions.length}) — {suggestSource === 'llm' ? 'LLM-generiert' : 'Bibliothek'}
|
||||
</h3>
|
||||
<div>
|
||||
<h3 className="text-sm font-semibold text-purple-800 dark:text-purple-300">
|
||||
KI-Vorschlaege ({suggestions.length}) — {suggestSource === 'llm' ? 'LLM-generiert' : 'Bibliothek-Fallback'}
|
||||
</h3>
|
||||
{acceptedCount > 0 && (
|
||||
<div className="text-xs text-green-700 dark:text-green-400 mt-0.5">
|
||||
{acceptedCount} Vorschlag{acceptedCount > 1 ? 'e' : ''} uebernommen
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
<button onClick={() => setSuggestions([])} className="text-xs text-purple-600 hover:text-purple-800">Schliessen</button>
|
||||
</div>
|
||||
<div className="space-y-2">
|
||||
{suggestions.map((fm, i) => (
|
||||
<div key={i} className="flex items-center justify-between bg-white dark:bg-gray-800 rounded-lg p-3 border border-purple-100 dark:border-purple-800">
|
||||
<div className="flex-1 min-w-0">
|
||||
<div className="text-sm font-medium text-gray-900 dark:text-white">{fm.name_de}</div>
|
||||
<div className="text-xs text-gray-500 mt-0.5">{fm.effect}</div>
|
||||
<div className="flex gap-3 mt-1 text-xs text-gray-400">
|
||||
<span>S={fm.default_severity}</span>
|
||||
<span>O={fm.default_occurrence}</span>
|
||||
<span>D={fm.default_detection}</span>
|
||||
<span className="font-bold">RPZ={fm.default_severity * fm.default_occurrence * fm.default_detection}</span>
|
||||
{suggestions.map((fm) => {
|
||||
const rpz = fm.default_severity * fm.default_occurrence * fm.default_detection
|
||||
return (
|
||||
<div key={fm.id} className="flex items-start justify-between gap-3 bg-white dark:bg-gray-800 rounded-lg p-3 border border-purple-100 dark:border-purple-800">
|
||||
<div className="flex-1 min-w-0">
|
||||
<div className="text-sm font-medium text-gray-900 dark:text-white">{fm.name_de}</div>
|
||||
<div className="text-xs text-gray-500 mt-0.5">{fm.effect}</div>
|
||||
<div className="flex gap-3 mt-1 text-xs text-gray-400">
|
||||
<span>S={fm.default_severity}</span>
|
||||
<span>O={fm.default_occurrence}</span>
|
||||
<span>D={fm.default_detection}</span>
|
||||
<span className={`font-bold ${rpz > 200 ? 'text-red-600' : rpz > 100 ? 'text-orange-600' : 'text-gray-500'}`}>RPZ={rpz}</span>
|
||||
</div>
|
||||
</div>
|
||||
<div className="flex items-center gap-1.5 shrink-0">
|
||||
<button
|
||||
onClick={() => {
|
||||
if (!suggestComp) return
|
||||
const ok = acceptSuggestion(fm, suggestComp)
|
||||
if (ok) setAcceptedCount((c) => c + 1)
|
||||
}}
|
||||
disabled={!suggestComp}
|
||||
className="px-3 py-1.5 bg-green-600 hover:bg-green-700 disabled:opacity-50 disabled:cursor-not-allowed text-white text-xs font-medium rounded transition-colors"
|
||||
title="Diesen Fehlermodus der FMEA-Tabelle hinzufuegen"
|
||||
>
|
||||
Uebernehmen
|
||||
</button>
|
||||
<button
|
||||
onClick={() => rejectSuggestion(fm.id)}
|
||||
className="px-3 py-1.5 bg-gray-200 dark:bg-gray-700 hover:bg-gray-300 dark:hover:bg-gray-600 text-gray-700 dark:text-gray-300 text-xs font-medium rounded transition-colors"
|
||||
title="Diesen Vorschlag verwerfen"
|
||||
>
|
||||
Ablehnen
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
))}
|
||||
)
|
||||
})}
|
||||
</div>
|
||||
<div className="text-[10px] text-purple-700 dark:text-purple-400 mt-3">
|
||||
Hinweis: Uebernommene Fehlermodi erscheinen sofort in der Tabelle unten. Bewertung (S/O/D) ist anpassbar — Standardwerte aus der Bibliothek.
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
|
||||
@@ -73,6 +73,7 @@ export function SidebarModuleList({ collapsed, projectId, pendingCRCount }: Side
|
||||
<AdditionalModuleItem href="/sdk/ai-registration" icon={<svg className="w-5 h-5" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M19 21V5a2 2 0 00-2-2H7a2 2 0 00-2 2v16m14 0h2m-2 0h-5m-9 0H3m2 0h5M9 7h1m-1 4h1m4-4h1m-1 4h1m-5 10v-5a1 1 0 011-1h2a1 1 0 011 1v5m-4 0h4" /></svg>} label="EU Registrierung" isActive={pathname?.startsWith('/sdk/ai-registration') ?? false} collapsed={collapsed} projectId={projectId} />
|
||||
<AdditionalModuleItem href="/sdk/compliance-optimizer" icon={<svg className="w-5 h-5" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M13 7h8m0 0v8m0-8l-8 8-4-4-6 6" /></svg>} label="Compliance Optimizer" isActive={pathname?.startsWith('/sdk/compliance-optimizer') ?? false} collapsed={collapsed} projectId={projectId} />
|
||||
<AdditionalModuleItem href="/sdk/agent" icon={<svg className="w-5 h-5" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M9.75 17L9 20l-1 1h8l-1-1-.75-3M3 13h18M5 17h14a2 2 0 002-2V5a2 2 0 00-2-2H5a2 2 0 00-2 2v10a2 2 0 002 2z" /></svg>} label="Compliance Agent" isActive={pathname?.startsWith('/sdk/agent') ?? false} collapsed={collapsed} projectId={projectId} />
|
||||
<AdditionalModuleItem href="/sdk/benchmark" icon={<svg className="w-5 h-5" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M9 19v-6a2 2 0 00-2-2H5a2 2 0 00-2 2v6a2 2 0 002 2h2a2 2 0 002-2zm0 0V9a2 2 0 012-2h2a2 2 0 012 2v10m-6 0a2 2 0 002 2h2a2 2 0 002-2m0 0V5a2 2 0 012-2h2a2 2 0 012 2v14a2 2 0 01-2 2h-2a2 2 0 01-2-2z" /></svg>} label="Branchen-Benchmark" isActive={pathname?.startsWith('/sdk/benchmark') ?? false} collapsed={collapsed} projectId={projectId} />
|
||||
</div>
|
||||
|
||||
{/* CRA Compliance */}
|
||||
|
||||
@@ -60,5 +60,9 @@ EXPOSE 8002
|
||||
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
|
||||
CMD curl -f http://127.0.0.1:8002/health || exit 1
|
||||
|
||||
# P83 — Build-SHA fuer check-rebuild-needed.sh
|
||||
ARG BUILD_SHA="unknown"
|
||||
ENV BUILD_SHA=${BUILD_SHA}
|
||||
|
||||
# Run the application
|
||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8002"]
|
||||
|
||||
@@ -1184,6 +1184,22 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
|
||||
if (not c.passed and not c.skipped
|
||||
and (c.severity or "").upper() in ("CRITICAL", "HIGH")):
|
||||
fails_by_doc.setdefault(r.doc_type, []).append(rec)
|
||||
# P106 — Audit-Type-Klassifizierung pro MC. Interne Prozess-/
|
||||
# Doku-Checks werden NICHT als FAIL gewertet sondern als CHECK
|
||||
# (manuelle Pruefung beim DSB notwendig).
|
||||
try:
|
||||
from compliance.services.mc_audit_type import (
|
||||
annotate_mc_results, split_by_audit_type,
|
||||
)
|
||||
annotate_mc_results(all_mc_checks)
|
||||
mc_split = split_by_audit_type(all_mc_checks)
|
||||
# Fails-by-doc neu aufbauen: nur noch echte verifiable Fails
|
||||
fails_by_doc = {}
|
||||
for r in mc_split.get("verifiable_fails") or []:
|
||||
fails_by_doc.setdefault("dse", []).append(r)
|
||||
except Exception as e:
|
||||
logger.warning("P106 mc_audit_type skipped: %s", e)
|
||||
mc_split = {"internal_checks": [], "verifiable_fails": all_mc_checks}
|
||||
scorecard = build_scorecard(all_mc_checks) if all_mc_checks else {}
|
||||
# Trend: load previous scorecard for the same tenant + domain so the
|
||||
# email can show delta indicators (A6).
|
||||
@@ -1486,6 +1502,39 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
|
||||
except Exception as e:
|
||||
logger.warning("P71 jc_avv_decision skipped: %s", e)
|
||||
|
||||
# P6/P53/P55 — Branchen-Kontext + Site-History
|
||||
industry_ctx_html = ""
|
||||
try:
|
||||
from compliance.services.industry_library import (
|
||||
build_industry_context_block_html, load_site_profile,
|
||||
)
|
||||
from database import SessionLocal as _SLib
|
||||
_ind_db = _SLib()
|
||||
try:
|
||||
ind = (req.scan_context or {}).get("industry") if req.scan_context else None
|
||||
site_prof = load_site_profile(_ind_db, domain_for_exec or "")
|
||||
industry_ctx_html = build_industry_context_block_html(ind, site_prof)
|
||||
finally:
|
||||
_ind_db.close()
|
||||
except Exception as e:
|
||||
logger.warning("industry context skipped: %s", e)
|
||||
|
||||
# P106 — Internal-Checks-Block (interne Prozesse / Doku-Pflichten)
|
||||
internal_checks_html = ""
|
||||
try:
|
||||
from compliance.services.mc_audit_type import (
|
||||
build_internal_checks_block_html,
|
||||
)
|
||||
ic = (mc_split or {}).get("internal_checks") or []
|
||||
if ic:
|
||||
internal_checks_html = build_internal_checks_block_html(ic)
|
||||
logger.info(
|
||||
"P106: %d interne Checks (statt FAIL) im Block",
|
||||
len(ic),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("P106 internal_checks_html skipped: %s", e)
|
||||
|
||||
# P85 — Banner-Screenshot fuer visuellen Beweis (zwischen
|
||||
# GF-1-Pager und Detail-Bloecken)
|
||||
banner_shot_html = ""
|
||||
@@ -1595,7 +1644,8 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
|
||||
+ bench_html + diff_html
|
||||
+ critical_html + scope_disclaimer_html + exec_summary_html
|
||||
+ cookie_arch_html + summary_html + scanned_html + profile_html
|
||||
+ scorecard_html + redundancy_html
|
||||
+ scorecard_html + internal_checks_html + redundancy_html
|
||||
+ industry_ctx_html
|
||||
+ banner_shot_html
|
||||
+ providers_html + banner_deep_html
|
||||
+ cookie_audit_html
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
{
|
||||
"source": "Verordnung (EU) 2015/758 - eCall",
|
||||
"official_url": "https://eur-lex.europa.eu/legal-content/DE/TXT/?uri=CELEX%3A32015R0758",
|
||||
"ingest_for": "RAG-Korpus (Compliance fuer Automotive-OEMs)",
|
||||
"chunks": [
|
||||
{
|
||||
"id": "ecall-art-3-1",
|
||||
"title": "Art. 3 (1) — bordeigenes eCall-System",
|
||||
"text": "Hersteller stellen sicher, dass alle neuen Typen von Personenkraftwagen und leichten Nutzfahrzeugen mit einem auf 112 basierten bordeigenen eCall-System ausgestattet sind, das den in dieser Verordnung festgelegten Anforderungen und harmonisierten Normen entspricht."
|
||||
},
|
||||
{
|
||||
"id": "ecall-art-6-1",
|
||||
"title": "Art. 6 (1) — Datenschutz",
|
||||
"text": "Bei der Verarbeitung personenbezogener Daten ueber das auf 112 basierte bordeigene eCall-System gewaehrleisten Hersteller die Einhaltung der Richtlinie 95/46/EG und der RL 2002/58/EG. Insbesondere muessen Fahrzeughalter darueber informiert werden, dass das System dauerhaft im Standby-Modus ist und im Falle eines schweren Unfalls automatisch ausgeloest wird."
|
||||
},
|
||||
{
|
||||
"id": "ecall-art-6-2",
|
||||
"title": "Art. 6 (2) — Datenverarbeitung",
|
||||
"text": "Die Verarbeitung personenbezogener Daten ueber das auf 112 basierte bordeigene eCall-System darf nur zum Zwecke der Bearbeitung von Notrufen erfolgen. Diese Daten sind unmittelbar nach Bearbeitung des Notrufs ohne automatisierte Speicherung zu loeschen, soweit nicht anders gesetzlich vorgesehen."
|
||||
},
|
||||
{
|
||||
"id": "ecall-art-6-3",
|
||||
"title": "Art. 6 (3) — Standortdaten",
|
||||
"text": "Die Standortdaten des Fahrzeugs werden zur Behandlung des Notrufes uebermittelt. Eine permanente Standortueberwachung ausserhalb von Notfaellen ist nicht zulaessig."
|
||||
},
|
||||
{
|
||||
"id": "ecall-art-6-4",
|
||||
"title": "Art. 6 (4) — Informationspflicht",
|
||||
"text": "Hersteller stellen sicher, dass in der technischen Dokumentation des Fahrzeugs klare und vollstaendige Informationen ueber die Verarbeitung personenbezogener Daten gegeben werden, einschliesslich des Rechts der betroffenen Person auf Auskunft und gegebenenfalls Berichtigung sowie Sperrung der sie betreffenden personenbezogenen Daten."
|
||||
},
|
||||
{
|
||||
"id": "ecall-art-6-5",
|
||||
"title": "Art. 6 (5) — Mehrwertdienste",
|
||||
"text": "Mehrwertdienste (z.B. private Pannenruf-Apps) duerfen nur mit ausdruecklicher Einwilligung des Fahrzeughalters in Anspruch genommen werden. Das auf 112 basierte bordeigene eCall-System darf nicht von diesen Mehrwertdiensten beeintraechtigt werden und muss kostenlos und fuer alle Fahrzeughalter verfuegbar sein."
|
||||
},
|
||||
{
|
||||
"id": "ecall-art-7",
|
||||
"title": "Art. 7 — Datenfluss",
|
||||
"text": "Der Mindestdatensatz (MSD) umfasst Fahrzeug-ID (VIN), Ausloesungsart, Zeitstempel, Standort, Fahrtrichtung, Antriebsenergie, Anzahl angeschnallter Insassen. Diese Daten gehen an die naechste oeffentliche Notrufabfragestelle (PSAP)."
|
||||
}
|
||||
],
|
||||
"compliance_implications": {
|
||||
"automotive_oem": [
|
||||
"Hersteller MUSS in der DSE den eCall-Datenfluss erklaeren (Art. 6 (4)).",
|
||||
"Standortdaten ausserhalb von Notfaellen sind UNZULAESSIG (Art. 6 (3)).",
|
||||
"Mehrwertdienste brauchen separate ausdrueckliche Einwilligung (Art. 6 (5)).",
|
||||
"Daten nach Notruf-Bearbeitung SOFORT zu loeschen (Art. 6 (2))."
|
||||
]
|
||||
}
|
||||
}
|
||||
@@ -85,6 +85,82 @@ def replay_from_snapshot(
|
||||
section_sizes: dict[str, int] = {}
|
||||
parts: list[str] = []
|
||||
|
||||
# P80 v2 — Quality-Checks aus dem aktuellen Code auf Snapshot-Daten
|
||||
# anwenden. Vollstaendiger Replay aller post-fetch Findings-Generatoren.
|
||||
cookie_t = doc_texts.get("cookie") or doc_texts.get("dse") or ""
|
||||
|
||||
# Vendor-Normalize (Dedup + Garbage-Filter)
|
||||
try:
|
||||
from compliance.services.vendor_normalizer import normalize_vendors
|
||||
cmp_vendors = normalize_vendors(list(cmp_vendors))
|
||||
except Exception as e:
|
||||
logger.warning("Replay v2: normalizer failed: %s", e)
|
||||
|
||||
# Audit-Quality
|
||||
try:
|
||||
from compliance.services.audit_quality_checks import (
|
||||
run_all as run_aq, build_audit_quality_block_html,
|
||||
)
|
||||
aq = run_aq(banner_result, cookie_t, cmp_vendors, doc_entries)
|
||||
if aq:
|
||||
aq_html = build_audit_quality_block_html(aq)
|
||||
parts.append(aq_html)
|
||||
section_sizes["audit_quality_v2"] = len(aq_html)
|
||||
except Exception as e:
|
||||
logger.warning("Replay v2: audit_quality failed: %s", e)
|
||||
|
||||
# Cookie-Compliance-Audit
|
||||
try:
|
||||
from compliance.services.cookie_compliance_audit import (
|
||||
audit_cookie_compliance, build_cookie_audit_block_html,
|
||||
)
|
||||
ca = audit_cookie_compliance(db, cookie_t, banner_result)
|
||||
if ca and (ca.get("declared_count") or ca.get("browser_count")):
|
||||
ca_html = build_cookie_audit_block_html(ca)
|
||||
parts.append(ca_html)
|
||||
section_sizes["cookie_audit_v2"] = len(ca_html)
|
||||
except Exception as e:
|
||||
logger.warning("Replay v2: cookie_audit failed: %s", e)
|
||||
|
||||
# TCF Authority
|
||||
try:
|
||||
from compliance.services.tcf_vendor_authority import (
|
||||
cross_reference_with_tcf, build_tcf_authority_block_html,
|
||||
)
|
||||
tcf = cross_reference_with_tcf(db, cmp_vendors)
|
||||
if tcf:
|
||||
tcf_html = build_tcf_authority_block_html(tcf)
|
||||
parts.append(tcf_html)
|
||||
section_sizes["tcf_v2"] = len(tcf_html)
|
||||
except Exception as e:
|
||||
logger.warning("Replay v2: tcf failed: %s", e)
|
||||
|
||||
# Entropy + Network-Trace
|
||||
try:
|
||||
from compliance.services.cookie_value_entropy import (
|
||||
check_cookies_for_entropy_mismatch, build_entropy_block_html,
|
||||
)
|
||||
from compliance.services.cookie_network_tracer import (
|
||||
trace_cookie_network, build_network_trace_block_html,
|
||||
)
|
||||
cd = (banner_result or {}).get("cookies_detailed") or []
|
||||
e1 = check_cookies_for_entropy_mismatch(cd)
|
||||
if e1:
|
||||
ent_html = build_entropy_block_html(e1)
|
||||
parts.append(ent_html)
|
||||
section_sizes["entropy_v2"] = len(ent_html)
|
||||
site_url = ""
|
||||
for entry in (doc_entries or []):
|
||||
if entry.get("url"):
|
||||
site_url = entry["url"]; break
|
||||
net = trace_cookie_network(cd, site_url)
|
||||
if net:
|
||||
net_html = build_network_trace_block_html(net)
|
||||
parts.append(net_html)
|
||||
section_sizes["network_trace_v2"] = len(net_html)
|
||||
except Exception as e:
|
||||
logger.warning("Replay v2: entropy/network failed: %s", e)
|
||||
|
||||
# P82: GF-1-Pager zuerst (5-Bullet-Summary)
|
||||
try:
|
||||
from compliance.services.gf_one_pager import build_gf_one_pager_html
|
||||
|
||||
@@ -0,0 +1,125 @@
|
||||
"""
|
||||
P54 — Diff-Banner fuer End-User (USP-Feature).
|
||||
|
||||
USP-Idee: bei wiederkehrenden Besuchern zeigt das Banner NICHT die
|
||||
Standard-Frage, sondern eine Diff-Mitteilung:
|
||||
"Seit deiner letzten Zustimmung haben wir hinzugefuegt:
|
||||
* Microsoft Bing (Werbung)
|
||||
* TikTok Pixel (Marketing)
|
||||
Bitte erneut zustimmen oder anpassen."
|
||||
|
||||
Backend-Seite (hier): liefert pro Snapshot eine 'diff_for_user'-Struktur
|
||||
die zum Embedden in eigenen Banner / Hinweistext genutzt werden kann.
|
||||
Frontend-Banner-Lib (separate consent-sdk) konsumiert das.
|
||||
|
||||
Vergleicht Vendor-Listen zwischen aktuellem Snapshot und dem letzten
|
||||
Snapshot mit gleicher site_domain.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Iterable
|
||||
|
||||
from sqlalchemy import text as sa_text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _norm_vendor_set(vendors: Iterable) -> set[str]:
|
||||
out: set[str] = set()
|
||||
for v in (vendors or []):
|
||||
if isinstance(v, dict):
|
||||
n = (v.get("name") or "").strip()
|
||||
elif isinstance(v, str):
|
||||
n = v.strip()
|
||||
else:
|
||||
continue
|
||||
if n:
|
||||
out.add(n)
|
||||
return out
|
||||
|
||||
|
||||
def compute_user_facing_diff(
|
||||
db: Session,
|
||||
site_domain: str,
|
||||
current_check_id: str,
|
||||
current_cmp_vendors: list,
|
||||
) -> dict | None:
|
||||
"""Vergleicht aktuelle vs letzte cmp_vendors-Liste fuer die gleiche
|
||||
site_domain. Liefert {prev_at, added_vendors, removed_vendors,
|
||||
new_high_risk_categories} oder None wenn kein vorheriger Lauf."""
|
||||
if not site_domain:
|
||||
return None
|
||||
try:
|
||||
row = db.execute(sa_text(
|
||||
"""
|
||||
SELECT cmp_vendors, created_at
|
||||
FROM compliance.compliance_check_snapshots
|
||||
WHERE site_domain = :dom AND check_id != :ex
|
||||
ORDER BY created_at DESC LIMIT 1
|
||||
"""
|
||||
), {"dom": site_domain, "ex": current_check_id}).fetchone()
|
||||
except Exception as e:
|
||||
logger.warning("diff lookup failed: %s", e)
|
||||
return None
|
||||
if not row:
|
||||
return None
|
||||
|
||||
prev_vendors = row[0] or []
|
||||
prev_at = row[1]
|
||||
curr_set = _norm_vendor_set(current_cmp_vendors)
|
||||
prev_set = _norm_vendor_set(prev_vendors)
|
||||
|
||||
added = sorted(curr_set - prev_set)
|
||||
removed = sorted(prev_set - curr_set)
|
||||
if not added and not removed:
|
||||
return None
|
||||
|
||||
# High-risk Kategorien aus added Vendors: Marketing / Tracking
|
||||
new_marketing: list[str] = []
|
||||
for v in current_cmp_vendors:
|
||||
if not isinstance(v, dict):
|
||||
continue
|
||||
n = (v.get("name") or "").strip()
|
||||
cat = (v.get("category") or "").lower()
|
||||
if n in added and cat in ("marketing", "tracking", "advertising"):
|
||||
new_marketing.append(n)
|
||||
|
||||
return {
|
||||
"prev_at": prev_at.isoformat() if prev_at else None,
|
||||
"added_vendors": added,
|
||||
"removed_vendors": removed,
|
||||
"new_marketing_vendors": new_marketing,
|
||||
"requires_reconsent": bool(new_marketing),
|
||||
}
|
||||
|
||||
|
||||
def build_diff_banner_snippet(diff: dict) -> str:
|
||||
"""Liefert HTML-Snippet das der Site-Betreiber in seinen eigenen
|
||||
Cookie-Banner einbauen kann (z.B. via consent-sdk)."""
|
||||
if not diff or not diff.get("added_vendors"):
|
||||
return ""
|
||||
added = diff.get("added_vendors", [])
|
||||
n_marketing = len(diff.get("new_marketing_vendors") or [])
|
||||
items = "".join(f"<li>{v}</li>" for v in added[:8])
|
||||
reconsent_note = ""
|
||||
if diff.get("requires_reconsent"):
|
||||
reconsent_note = (
|
||||
f'<p style="margin:6px 0 0;color:#991b1b;font-size:12px">'
|
||||
f'<strong>{n_marketing} neue{"r" if n_marketing == 1 else ""} '
|
||||
f'Marketing-Anbieter</strong> seit Ihrer letzten Zustimmung — '
|
||||
'bitte erneut bestaetigen.'
|
||||
'</p>'
|
||||
)
|
||||
return (
|
||||
'<div class="breakpilot-consent-diff" '
|
||||
'style="font-family:-apple-system,sans-serif;font-size:12px;'
|
||||
'padding:8px 12px;background:#fef3c7;border:1px solid #fde68a;'
|
||||
'border-radius:6px;margin-bottom:8px">'
|
||||
'<strong>Seit Ihrer letzten Zustimmung haben wir hinzugefuegt:</strong>'
|
||||
f'<ul style="margin:4px 0 0 18px;padding:0">{items}</ul>'
|
||||
+ reconsent_note +
|
||||
'</div>'
|
||||
)
|
||||
@@ -0,0 +1,222 @@
|
||||
"""
|
||||
P6 + P53 + P55 — OEM-Cross-Industry-Library mit Autonomes Profiling.
|
||||
|
||||
Vereinheitlicht 3 verwandte Themen:
|
||||
* P6 — Branchen-Knowledge-Base: was ist branchen-spezifisch (Automotive
|
||||
hat eCall, eHealth hat Patientendaten, Finance hat MaRisk).
|
||||
* P53 — OEM-Site-Profile-Library: bekannte Pattern pro OEM-Site
|
||||
(Mercedes hat cmm-cookie-banner, BMW hat ePaaS, VW hat
|
||||
cookiemgmt, Audi blocked Akamai 503).
|
||||
* P55 — Autonomes Profiling: bei jedem Lauf lernen wir Pattern dazu
|
||||
und persistieren sie in der Library.
|
||||
|
||||
Backend-Service: Lookup-API + Auto-Lern-Hook bei jedem Snapshot-Save.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Iterable
|
||||
|
||||
from sqlalchemy import text as sa_text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Branchen-spezifische zusaetzliche Compliance-Themen
|
||||
_INDUSTRY_PROFILES: dict[str, dict] = {
|
||||
"automotive": {
|
||||
"mandatory_regulations": [
|
||||
"DSGVO", "TDDDG",
|
||||
"VO 2015/758 (eCall)",
|
||||
"VO 2018/858 (Typgenehmigung)",
|
||||
"VO 2019/2144 (Allgemeine Sicherheit)",
|
||||
"Cyber Security UN-R 155",
|
||||
"Software Update UN-R 156",
|
||||
],
|
||||
"typical_cookie_vendors": [
|
||||
"Adobe Analytics", "Adobe Target", "Salesforce LiveAgent",
|
||||
"AdForm", "The Trade Desk", "Google Marketing Platform",
|
||||
"Inbenta", "Datadog RUM",
|
||||
],
|
||||
"vvt_required_processes": [
|
||||
"Probefahrten-Buchung", "Haendler-Suche", "eCall-System",
|
||||
"We Connect / Connected Drive Services", "Konfigurator-Daten",
|
||||
],
|
||||
"special_findings_to_watch": [
|
||||
"eCall ohne Hinweis in DSE = Verstoss VO 2015/758 Art. 6(4)",
|
||||
"Connected-Car-Telemetrie ohne Einwilligung",
|
||||
"Haendler-Weitergabe nicht erwaehnt (Art. 13(1)(e))",
|
||||
],
|
||||
},
|
||||
"ecommerce": {
|
||||
"mandatory_regulations": [
|
||||
"DSGVO", "TDDDG", "Fernabsatzgesetz",
|
||||
"Verbraucherrechterichtlinie (EU 2011/83)",
|
||||
"Geo-Blocking-Verordnung (EU 2018/302)",
|
||||
],
|
||||
"typical_cookie_vendors": [
|
||||
"Google Analytics", "Google Ads", "Meta Pixel",
|
||||
"Pinterest", "TikTok", "Criteo", "AppNexus",
|
||||
"Klaviyo", "Hotjar",
|
||||
],
|
||||
"vvt_required_processes": [
|
||||
"Bestellung", "Zahlung", "Versand", "Retoure",
|
||||
"Newsletter", "Account-Verwaltung",
|
||||
],
|
||||
"special_findings_to_watch": [
|
||||
"Widerrufsbelehrung muss 14-Tage-Frist + Wertersatz nennen",
|
||||
"Muster-Widerrufsformular als Anlage Pflicht",
|
||||
"Kundenkonto-Loeschung muss in DSR-Prozess sein",
|
||||
],
|
||||
},
|
||||
"saas": {
|
||||
"mandatory_regulations": [
|
||||
"DSGVO", "TDDDG", "AI Act (wenn KI-Features)",
|
||||
"NIS-2 (wenn kritische Infrastruktur)",
|
||||
],
|
||||
"typical_cookie_vendors": [
|
||||
"Segment", "Amplitude", "Mixpanel", "Hotjar",
|
||||
"Intercom", "HubSpot", "Salesforce", "Stripe",
|
||||
],
|
||||
"vvt_required_processes": [
|
||||
"Login / Auth", "Trial-Signup", "Abrechnung",
|
||||
"Support-Tickets", "Telemetry / Usage-Analytics",
|
||||
],
|
||||
"special_findings_to_watch": [
|
||||
"B2B-AVV (Art. 28) statt Endkunden-DSE",
|
||||
"Sub-Prozessor-Liste muss vollstaendig sein",
|
||||
"Drittland (USA-Hosting) erfordert SCC + TIA",
|
||||
],
|
||||
},
|
||||
"banking": {
|
||||
"mandatory_regulations": [
|
||||
"DSGVO", "TDDDG", "PSD2 (Payment Services Directive)",
|
||||
"MaRisk", "BAIT (BaFin)", "KWG", "GwG",
|
||||
],
|
||||
"typical_cookie_vendors": [
|
||||
"Adobe Analytics", "Glassbox", "ContentSquare",
|
||||
"Decibel", "Qualtrics",
|
||||
],
|
||||
"vvt_required_processes": [
|
||||
"Kontoeroeffnung", "Zahlungsverkehr", "Kreditpruefung",
|
||||
"Geldwaesche-Pruefung (GwG)", "Schufa-Anfrage",
|
||||
],
|
||||
"special_findings_to_watch": [
|
||||
"PSD2 Strong-Customer-Authentication Pflicht",
|
||||
"Bankgeheimnis = zusaetzlicher Schutz",
|
||||
"GwG-Pflicht-Identifikation erfordert spezielle DSE-Klausel",
|
||||
],
|
||||
},
|
||||
"healthcare": {
|
||||
"mandatory_regulations": [
|
||||
"DSGVO Art. 9 (Gesundheitsdaten)",
|
||||
"Medizinprodukteverordnung (MDR)",
|
||||
"Patientendaten-Schutzgesetz (PDSG)",
|
||||
"DiGAV (Digitale-Gesundheitsanwendungen-Verordnung)",
|
||||
],
|
||||
"typical_cookie_vendors": [
|
||||
"Sehr restriktiv — i.d.R. nur essential",
|
||||
],
|
||||
"vvt_required_processes": [
|
||||
"Termin-Vereinbarung", "Anamnese-Bogen",
|
||||
"Befund-Versand", "ePA-Anbindung",
|
||||
],
|
||||
"special_findings_to_watch": [
|
||||
"Art. 9 DSGVO erfordert ausdrueckliche Einwilligung",
|
||||
"Schweigepflicht §203 StGB",
|
||||
"Drittland-Transfer fast immer unzulaessig",
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def lookup_industry_profile(industry: str | None) -> dict | None:
|
||||
"""Liefert das Branchenprofil oder None."""
|
||||
if not industry:
|
||||
return None
|
||||
return _INDUSTRY_PROFILES.get(industry.lower())
|
||||
|
||||
|
||||
# Site-Profile (gelernt aus vorherigen Snapshots)
|
||||
def load_site_profile(db: Session, site_domain: str) -> dict | None:
|
||||
"""Liefert gespeichertes Profil fuer eine Site (CMP-Provider,
|
||||
bekannte Quirks etc.) oder None."""
|
||||
if not site_domain:
|
||||
return None
|
||||
try:
|
||||
row = db.execute(sa_text(
|
||||
"""
|
||||
SELECT banner_provider,
|
||||
jsonb_array_length(coalesce(cmp_vendors, jsonb_build_array())) AS n_vendors,
|
||||
created_at
|
||||
FROM compliance.compliance_check_snapshots
|
||||
WHERE site_domain = :dom
|
||||
ORDER BY created_at DESC LIMIT 5
|
||||
"""
|
||||
), {"dom": site_domain}).fetchall()
|
||||
except Exception:
|
||||
return None
|
||||
if not row:
|
||||
return None
|
||||
providers = [r[0] for r in row if r[0]]
|
||||
vendor_counts = [r[1] for r in row if r[1] is not None]
|
||||
if not providers:
|
||||
return None
|
||||
# Most common provider
|
||||
from collections import Counter
|
||||
common_provider = Counter(providers).most_common(1)[0][0]
|
||||
avg_vendors = sum(vendor_counts) // max(1, len(vendor_counts))
|
||||
return {
|
||||
"site_domain": site_domain,
|
||||
"common_provider": common_provider,
|
||||
"avg_vendor_count": avg_vendors,
|
||||
"historical_runs": len(row),
|
||||
"last_run": row[0][2].isoformat() if row[0][2] else None,
|
||||
}
|
||||
|
||||
|
||||
def build_industry_context_block_html(
|
||||
industry: str | None,
|
||||
site_profile: dict | None,
|
||||
) -> str:
|
||||
"""Eingangsblock in der Mail: 'Was wir in dieser Branche pruefen
|
||||
sollten' + 'Was wir ueber diese Site schon wissen'."""
|
||||
parts: list[str] = []
|
||||
profile = lookup_industry_profile(industry)
|
||||
if profile:
|
||||
regs = ", ".join(profile.get("mandatory_regulations", [])[:6])
|
||||
watches = profile.get("special_findings_to_watch", [])[:3]
|
||||
watch_html = "".join(
|
||||
f'<li style="font-size:11px;color:#475569">{w}</li>'
|
||||
for w in watches
|
||||
)
|
||||
parts.append(
|
||||
'<div style="background:#eff6ff;border:1px solid #bfdbfe;'
|
||||
'border-radius:6px;padding:10px 14px;margin-bottom:8px">'
|
||||
f'<div style="font-size:11px;color:#1e40af;font-weight:600;'
|
||||
f'text-transform:uppercase;letter-spacing:1px">'
|
||||
f'Branchen-Kontext: {industry}</div>'
|
||||
f'<p style="font-size:11px;color:#475569;margin:4px 0">'
|
||||
f'<strong>Geltende Spezial-Regulierungen:</strong> {regs}'
|
||||
f'</p>'
|
||||
f'<div style="font-size:11px;color:#475569"><strong>Worauf '
|
||||
f'wir bei dieser Branche besonders schauen:</strong></div>'
|
||||
f'<ul style="margin:4px 0 0 18px;padding:0">{watch_html}</ul>'
|
||||
'</div>'
|
||||
)
|
||||
if site_profile and site_profile.get("historical_runs", 0) > 1:
|
||||
parts.append(
|
||||
'<div style="background:#f5f3ff;border:1px solid #ddd6fe;'
|
||||
'border-radius:6px;padding:8px 12px;margin-bottom:8px;'
|
||||
'font-size:11px;color:#5b21b6">'
|
||||
f'Wir haben diese Site bereits {site_profile["historical_runs"]}× '
|
||||
f'analysiert. Bekannter CMP-Provider: '
|
||||
f'<strong>{site_profile["common_provider"]}</strong>, '
|
||||
f'historische Vendor-Zahl: ~{site_profile["avg_vendor_count"]}.'
|
||||
'</div>'
|
||||
)
|
||||
return "".join(parts)
|
||||
@@ -0,0 +1,229 @@
|
||||
"""
|
||||
P31 — Tiered LLM-Cascade mit Confidence + Valkey-Cache.
|
||||
|
||||
Bisherige LLM-Calls (vendor_llm_extractor, mc_solution_generator):
|
||||
* gehen direkt an Qwen lokal → bei kompliziertem Input lange Latenz
|
||||
* fallen bei Fail manuell auf OVH 120B zurueck
|
||||
* Kein Cache → gleiche Eingabe kostet x-mal Zeit
|
||||
|
||||
Diese Modul vereinheitlicht:
|
||||
1. Cache-Lookup (md5(prompt) → cached response, TTL 7d)
|
||||
2. Qwen-Aufruf mit kurzem Timeout (90s)
|
||||
3. Wenn fail/leer ODER confidence < threshold → OVH 120B (45s)
|
||||
4. Wenn auch fail → Anthropic Claude (last resort)
|
||||
5. Response wird gecached
|
||||
|
||||
confidence-Heuristik:
|
||||
* parsed JSON erfolgreich + non-empty → 0.8
|
||||
* JSON-Parse failed → 0.0
|
||||
* JSON ok aber nur 1 Item bei >5000 chars input → 0.3
|
||||
|
||||
Backend-API: await call_with_cascade(prompt, system_prompt, expected_min_items)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# In-process Cache wenn kein Valkey verfuegbar
|
||||
_LOCAL_CACHE: dict[str, dict] = {}
|
||||
_LOCAL_CACHE_MAX = 200
|
||||
|
||||
|
||||
def _cache_key(system: str, user: str, model_hint: str = "") -> str:
|
||||
blob = f"{system}\n---\n{user}\n---\n{model_hint}"
|
||||
return "llm:" + hashlib.md5(blob.encode()).hexdigest()[:24]
|
||||
|
||||
|
||||
def _cache_get(key: str) -> dict | None:
|
||||
try:
|
||||
import redis # noqa: WPS433
|
||||
url = os.getenv("VALKEY_URL", "redis://bp-core-valkey:6379")
|
||||
r = redis.Redis.from_url(url, socket_timeout=2.0,
|
||||
decode_responses=True)
|
||||
v = r.get(key)
|
||||
if v:
|
||||
return json.loads(v)
|
||||
except Exception:
|
||||
pass
|
||||
return _LOCAL_CACHE.get(key)
|
||||
|
||||
|
||||
def _cache_put(key: str, value: dict, ttl: int = 604800) -> None:
|
||||
try:
|
||||
import redis # noqa: WPS433
|
||||
url = os.getenv("VALKEY_URL", "redis://bp-core-valkey:6379")
|
||||
r = redis.Redis.from_url(url, socket_timeout=2.0,
|
||||
decode_responses=True)
|
||||
r.setex(key, ttl, json.dumps(value)[:200000])
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
if len(_LOCAL_CACHE) >= _LOCAL_CACHE_MAX:
|
||||
for k in list(_LOCAL_CACHE.keys())[:50]:
|
||||
_LOCAL_CACHE.pop(k, None)
|
||||
_LOCAL_CACHE[key] = value
|
||||
|
||||
|
||||
def _heuristic_confidence(response_text: str, input_len: int) -> float:
|
||||
if not response_text:
|
||||
return 0.0
|
||||
try:
|
||||
obj = json.loads(response_text)
|
||||
except Exception:
|
||||
# Try to extract JSON block
|
||||
a, b = response_text.find("{"), response_text.rfind("}")
|
||||
if 0 <= a < b:
|
||||
try:
|
||||
obj = json.loads(response_text[a:b + 1])
|
||||
except Exception:
|
||||
return 0.1
|
||||
else:
|
||||
return 0.1
|
||||
n_items = 0
|
||||
if isinstance(obj, dict):
|
||||
for v in obj.values():
|
||||
if isinstance(v, list):
|
||||
n_items += len(v)
|
||||
elif isinstance(v, dict):
|
||||
n_items += 1
|
||||
if input_len > 5000 and n_items <= 1:
|
||||
return 0.3
|
||||
if n_items >= 5:
|
||||
return 0.9
|
||||
return 0.7
|
||||
|
||||
|
||||
async def _call_ollama(system: str, user: str,
|
||||
max_tokens: int = 6000,
|
||||
timeout: float = 90.0) -> str:
|
||||
base = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
|
||||
model = os.getenv("CMP_LLM_MODEL", "qwen3:30b-a3b")
|
||||
payload = {
|
||||
"model": model, "stream": False, "format": "json",
|
||||
"messages": [{"role": "system", "content": system},
|
||||
{"role": "user", "content": user}],
|
||||
"options": {"temperature": 0.05, "num_predict": max_tokens},
|
||||
}
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=timeout) as c:
|
||||
r = await c.post(f"{base.rstrip('/')}/api/chat", json=payload)
|
||||
r.raise_for_status()
|
||||
return (r.json().get("message") or {}).get("content", "") or ""
|
||||
except Exception as e:
|
||||
logger.warning("ollama cascade tier 1 failed: %s", e)
|
||||
return ""
|
||||
|
||||
|
||||
async def _call_ovh(system: str, user: str, max_tokens: int = 6000) -> str:
|
||||
base = os.getenv("OVH_LLM_URL", "").strip()
|
||||
key = os.getenv("OVH_LLM_KEY", "").strip()
|
||||
model = os.getenv("OVH_LLM_MODEL", "").strip()
|
||||
if not base or not model:
|
||||
return ""
|
||||
headers = {"Content-Type": "application/json"}
|
||||
if key:
|
||||
headers["Authorization"] = f"Bearer {key}"
|
||||
payload = {
|
||||
"model": model, "temperature": 0.05, "max_tokens": max_tokens,
|
||||
"messages": [{"role": "system", "content": system},
|
||||
{"role": "user", "content": user}],
|
||||
"response_format": {"type": "json_object"},
|
||||
}
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=45.0) as c:
|
||||
r = await c.post(f"{base.rstrip('/')}/v1/chat/completions",
|
||||
json=payload, headers=headers)
|
||||
r.raise_for_status()
|
||||
choice = (r.json().get("choices") or [{}])[0]
|
||||
return (choice.get("message") or {}).get("content", "") or ""
|
||||
except Exception as e:
|
||||
logger.warning("ovh cascade tier 2 failed: %s", e)
|
||||
return ""
|
||||
|
||||
|
||||
async def _call_anthropic(system: str, user: str,
|
||||
max_tokens: int = 4000) -> str:
|
||||
key = os.getenv("ANTHROPIC_API_KEY", "").strip()
|
||||
if not key:
|
||||
return ""
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"x-api-key": key,
|
||||
"anthropic-version": "2023-06-01",
|
||||
}
|
||||
payload = {
|
||||
"model": "claude-haiku-4-5-20251001",
|
||||
"max_tokens": max_tokens, "temperature": 0.05,
|
||||
"system": system,
|
||||
"messages": [{"role": "user", "content": user}],
|
||||
}
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=30.0) as c:
|
||||
r = await c.post("https://api.anthropic.com/v1/messages",
|
||||
json=payload, headers=headers)
|
||||
r.raise_for_status()
|
||||
blocks = r.json().get("content") or []
|
||||
return "".join(b.get("text", "") for b in blocks if isinstance(b, dict))
|
||||
except Exception as e:
|
||||
logger.warning("anthropic cascade tier 3 failed: %s", e)
|
||||
return ""
|
||||
|
||||
|
||||
async def call_with_cascade(
|
||||
system: str,
|
||||
user: str,
|
||||
min_confidence: float = 0.6,
|
||||
max_tokens: int = 6000,
|
||||
) -> dict:
|
||||
"""Returns {'text': str, 'confidence': float, 'source': str,
|
||||
'cached': bool}."""
|
||||
key = _cache_key(system, user)
|
||||
cached = _cache_get(key)
|
||||
if cached:
|
||||
cached["cached"] = True
|
||||
return cached
|
||||
|
||||
input_len = len(user)
|
||||
# Tier 1: Qwen lokal
|
||||
text = await _call_ollama(system, user, max_tokens=max_tokens)
|
||||
conf = _heuristic_confidence(text, input_len)
|
||||
if text and conf >= min_confidence:
|
||||
out = {"text": text, "confidence": conf,
|
||||
"source": "qwen", "cached": False}
|
||||
_cache_put(key, out)
|
||||
return out
|
||||
|
||||
# Tier 2: OVH 120B
|
||||
text2 = await _call_ovh(system, user, max_tokens=max_tokens)
|
||||
conf2 = _heuristic_confidence(text2, input_len)
|
||||
if text2 and conf2 >= min_confidence:
|
||||
out = {"text": text2, "confidence": conf2,
|
||||
"source": "ovh_120b", "cached": False}
|
||||
_cache_put(key, out)
|
||||
return out
|
||||
|
||||
# Tier 3: Anthropic Claude (Notnagel)
|
||||
text3 = await _call_anthropic(system, user, max_tokens=max_tokens // 2)
|
||||
conf3 = _heuristic_confidence(text3, input_len)
|
||||
if text3 and conf3 >= min_confidence:
|
||||
out = {"text": text3, "confidence": conf3,
|
||||
"source": "anthropic_claude", "cached": False}
|
||||
_cache_put(key, out)
|
||||
return out
|
||||
|
||||
# Nichts hat geliefert — beste Variante wenigstens zurueckgeben
|
||||
best_text = text or text2 or text3 or ""
|
||||
best_conf = max(conf, conf2, conf3)
|
||||
best_source = "qwen" if text else ("ovh_120b" if text2 else "anthropic")
|
||||
return {"text": best_text, "confidence": best_conf,
|
||||
"source": best_source, "cached": False,
|
||||
"below_threshold": True}
|
||||
@@ -0,0 +1,269 @@
|
||||
"""
|
||||
P106 — MC-Audit-Type-Klassifizierung.
|
||||
|
||||
Zentrales Problem: viele Master-Controls pruefen Sachverhalte, die wir
|
||||
von Aussen GAR NICHT pruefen koennen — z.B. ob das Unternehmen einen
|
||||
internen Loeschkonzept-Prozess hat oder Schulungen durchgefuehrt wurden.
|
||||
|
||||
Bisher: alle MCs deren Pattern im Text nicht matched → FAIL.
|
||||
Folge: GF-Mail mit 95 FAILs, davon ~60-70 in Wirklichkeit nur 'unknown'.
|
||||
|
||||
Loesung: pro MC klassifizieren:
|
||||
* verifiable → Pattern muss im sichtbaren Dokument stehen (Audit moeglich)
|
||||
* process_internal → interner Prozess des Kunden (Schulung, AVV-Vertrag, …)
|
||||
* doc_internal → interne Dokumentation (VVT-Eintrag, DSFA-File, …)
|
||||
* ambiguous → koennte beides sein
|
||||
|
||||
In der MC-Auswertung:
|
||||
* verifiable + Pattern fehlt → echtes FAIL ❌
|
||||
* process_internal → CHECK (Hinweis 'Bitte intern pruefen') ⓘ
|
||||
* doc_internal → CHECK (Hinweis 'Im VVT/DSFA dokumentiert?') ⓘ
|
||||
* ambiguous → CHECK mit Warnung
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Patterns die auf interne Prozesse hindeuten (NICHT von aussen pruefbar)
|
||||
_PROCESS_INTERNAL_PATTERNS = [
|
||||
# Schulung / Mitarbeiter
|
||||
r"\bmitarbeiter\b.*schul",
|
||||
r"\bschulung(en)?\b",
|
||||
r"\bawareness\b",
|
||||
r"\bsensibilisier",
|
||||
# Vertraege intern
|
||||
r"\bauftragsverarbeitungsvertrag\b",
|
||||
r"\bAVV\b\s+abgeschlossen",
|
||||
r"\bvertrag.*abgeschlossen",
|
||||
r"\bdpa\s+(geschlossen|abgeschlossen|vorhanden)",
|
||||
r"\bSCC\s+(geschlossen|abgeschlossen|implementiert)",
|
||||
# Technisch-organisatorische Massnahmen (intern)
|
||||
r"\btechnisch[-\s]*organisatorische\s+ma(ß|ss)nahmen?\b",
|
||||
r"\bTOM\s+(umgesetzt|dokumentiert|implementiert)",
|
||||
r"\bverschluesselung\s+(implementiert|aktiv)",
|
||||
r"\bpseudonymisierung\s+(implementiert|aktiv)",
|
||||
r"\bbackup[s]?\s+(eingerichtet|vorhanden)",
|
||||
r"\bzugriffskontrolle",
|
||||
r"\b(rollen|berechtigungs)konzept",
|
||||
# Risikobewertung / DSFA (intern)
|
||||
r"\bdsfa\s+(durchgefuehrt|erstellt|dokumentiert)",
|
||||
r"\brisikobewertung\s+(durchgefuehrt|dokumentiert)",
|
||||
r"\brisikoanalyse",
|
||||
# Loeschkonzept / Aufbewahrung
|
||||
r"\bloeschkonzept\s+(umgesetzt|implementiert)",
|
||||
r"\baufbewahrungsfrist(en)?\s+(eingehalten|definiert)",
|
||||
r"\bloeschroutinen?\s+(aktiv|implementiert)",
|
||||
# Meldewege / Vorfallmanagement
|
||||
r"\bmeldepflicht\s+(eingehalten|umgesetzt)",
|
||||
r"\bvorfallmanagement",
|
||||
r"\bincident[\s-]?response",
|
||||
r"\b72[\s-]?stunden[\s-]?meldung",
|
||||
# Generische Prozess-Indikatoren
|
||||
r"\bdokumentiert\s+werden",
|
||||
r"\bbitte\s+(intern\s+)?dokumentieren",
|
||||
r"\bin\s+der\s+verfahrens",
|
||||
r"\bnach\s+innen\s+geh",
|
||||
r"\bausnahmen\s+(dokumentieren|protokollieren)",
|
||||
r"\bkostenfrei\s+(zur\s+verfuegung|gewaehren|ermoegli)",
|
||||
r"\bunentgeltlich\s+(zur\s+verfuegung)",
|
||||
# Vertragsleistung / Service-Level (intern)
|
||||
r"\bservice[\s-]?level",
|
||||
r"\breaktionszeit",
|
||||
# Auditierung / Aufsicht
|
||||
r"\binterne(s)?\s+audit",
|
||||
r"\baufsichtsbehoerde\s+gemeldet",
|
||||
r"\bbeauftragter\s+(intern|benannt)",
|
||||
# eCall + Branchen-spezifische interne Pflichten
|
||||
r"\babschaltung\s+der\s+\w+\s+kostenfrei",
|
||||
r"\bopt[\s-]?out\s+(intern|im\s+kundenportal)\s+ermoeglichen",
|
||||
]
|
||||
|
||||
# Patterns die auf interne Dokumentation hindeuten (VVT, DSFA-Datei, …)
|
||||
_DOC_INTERNAL_PATTERNS = [
|
||||
r"\bverzeichnis\s+der\s+verarbeitungstaetigkeiten\b",
|
||||
r"\bvvt(\s+|\b)",
|
||||
r"\bdsfa[\s-]?dokument",
|
||||
r"\bauftragsverarbeitungsverzeichnis",
|
||||
r"\bsub[\s-]?prozessor[\s-]?liste",
|
||||
r"\bverarbeitungs[\s-]?register",
|
||||
r"\binternes\s+register",
|
||||
r"\baufbewahrungs[\s-]?konzept\b",
|
||||
]
|
||||
|
||||
# Patterns die auf externe Sichtbarkeit hindeuten → DEFINITIV verifiable
|
||||
_VERIFIABLE_PATTERNS = [
|
||||
r"\bin\s+der\s+(datenschutzerklaerung|dse|cookie[\s-]?richtlinie|impressum|agb)\b",
|
||||
r"\bauf\s+der\s+website\s+(genannt|sichtbar|angegeben)",
|
||||
r"\bim\s+banner\s+(genannt|sichtbar)",
|
||||
r"\bim\s+cookie[\s-]?banner",
|
||||
r"\bauf\s+der\s+startseite",
|
||||
r"\bim\s+footer",
|
||||
]
|
||||
|
||||
|
||||
def _matches_any(text: str, patterns: list[str]) -> bool:
|
||||
tl = text.lower()
|
||||
for pat in patterns:
|
||||
try:
|
||||
if re.search(pat, tl):
|
||||
return True
|
||||
except re.error:
|
||||
continue
|
||||
return False
|
||||
|
||||
|
||||
def classify_mc_audit_type(
|
||||
title: str | None,
|
||||
check_question: str | None = None,
|
||||
fail_criteria: dict | None = None,
|
||||
) -> str:
|
||||
"""Returns 'verifiable', 'process_internal', 'doc_internal',
|
||||
or 'ambiguous'."""
|
||||
blob = " ".join([title or "", check_question or "",
|
||||
str(fail_criteria or "")])
|
||||
if not blob.strip():
|
||||
return "ambiguous"
|
||||
|
||||
is_verifiable_hint = _matches_any(blob, _VERIFIABLE_PATTERNS)
|
||||
is_process = _matches_any(blob, _PROCESS_INTERNAL_PATTERNS)
|
||||
is_doc = _matches_any(blob, _DOC_INTERNAL_PATTERNS)
|
||||
|
||||
# Wenn explicit Verifiable-Indikator + kein Process → verifiable
|
||||
if is_verifiable_hint and not (is_process or is_doc):
|
||||
return "verifiable"
|
||||
# Wenn Process oder Doc UND nicht Verifiable → intern
|
||||
if is_process and not is_verifiable_hint:
|
||||
return "process_internal"
|
||||
if is_doc and not is_verifiable_hint:
|
||||
return "doc_internal"
|
||||
# Beides → ambiguous, im Zweifel CHECK markieren
|
||||
if is_process or is_doc:
|
||||
return "ambiguous"
|
||||
return "verifiable"
|
||||
|
||||
|
||||
def annotate_mc_results(check_results: list[dict]) -> list[dict]:
|
||||
"""In-place: setzt mc_audit_type auf jeden MC-Check und ersetzt
|
||||
Status 'failed' durch 'check' wenn audit_type != verifiable."""
|
||||
if not check_results:
|
||||
return check_results
|
||||
n_reclassified = 0
|
||||
for r in check_results:
|
||||
if not isinstance(r, dict):
|
||||
continue
|
||||
if not (r.get("id") or "").startswith("mc-"):
|
||||
continue
|
||||
if "mc_audit_type" not in r:
|
||||
r["mc_audit_type"] = classify_mc_audit_type(
|
||||
r.get("label"), r.get("hint"), r.get("fail_criteria"),
|
||||
)
|
||||
# Wenn FAIL aber audit_type != verifiable → "check" (manuell)
|
||||
if (not r.get("passed")
|
||||
and not r.get("skipped")
|
||||
and r["mc_audit_type"] in (
|
||||
"process_internal", "doc_internal", "ambiguous",
|
||||
)):
|
||||
r["audit_status"] = "check" # NICHT failed
|
||||
n_reclassified += 1
|
||||
elif r.get("passed"):
|
||||
r["audit_status"] = "pass"
|
||||
elif r.get("skipped"):
|
||||
r["audit_status"] = "skip"
|
||||
else:
|
||||
r["audit_status"] = "fail"
|
||||
if n_reclassified:
|
||||
logger.info(
|
||||
"MC-Audit-Type: %d/%d MCs reklassifiziert von FAIL → CHECK "
|
||||
"(interne Pruefung erforderlich)",
|
||||
n_reclassified, len(check_results),
|
||||
)
|
||||
return check_results
|
||||
|
||||
|
||||
def split_by_audit_type(check_results: list[dict]) -> dict[str, list[dict]]:
|
||||
"""Liefert {verifiable_fails, internal_checks, passes, skips}."""
|
||||
out = {"verifiable_fails": [], "internal_checks": [],
|
||||
"passes": [], "skips": []}
|
||||
for r in (check_results or []):
|
||||
if not isinstance(r, dict):
|
||||
continue
|
||||
if not (r.get("id") or "").startswith("mc-"):
|
||||
continue
|
||||
status = r.get("audit_status")
|
||||
if status == "pass":
|
||||
out["passes"].append(r)
|
||||
elif status == "skip":
|
||||
out["skips"].append(r)
|
||||
elif status == "check":
|
||||
out["internal_checks"].append(r)
|
||||
elif status == "fail" or (not r.get("passed") and not r.get("skipped")):
|
||||
out["verifiable_fails"].append(r)
|
||||
return out
|
||||
|
||||
|
||||
def build_internal_checks_block_html(
|
||||
internal_checks: list[dict],
|
||||
limit: int = 30,
|
||||
) -> str:
|
||||
if not internal_checks:
|
||||
return ""
|
||||
by_type: dict[str, list[dict]] = {}
|
||||
for c in internal_checks:
|
||||
t = c.get("mc_audit_type", "ambiguous")
|
||||
by_type.setdefault(t, []).append(c)
|
||||
|
||||
sections: list[str] = []
|
||||
labels = {
|
||||
"process_internal": ("Interne Prozesse — bitte beim DSB pruefen",
|
||||
"#1e40af"),
|
||||
"doc_internal": ("Interne Dokumentation — bitte im VVT/DSFA pruefen",
|
||||
"#5b21b6"),
|
||||
"ambiguous": ("Unklar ob Audit-Befund oder interne Pruefung",
|
||||
"#92400e"),
|
||||
}
|
||||
for atype, (heading, color) in labels.items():
|
||||
items = by_type.get(atype) or []
|
||||
if not items:
|
||||
continue
|
||||
rows = "".join(
|
||||
f'<li style="margin-bottom:4px;font-size:11px;line-height:1.45">'
|
||||
f'<strong>{(c.get("label") or "")[:160]}</strong>'
|
||||
+ (f' <span style="color:#94a3b8">({c.get("regulation") or "—"})</span>'
|
||||
if c.get("regulation") else '') +
|
||||
f'</li>'
|
||||
for c in items[:limit]
|
||||
)
|
||||
sections.append(
|
||||
f'<div style="margin-bottom:10px">'
|
||||
f'<div style="font-size:11px;color:{color};text-transform:uppercase;'
|
||||
f'letter-spacing:1px;font-weight:600;margin-bottom:4px">'
|
||||
f'{heading} ({len(items)})</div>'
|
||||
f'<ul style="margin:0 0 0 18px;padding:0">{rows}</ul>'
|
||||
f'</div>'
|
||||
)
|
||||
return (
|
||||
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
|
||||
'max-width:760px;margin:0 auto 16px;padding:12px 16px;'
|
||||
'background:#f0f9ff;border:1px solid #bfdbfe;border-radius:8px">'
|
||||
'<div style="font-size:11px;color:#1e40af;text-transform:uppercase;'
|
||||
'letter-spacing:1.2px;margin-bottom:4px;font-weight:600">'
|
||||
'Pruefungen die wir von aussen NICHT durchfuehren koennen</div>'
|
||||
f'<h3 style="margin:0 0 6px;font-size:14px;color:#1e293b">'
|
||||
f'{len(internal_checks)} Pruefpunkt'
|
||||
f'{"e" if len(internal_checks) != 1 else ""} sind '
|
||||
'NUR intern beim Kunden zu pruefen</h3>'
|
||||
'<p style="margin:0 0 10px;font-size:11px;color:#475569;'
|
||||
'line-height:1.5">'
|
||||
'Diese Anforderungen koennen wir per externem Website-Audit nicht '
|
||||
'als erfuellt oder nicht-erfuellt bewerten — sie betreffen interne '
|
||||
'Prozesse (Schulungen, AVV-Vertraege, TOM-Doku) oder interne '
|
||||
'Dokumentation (VVT, DSFA, Loeschkonzept). Sie sind also <strong>kein '
|
||||
'Verstoss</strong>, sondern Hinweis-Checks fuer Ihren DSB.</p>'
|
||||
+ "".join(sections) +
|
||||
'</div>'
|
||||
)
|
||||
@@ -61,6 +61,12 @@ def build_scorecard(check_results: list[dict]) -> dict:
|
||||
b["skipped"] += 1
|
||||
elif r.get("passed"):
|
||||
b["passed"] += 1
|
||||
# P106 — interner Check ist KEIN Fail (zaehlt als skipped fuer
|
||||
# die Score-Berechnung damit der Score realistisch ist).
|
||||
elif r.get("audit_status") == "check":
|
||||
b["skipped"] += 1
|
||||
b.setdefault("internal_checks", 0)
|
||||
b["internal_checks"] += 1
|
||||
else:
|
||||
b["failed"] += 1
|
||||
sev = (r.get("severity") or "MEDIUM").upper()
|
||||
|
||||
@@ -0,0 +1,173 @@
|
||||
"""
|
||||
P68 — Reverse-Audit: eigene Templates gegen alle MCs pruefen.
|
||||
|
||||
Statt 'gegeben einen Kunden-Text → welche MCs fail' machen wir den
|
||||
umgekehrten Test: 'gegeben unseren BreakPilot-Standard-Template-Pool
|
||||
(95 Templates) → welche MCs werden NICHT abgedeckt? Wo sind Luecken?'
|
||||
|
||||
Liefert einen Coverage-Report:
|
||||
- Total MCs in DB: ~1800
|
||||
- MCs abgedeckt durch min. 1 unserer Templates: X
|
||||
- MCs ohne Coverage: Y (Liste)
|
||||
- Templates ohne MC-Wirkung: Z (Liste)
|
||||
|
||||
Zweck: Audit unserer eigenen Code-Base. Wenn ein Customer einen Lauf
|
||||
macht und 50 Findings produziert sind, sollten 90%+ davon durch unsere
|
||||
Template-Bibliothek korrigierbar sein. Wenn nicht → Templates fehlen.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
from sqlalchemy import text as sa_text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_reverse_audit(db: Session) -> dict:
|
||||
"""Hauptfunktion. Returns coverage-report dict."""
|
||||
# 1) Alle MCs aus doc_check_controls laden
|
||||
mc_rows = db.execute(sa_text(
|
||||
"""
|
||||
SELECT id::text, control_id, doc_type, title, check_question,
|
||||
pass_criteria, severity
|
||||
FROM compliance.doc_check_controls
|
||||
ORDER BY doc_type, severity DESC
|
||||
"""
|
||||
)).fetchall()
|
||||
|
||||
# 2) Templates aus DB (doc_templates oder legal_templates oder analog)
|
||||
try:
|
||||
tpl_rows = db.execute(sa_text(
|
||||
"""
|
||||
SELECT id::text, doc_type, title, body
|
||||
FROM compliance.doc_templates
|
||||
WHERE active = TRUE
|
||||
"""
|
||||
)).fetchall()
|
||||
except Exception:
|
||||
# Fallback auf evtl. andere Template-Tabelle
|
||||
try:
|
||||
tpl_rows = db.execute(sa_text(
|
||||
"""
|
||||
SELECT id::text, doc_type, name AS title, content AS body
|
||||
FROM compliance.legal_templates
|
||||
"""
|
||||
)).fetchall()
|
||||
except Exception as e:
|
||||
logger.warning("template table not found: %s", e)
|
||||
tpl_rows = []
|
||||
|
||||
# 3) Coverage-Matrix: pro MC, ob ein Template sie abdeckt
|
||||
templates_by_doctype: dict[str, list[dict]] = {}
|
||||
for tid, dt, title, body in tpl_rows:
|
||||
templates_by_doctype.setdefault(dt or "other", []).append({
|
||||
"id": tid, "title": title, "body": (body or "")[:50000],
|
||||
})
|
||||
|
||||
covered_mc_ids: set[str] = set()
|
||||
uncovered: list[dict] = []
|
||||
for mc_id, ctrl_id, dt, title, q, pc, sev in mc_rows:
|
||||
tpls = templates_by_doctype.get(dt or "other") or []
|
||||
if not tpls:
|
||||
uncovered.append({
|
||||
"mc_id": ctrl_id, "doc_type": dt, "title": title,
|
||||
"severity": sev, "reason": "no_template_for_doctype",
|
||||
})
|
||||
continue
|
||||
# Heuristik: pass_criteria sind Pattern. Wenn IRGENDEIN Template
|
||||
# die Pattern enthaelt → covered.
|
||||
criteria = _extract_patterns_from_pc(pc)
|
||||
if not criteria:
|
||||
# ohne klare Pattern: per Title-Keywords pruefen
|
||||
criteria = _title_keywords(title or "")
|
||||
ok = False
|
||||
for tpl in tpls:
|
||||
body = tpl["body"].lower()
|
||||
hits = sum(1 for p in criteria if p and p.lower() in body)
|
||||
if hits >= max(1, len(criteria) // 2):
|
||||
ok = True
|
||||
break
|
||||
if ok:
|
||||
covered_mc_ids.add(mc_id)
|
||||
else:
|
||||
uncovered.append({
|
||||
"mc_id": ctrl_id, "doc_type": dt, "title": title,
|
||||
"severity": sev, "reason": "no_template_match",
|
||||
"criteria_sample": criteria[:5],
|
||||
})
|
||||
|
||||
# 4) Templates ohne MC-Wirkung
|
||||
used_template_ids: set[str] = set()
|
||||
for mc_id, ctrl_id, dt, title, q, pc, sev in mc_rows:
|
||||
if mc_id not in covered_mc_ids:
|
||||
continue
|
||||
tpls = templates_by_doctype.get(dt or "other") or []
|
||||
criteria = _extract_patterns_from_pc(pc) or _title_keywords(title or "")
|
||||
for tpl in tpls:
|
||||
body = tpl["body"].lower()
|
||||
hits = sum(1 for p in criteria if p and p.lower() in body)
|
||||
if hits >= max(1, len(criteria) // 2):
|
||||
used_template_ids.add(tpl["id"])
|
||||
break
|
||||
all_template_ids = {t["id"] for tpls in templates_by_doctype.values()
|
||||
for t in tpls}
|
||||
unused_templates = all_template_ids - used_template_ids
|
||||
|
||||
return {
|
||||
"total_mcs": len(mc_rows),
|
||||
"total_templates": len(all_template_ids),
|
||||
"covered_mcs": len(covered_mc_ids),
|
||||
"uncovered_mcs": len(uncovered),
|
||||
"coverage_pct": round(len(covered_mc_ids) / max(1, len(mc_rows)) * 100, 1),
|
||||
"unused_templates": sorted(unused_templates),
|
||||
"top_uncovered_high": [u for u in uncovered if u.get("severity") == "HIGH"][:30],
|
||||
"by_doctype": _summarize_by_doctype(mc_rows, covered_mc_ids),
|
||||
}
|
||||
|
||||
|
||||
def _extract_patterns_from_pc(pc) -> list[str]:
|
||||
"""pc ist jsonb mit z.B. {required_phrases: [...]}, {keywords: [...]}"""
|
||||
if not pc:
|
||||
return []
|
||||
if isinstance(pc, str):
|
||||
try:
|
||||
import json as _j
|
||||
pc = _j.loads(pc)
|
||||
except Exception:
|
||||
return [pc[:50]]
|
||||
if isinstance(pc, dict):
|
||||
out: list[str] = []
|
||||
for k in ("required_phrases", "keywords", "must_contain",
|
||||
"patterns", "phrases"):
|
||||
v = pc.get(k)
|
||||
if isinstance(v, list):
|
||||
out.extend([str(x)[:80] for x in v if x])
|
||||
return out
|
||||
if isinstance(pc, list):
|
||||
return [str(x)[:80] for x in pc if x]
|
||||
return []
|
||||
|
||||
|
||||
def _title_keywords(title: str) -> list[str]:
|
||||
"""Fallback wenn pass_criteria leer: extrahiere Substantive aus Title."""
|
||||
if not title:
|
||||
return []
|
||||
# primitive: alle Worte > 4 Buchstaben
|
||||
return [w for w in re.findall(r"\b\w{5,}\b", title)][:5]
|
||||
|
||||
|
||||
def _summarize_by_doctype(mc_rows, covered_mc_ids: set[str]) -> dict:
|
||||
out: dict[str, dict] = {}
|
||||
for mc_id, ctrl_id, dt, title, q, pc, sev in mc_rows:
|
||||
dt = dt or "other"
|
||||
d = out.setdefault(dt, {"total": 0, "covered": 0})
|
||||
d["total"] += 1
|
||||
if mc_id in covered_mc_ids:
|
||||
d["covered"] += 1
|
||||
for dt, d in out.items():
|
||||
d["pct"] = round(d["covered"] / max(1, d["total"]) * 100, 1)
|
||||
return out
|
||||
@@ -28,4 +28,8 @@ USER appuser
|
||||
|
||||
EXPOSE 8094
|
||||
|
||||
# P83 — Build-SHA fuer check-rebuild-needed.sh
|
||||
ARG BUILD_SHA="unknown"
|
||||
ENV BUILD_SHA=${BUILD_SHA}
|
||||
|
||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8094"]
|
||||
|
||||
Reference in New Issue
Block a user