Compare commits

..

5 Commits

Author SHA1 Message Date
Benjamin Admin 299375e486 feat(dsms): version chain history + diff endpoint + Audit Timeline UI
DSMS Stufe 3 — making the parent_cid chain useful end-to-end.

Gateway (dsms-gateway):
- /api/v1/documents/{cid}/history alias added next to the legacy
  /documents/{cid}/history (history endpoint itself was already there,
  just under an inconsistent prefix).
- NEW /api/v1/documents/{cid_a}/diff/{cid_b}: fetches both packages from
  IPFS, computes a metadata diff (per-field old/new), and renders a
  unified text diff for utf-8 payloads. Binary payloads return only
  metadata diff with a "binary — compare via rendered export" note.
- 4 new pytest cases (mocking ipfs_cat): text diff, binary fallback,
  fetch error, history chain depth — all green.

Frontend (admin-compliance):
- CIDHistoryModal: lazy-loads /dsms/documents/:cid/history, renders the
  version chain as a vertical timeline, marks the AKTUELL entry, and
  per-step exposes a "Diff zu V<n>" button that loads + renders the diff
  inline (metadata table + unified text diff in a monospace panel).
- AuditTimelinePage: existing CID badge now sits next to a "Verlauf
  anzeigen" link that opens the modal. Handles both Python's plain-CID
  audit values and the Go techfile flow's JSON envelope {cid, filename,
  size} via extractCID() helper.

This makes "show me how this CE-Akte changed between V2 and V3"
self-service in the UI instead of a curl-against-IPFS workflow.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 10:10:07 +02:00
Benjamin Admin e2be51b0aa feat(audit): P106 MC-Audit-Type + P83 BUILD_SHA in Dockerfiles + P80 v2 full
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 16s
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 16s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m42s
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 41s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
P106 — mc_audit_type.py: zentrales Quality-Thema.
Klassifiziert pro MC: verifiable / process_internal / doc_internal /
ambiguous. Pattern-Match auf check_question + title + fail_criteria
(Schulung, AVV abgeschlossen, TOM umgesetzt, DSFA durchgefuehrt,
Ausnahmen dokumentieren, kostenfrei zur Verfuegung, opt-out
intern ermoeglichen, …).

Interne MCs werden in der MC-Auswertung NICHT mehr als FAIL gewertet,
sondern als CHECK markiert (audit_status='check'). Sie zaehlen im
build_scorecard als skipped (nicht failed) damit der Score realistisch
ist. build_internal_checks_block_html() rendert sie als separaten
blauen Block 'Pruefungen die wir von aussen NICHT durchfuehren koennen'
nach dem MC-Scorecard.

Erwartete Wirkung: bei VW 95 FAILs → wahrscheinlich 30-40 echte
verifiable_fails + 50-60 internal_checks. GF-Mail wird drastisch
realistischer (statt 'Sie haben 95 Verstoesse' → 'Sie haben 35
extern sichtbare Themen + 60 interne Checks, bitte mit DSB klaeren').

P83 — BUILD_SHA in backend/admin/consent-tester Dockerfiles als
ARG + ENV. check-rebuild-needed.sh kann jetzt deployed vs local SHA
vergleichen + REBUILD REQUIRED melden.

P80 v2 — check_replay.py macht jetzt vollstaendigen Replay aller
post-fetch Quality-Generatoren: vendor_normalizer (Dedup),
audit_quality_checks, cookie_compliance_audit, tcf_vendor_authority,
cookie_value_entropy, cookie_network_tracer. Snapshots aus alter Zeit
zeigen jetzt im Replay den aktuellen Audit-Stand.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 08:57:02 +02:00
Benjamin Admin bd65b6f318 feat(audit): Phase 2+3 — P54 + P68 + P69 + P6/P53/P55 + P31 + P80v2
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Failing after 59s
CI / detect-changes (push) Successful in 10s
CI / branch-name (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 15s
CI / loc-budget (push) Failing after 19s
CI / iace-gt-coverage (push) Successful in 27s
CI / test-python-backend (push) Successful in 42s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
P54 — consent_diff_for_user.py: USP-Feature fuer wiederkehrende Besucher.
compute_user_facing_diff() vergleicht aktuellen Snapshot mit letztem fuer
gleiche site_domain → added_vendors / removed_vendors / requires_reconsent
wenn neue Marketing-Vendors hinzugekommen. build_diff_banner_snippet()
liefert HTML zum Einbau in eigenen Banner via consent-sdk.

P68 — reverse_audit.py: Self-Audit unserer Template-Bibliothek.
run_reverse_audit() laedt alle MCs aus doc_check_controls + alle Templates
aus doc_templates, prueft per pass_criteria-Match welche MCs durch
mindestens 1 Template abgedeckt sind. Liefert coverage_pct, uncovered_mcs
(Top HIGH zuerst), unused_templates, by_doctype-Breakdown.

P69 — data/ecall_regulation.json: eCall-VO (EU) 2015/758 als 7 Chunks
fuer RAG-Ingest (Art. 3/6/7 + compliance_implications fuer Automotive-OEMs).
Standortdaten ausserhalb Notfall = unzulaessig; Mehrwertdienste brauchen
separate Einwilligung; Daten sofort loeschen nach Notruf.

P6+P53+P55 — industry_library.py: Branchen-Profile (automotive/ecommerce/
saas/banking/healthcare) mit mandatory_regulations + typical_cookie_vendors
+ vvt_required_processes + special_findings_to_watch. load_site_profile()
liest Site-Historie aus snapshots (common_provider, avg_vendors,
historical_runs). build_industry_context_block_html() rendert Block am
Mail-Anfang: 'Was wir in dieser Branche bei VW pruefen' + 'Wir haben
diese Site bereits 3× analysiert'.

P31 — llm_cascade.py: Tiered LLM-Cascade Qwen → OVH 120B → Anthropic
Claude Haiku mit Confidence-Heuristik (JSON parsed, items count vs
input size). Valkey-Cache (redis://) mit 7-Tage-TTL plus In-Process-
Fallback. Wenn Tier-1 unter Confidence-Threshold → Tier-2, dann Tier-3.
Reduziert Lauf-Zeit drastisch bei Re-Runs.

P80 v2 — check_replay.py: replay nutzt jetzt audit_quality_checks
mit den Snapshot-Daten. Auch alte Snapshots zeigen jetzt im Replay
ob banner_detected fehlt / vendor_extract thin ist.

Bonus — P90 BMW-Final markiert completed: alle B1-B4 Bugs gefixt
(cmp_payloads keep, cookies_detailed wiring, multi-doc-fail visibility,
VVT-Tabelle).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 08:38:08 +02:00
Benjamin Admin c771d8ecb9 Merge feat/iace-lift-endstop-bridge: OSHA→engine bridge + drift filter
CI / guardrail-integrity (push) Has been skipped
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 17s
CI / loc-budget (push) Failing after 19s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Failing after 1m9s
CI / iace-gt-coverage (push) Successful in 29s
CI / test-python-backend (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
2026-05-22 08:37:34 +02:00
Benjamin Admin 772ff35e8d feat(iace): bridge OSHA MD library to pattern engine, body-part-specific lift crush hazards
- M600-M604: lift endstop mitigations (Kriechgeschwindigkeit, Schaltleiste,
  Mindestabstand, Hold-to-run, Trittblech) — cite OSHA + EN ISO identifiers
- HP2100-HP2102: body-part crush patterns for lift family (foot under platform,
  hand/body against fixed structure, leg between lift and lateral structure),
  restricted via MachineTypes filter
- pattern_machinetype_overrides.go: post-load pass fills MachineTypes on 14
  legacy patterns (HP1000 Walzen, HP539 Schweiss, HP545/HP782 Glas,
  HP756/HP757/HP760 Fahrtreppe, HP1400-1402 CNC, HP045/HP049 Pressen,
  HP420-422 Conveyor) to prevent drift on Kistenhubgeraet-style projects

Why: Kistenhubgeraet re-init exposed two gaps — the abstract "Bremse versagt
bei Absenkbewegung" pattern fired but the concrete foot-crush body-part variant
was missing, AND ~10 unrelated patterns fired purely because their RequiredTags
incidentally aligned. Override map avoids touching 1000+ LOC pattern files
that already exceed the soft cap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 08:37:24 +02:00
21 changed files with 1952 additions and 5 deletions
+4
View File
@@ -55,5 +55,9 @@ EXPOSE 3000
# Set hostname
ENV HOSTNAME="0.0.0.0"
# P83 — Build-SHA fuer check-rebuild-needed.sh
ARG BUILD_SHA="unknown"
ENV BUILD_SHA=${BUILD_SHA}
# Start the application
CMD ["node", "server.js"]
@@ -0,0 +1,207 @@
'use client'
import { useEffect, useState } from 'react'
interface HistoryEntry {
cid: string
version: string | null
document_type: string | null
document_id: string | null
parent_cid: string | null
created_at: string | null
checksum: string | null
}
interface DiffResponse {
kind: 'text' | 'binary'
cid_a: string
cid_b: string
metadata_diff: Record<string, { old: unknown; new: unknown }>
diff?: string
added_lines?: number
removed_lines?: number
note?: string
}
interface Props {
cid: string
onClose: () => void
}
function shorten(cid: string): string {
if (cid.length <= 14) return cid
return cid.slice(0, 8) + '…' + cid.slice(-6)
}
export default function CIDHistoryModal({ cid, onClose }: Props) {
const [history, setHistory] = useState<HistoryEntry[]>([])
const [loading, setLoading] = useState(true)
const [error, setError] = useState<string | null>(null)
const [diffPair, setDiffPair] = useState<{ a: string; b: string } | null>(null)
const [diff, setDiff] = useState<DiffResponse | null>(null)
const [diffLoading, setDiffLoading] = useState(false)
useEffect(() => {
let cancel = false
setLoading(true)
setError(null)
fetch(`/api/sdk/v1/dsms/documents/${encodeURIComponent(cid)}/history`)
.then(async (r) => {
if (!r.ok) throw new Error(`HTTP ${r.status}`)
const json = await r.json()
if (!cancel) setHistory(json.history || [])
})
.catch((e) => {
if (!cancel) setError(e?.message || 'Fehler beim Laden')
})
.finally(() => {
if (!cancel) setLoading(false)
})
return () => {
cancel = true
}
}, [cid])
async function loadDiff(a: string, b: string) {
setDiffPair({ a, b })
setDiff(null)
setDiffLoading(true)
try {
const res = await fetch(
`/api/sdk/v1/dsms/documents/${encodeURIComponent(a)}/diff/${encodeURIComponent(b)}`
)
if (res.ok) {
const json = (await res.json()) as DiffResponse
setDiff(json)
} else {
setDiff({ kind: 'binary', cid_a: a, cid_b: b, metadata_diff: {}, note: `HTTP ${res.status}` })
}
} finally {
setDiffLoading(false)
}
}
return (
<div className="fixed inset-0 z-50 flex items-center justify-center bg-black/40 p-4" onClick={onClose}>
<div
className="w-full max-w-3xl max-h-[90vh] overflow-hidden flex flex-col bg-white dark:bg-gray-800 rounded-xl shadow-xl"
onClick={(e) => e.stopPropagation()}
>
<div className="flex items-center justify-between px-5 py-3 border-b border-gray-200 dark:border-gray-700">
<div>
<h2 className="text-sm font-semibold text-gray-900 dark:text-white">DSMS-Versionsverlauf</h2>
<code className="text-[10px] font-mono text-gray-500 dark:text-gray-400">{shorten(cid)}</code>
</div>
<button onClick={onClose} className="text-gray-500 hover:text-gray-700 dark:text-gray-400 text-sm">
Schliessen
</button>
</div>
<div className="flex-1 overflow-y-auto p-5 space-y-4">
{loading && <div className="text-sm text-gray-500">Verlauf wird geladen</div>}
{error && <div className="text-sm text-red-600 dark:text-red-400">{error}</div>}
{!loading && !error && history.length === 0 && (
<div className="text-sm text-gray-500 italic">
Kein Versionsverlauf gefunden. Diese CID hat keine parent_cid-Kette.
</div>
)}
{!loading && !error && history.length > 0 && (
<>
<div className="text-xs text-gray-500 dark:text-gray-400">
{history.length} Version{history.length > 1 ? 'en' : ''} in der Kette (neueste oben).
</div>
<ol className="relative border-l-2 border-emerald-500/40 pl-4 space-y-3">
{history.map((entry, idx) => {
const next = history[idx + 1]
return (
<li key={entry.cid} className="relative">
<div className="absolute -left-[1.4rem] top-1.5 w-3 h-3 rounded-full bg-emerald-500 ring-2 ring-white dark:ring-gray-800" />
<div className="bg-gray-50 dark:bg-gray-900/40 rounded-lg p-3 border border-gray-200 dark:border-gray-700">
<div className="flex items-center justify-between gap-2">
<div className="min-w-0">
<div className="text-sm font-medium text-gray-900 dark:text-white">
Version {entry.version || '?'} {idx === 0 && <span className="ml-2 text-[10px] text-emerald-600 font-semibold">AKTUELL</span>}
</div>
<code className="text-[10px] font-mono text-gray-500 dark:text-gray-400 break-all">{entry.cid}</code>
</div>
{next && (
<button
onClick={() => loadDiff(next.cid, entry.cid)}
className="shrink-0 text-[11px] text-purple-600 hover:text-purple-800 dark:text-purple-400 hover:underline"
title="Aenderungen zur Vorversion anzeigen"
>
Diff zu V{next.version || '?'}
</button>
)}
</div>
<div className="mt-1 text-[11px] text-gray-500 dark:text-gray-400 flex flex-wrap gap-x-3 gap-y-0.5">
{entry.document_type && <span>Typ: {entry.document_type}</span>}
{entry.document_id && <span>Dok-ID: {entry.document_id}</span>}
{entry.created_at && <span>{new Date(entry.created_at).toLocaleString('de-DE')}</span>}
</div>
{entry.checksum && (
<div className="mt-1 text-[10px] text-gray-400 font-mono">SHA-256: {entry.checksum.slice(0, 16)}</div>
)}
</div>
</li>
)
})}
</ol>
</>
)}
{diffPair && (
<div className="mt-4 border-t border-gray-200 dark:border-gray-700 pt-4 space-y-2">
<div className="flex items-center justify-between">
<h3 className="text-xs font-semibold text-gray-900 dark:text-white">
Diff: {shorten(diffPair.a)} {shorten(diffPair.b)}
</h3>
<button onClick={() => { setDiff(null); setDiffPair(null) }} className="text-[11px] text-gray-500 hover:text-gray-700">
Schliessen
</button>
</div>
{diffLoading && <div className="text-xs text-gray-500">Diff wird geladen</div>}
{!diffLoading && diff && (
<>
{Object.keys(diff.metadata_diff || {}).length > 0 && (
<div className="text-xs">
<div className="font-medium text-gray-700 dark:text-gray-300 mb-1">Metadaten-Aenderungen</div>
<table className="w-full">
<tbody>
{Object.entries(diff.metadata_diff).map(([field, { old, new: nv }]) => (
<tr key={field} className="border-b border-gray-100 dark:border-gray-800">
<td className="py-0.5 pr-2 font-mono text-[10px] text-gray-500">{field}</td>
<td className="py-0.5 pr-2 text-red-600 dark:text-red-400 line-through">{JSON.stringify(old)}</td>
<td className="py-0.5 text-green-700 dark:text-green-400">{JSON.stringify(nv)}</td>
</tr>
))}
</tbody>
</table>
</div>
)}
{diff.kind === 'text' && diff.diff && (
<>
<div className="text-[11px] text-gray-500">
{diff.added_lines ?? 0} Zeilen hinzu, {diff.removed_lines ?? 0} entfernt
</div>
<pre className="text-[10px] font-mono whitespace-pre-wrap bg-gray-900 text-gray-100 p-3 rounded max-h-64 overflow-y-auto">
{diff.diff}
</pre>
</>
)}
{diff.kind === 'binary' && (
<div className="text-xs text-amber-700 dark:text-amber-400 italic">
{diff.note || 'Binaere Datei — kein Text-Diff verfuegbar.'}
</div>
)}
</>
)}
</div>
)}
</div>
</div>
</div>
)
}
@@ -1,6 +1,8 @@
'use client'
import { useState } from 'react'
import { useAuditTimeline, type AuditEntry } from './_hooks/useAuditTimeline'
import CIDHistoryModal from './_components/CIDHistoryModal'
const ENTITY_LABELS: Record<string, string> = {
evidence: 'Nachweis', control: 'Control', document: 'Dokument',
@@ -16,8 +18,24 @@ const ACTION_COLORS: Record<string, string> = {
const FILTER_OPTIONS = ['all', 'evidence', 'dsms_archive', 'control', 'document', 'dsfa', 'vvt', 'tom']
// new_value may be a plain CID (from Python evidence flow) or a JSON envelope
// {"cid":"X","filename":"...","size":"..."} (from the Go IACE tech-file flow).
function extractCID(value: string): string {
const trimmed = value.trim()
if (trimmed.startsWith('{')) {
try {
const parsed = JSON.parse(trimmed)
if (typeof parsed.cid === 'string') return parsed.cid
} catch {
// fall through
}
}
return trimmed
}
export default function AuditTimelinePage() {
const { entries, loading, filter, setFilter } = useAuditTimeline()
const [historyCID, setHistoryCID] = useState<string | null>(null)
return (
<div className="max-w-4xl mx-auto space-y-6">
@@ -58,16 +76,18 @@ export default function AuditTimelinePage() {
<div className="space-y-4">
{entries.map((entry) => (
<TimelineEntry key={entry.id} entry={entry} />
<TimelineEntry key={entry.id} entry={entry} onShowHistory={setHistoryCID} />
))}
</div>
</div>
)}
{historyCID && <CIDHistoryModal cid={historyCID} onClose={() => setHistoryCID(null)} />}
</div>
)
}
function TimelineEntry({ entry }: { entry: AuditEntry }) {
function TimelineEntry({ entry, onShowHistory }: { entry: AuditEntry; onShowHistory: (cid: string) => void }) {
const dotColor = ACTION_COLORS[entry.action] || 'bg-gray-400'
const isCID = entry.field_changed === 'dsms_cid' || entry.action === 'archive'
const date = new Date(entry.performed_at)
@@ -94,7 +114,7 @@ function TimelineEntry({ entry }: { entry: AuditEntry }) {
<p className="text-xs text-gray-600 dark:text-gray-400 mt-1">{entry.change_summary}</p>
)}
{isCID && entry.new_value && (
<div className="mt-2 flex items-center gap-2">
<div className="mt-2 flex items-center gap-2 flex-wrap">
<svg className="w-3.5 h-3.5 text-emerald-600 flex-shrink-0" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M9 12l2 2 4-4m5.618-4.016A11.955 11.955 0 0112 2.944a11.955 11.955 0 01-8.618 3.04A12.02 12.02 0 003 9c0 5.591 3.824 10.29 9 11.622 5.176-1.332 9-6.03 9-11.622 0-1.042-.133-2.052-.382-3.016z" />
</svg>
@@ -102,6 +122,16 @@ function TimelineEntry({ entry }: { entry: AuditEntry }) {
{entry.new_value.length > 20 ? entry.new_value.slice(0, 8) + '...' + entry.new_value.slice(-6) : entry.new_value}
</code>
<span className="text-[10px] text-emerald-500">DSMS/IPFS</span>
<button
onClick={(e) => {
e.stopPropagation()
if (entry.new_value) onShowHistory(extractCID(entry.new_value))
}}
className="text-[10px] text-purple-600 hover:text-purple-800 dark:text-purple-400 underline-offset-2 hover:underline"
title="DSMS-Versionsverlauf und Diff zur Vorversion anzeigen"
>
Verlauf anzeigen
</button>
</div>
)}
</div>
@@ -0,0 +1,96 @@
package iace
// Body-part-specific crush hazards at lift / hoist / scissor-lift endstops.
// Bridges the gap that the Kistenhubgeraet re-init exposed: the abstract
// "Bremse versagt bei Absenkbewegung" pattern fires, but the concrete
// "Fuss unter absenkender Hubplattform" body-part variant did not exist.
//
// Each pattern restricts to lift-family machine types via MachineTypes,
// so a press / CNC / textile project does not pick them up. Mitigations
// reference the new M600-M604 (lift endstop) library plus the existing
// M001 (geometry), M002 (safety distance), M141 (warning sign).
func GetLiftEndstopPatterns() []HazardPattern {
liftTypes := []string{"lift", "hoist", "elevator", "scissor_lift"}
return []HazardPattern{
{
ID: "HP2100",
NameDE: "Fuss-Quetschung unter absenkender Hubplattform am Bodenanschlag",
NameEN: "Foot crush under descending lift platform at floor stop",
RequiredComponentTags: []string{"crush_point", "gravity_risk", "person_under_load"},
RequiredEnergyTags: []string{"gravitational"},
MachineTypes: liftTypes,
GeneratedHazardCats: []string{"mechanical_hazard"},
SuggestedMeasureIDs: []string{"M600", "M601", "M604", "M141"},
Priority: 92,
ScenarioDE: "Fuss oder Bein des Bedieners gelangt waehrend des Absenkvorgangs unter die " +
"Hubplattform. Bei Erreichen der unteren Endlage wird der Fuss zwischen Plattform " +
"und Boden gequetscht.",
TriggerDE: "Unsachgemaesse Position des Bedieners beim Be-/Entladen, fehlende Schaltleiste, fehlender Trittschutz",
HarmDE: "Fussquetschung, Mittelfussfraktur, Zehenamputation",
AffectedDE: "Bediener, Wartungspersonal",
ZoneDE: "Bodenbereich unter Hubplattform, umlaufende Spalte",
DefaultSeverity: 4,
DefaultExposure: 3,
DefaultAvoidability: 2,
ISO12100Section: "6.3.5.5 Quetschen — Mindestabstaende",
ClarificationQuestionsDE: []string{
"Ist eine umlaufende Quetsch-Schaltleiste an der Plattformunterkante verbaut?",
"Ist die Hubgeschwindigkeit am unteren Endanschlag auf <=15 mm/s reduziert (siehe M600)?",
"Verhindert ein Trittblech / Unterfahrschutz das Hineinfahren von Fuessen?",
},
},
{
ID: "HP2101",
NameDE: "Hand- oder Koerper-Quetschung gegen feste Struktur beim Hochfahren der Hubeinheit",
NameEN: "Hand or body crush against fixed structure during lift upward travel",
RequiredComponentTags: []string{"crush_point", "gravity_risk"},
RequiredEnergyTags: []string{"gravitational"},
MachineTypes: liftTypes,
GeneratedHazardCats: []string{"mechanical_hazard"},
SuggestedMeasureIDs: []string{"M602", "M603", "M600", "M141"},
Priority: 90,
ScenarioDE: "Beim Hochfahren der Last gelangen Hand oder Koerperteile des Bedieners " +
"zwischen die hoechste Position der Hubeinheit (z.B. mit beladener Palette) und " +
"eine feste Struktur oberhalb (Decke, Vorbau, Querbalken einer umschliessenden Anlage).",
TriggerDE: "Eingriff in den Verfahrweg waehrend Hubvorgang, fehlende konstruktive Begrenzung der Endlage",
HarmDE: "Hand- oder Armquetschung, im Extremfall Brustkorbkompression",
AffectedDE: "Bediener, Einrichter, Wartungspersonal",
ZoneDE: "Oberhalb hoechster Hubposition, Vorbau/Decke der umschliessenden Anlage",
DefaultSeverity: 4,
DefaultExposure: 2,
DefaultAvoidability: 2,
ISO12100Section: "6.3.5.5 Quetschen — Mindestabstaende",
ClarificationQuestionsDE: []string{
"Welcher Mindestabstand zu festen Strukturen oberhalb der hoechsten Hubposition ist gegeben? (Empfehlung: 120 mm fuer Kopf, 100 mm fuer Hand)",
"Ist der Tippbetrieb (Hold-to-run) durch ein Testprotokoll mit Stop-Zeit-Messung verifiziert?",
"Existiert eine redundante Hardware-Endlage zusaetzlich zur Software-Begrenzung?",
},
},
{
ID: "HP2102",
NameDE: "Quetschung Bein/Koerper zwischen Hubeinheit und seitlicher Struktur",
NameEN: "Leg/body crush between lift unit and lateral structure",
RequiredComponentTags: []string{"crush_point", "gravity_risk", "moving_part"},
RequiredEnergyTags: []string{"gravitational"},
MachineTypes: liftTypes,
GeneratedHazardCats: []string{"mechanical_hazard"},
SuggestedMeasureIDs: []string{"M602", "M601", "M141"},
Priority: 85,
ScenarioDE: "Person befindet sich seitlich neben der Hubeinheit und wird waehrend " +
"der Bewegung gegen eine feste Struktur (Regalwand, Stuetze, andere Anlage) gequetscht.",
TriggerDE: "Aufenthalt in Quetschzone bei Bewegung, fehlende Absperrung",
HarmDE: "Beinfraktur, Beckenquetschung",
AffectedDE: "Bediener, vorbeigehende Personen",
ZoneDE: "Seitlicher Bereich neben Hubeinheit, Lichte Weite zu festen Strukturen",
DefaultSeverity: 4,
DefaultExposure: 2,
DefaultAvoidability: 2,
ISO12100Section: "6.3.5.5 Quetschen — Mindestabstaende",
ClarificationQuestionsDE: []string{
"Welcher Sicherheitsabstand zu seitlichen festen Strukturen ist gegeben (Empfehlung 500 mm Koerperdurchgang)?",
"Ist der Bereich seitlich der Hubeinheit als Gefahrenzone markiert oder abgeschrankt?",
},
},
}
}
@@ -21,6 +21,7 @@ func GetProtectiveMeasureLibrary() []ProtectiveMeasureEntry {
all = append(all, GetTextileAgriMeasures()...) // Textil + Landmaschinen (Phase 5)
all = append(all, getGTBremseMeasures()...) // GT-Bremse-Coverage-Gaps (M483-M522)
all = append(all, GetCRAMeasures()...) // CRA / DIN EN 40000-1-2 cyber-resilience (M540-M548)
all = append(all, getLiftEndstopMeasures()...) // Lift/hoist endstop (M600-M604) — bridges OSHA MD library
return all
}
@@ -0,0 +1,134 @@
package iace
// Lift / hoist / scissor-lift endstop mitigations — bridges the OSHA
// minimum-distance library (minimum_distances.go, Task #18) into the
// pattern-engine measure library. Each entry cites the concrete OSHA
// value AND its EU-norm pendant by identifier only.
//
// Engineering rounding values come from MD_OSHA_* IDs in
// minimum_distances.go. We do not duplicate the source text here —
// the Tech-File renderer can join MD_OSHA_* into the rendered text
// at output time.
func getLiftEndstopMeasures() []ProtectiveMeasureEntry {
return []ProtectiveMeasureEntry{
// M600 — Cruise/creep speed at end of travel
{
ID: "M600",
ReductionType: "protection",
SubType: "speed_control",
Name: "Kriechgeschwindigkeit am Endanschlag (Hubgeraete)",
Description: "Hubgeschwindigkeit am Ende der Verfahrbewegung (oben und unten) auf maximal 15 mm/s " +
"reduzieren. OSHA 29 CFR 1910.217 Hand-Speed-Konstante 63 in/s = 1.600 mm/s als Obergrenze " +
"fuer Stopp-Reaktionszeit. Damit ist auch bei spaeter Auslosung der Quetsch-Schaltleiste " +
"genug Bremsweg vorhanden.",
HazardCategory: "mechanical",
Examples: []string{
"Hub-Endschalter mit Soft-Stop und Geschwindigkeitsstufe < 15 mm/s in den letzten 50 mm",
"Servo-Antrieb mit Ramp-down-Profil ueber die letzten 100 mm Verfahrweg",
"Drehzahl-Begrenzer im Frequenzumrichter mit Endlagen-Trigger",
},
NormReferences: []string{
"OSHA 29 CFR 1910.217 (Ds = 63 in/s x Ts)",
"EN ISO 13855 (Anordnung von Schutzeinrichtungen)",
"EN 1570-1 (Hubtische — Bauanforderungen)",
},
RiskReduction: &RiskReduction{SeverityDelta: -1, ExposureDelta: -1, ProbabilityDelta: -1},
Tags: []string{"crush_point", "gravity_risk", "speed_limit"},
},
// M601 — Trip-edge sensor under platform (safety bumper)
{
ID: "M601",
ReductionType: "protection",
SubType: "safety_device",
Name: "Quetsch-Schaltleiste unterhalb der Hubplattform",
Description: "Druckempfindliche Schaltleiste (gemaess EN ISO 13856-2) am unteren Rand der Hubplattform " +
"loest bei Beruehrung den Hubantrieb sofort aus und kehrt die Bewegung um. Verhindert Quetschung " +
"von Fuessen oder Beinen unter absenkender Last. PL c oder hoeher nach EN ISO 13849-1.",
HazardCategory: "mechanical",
Examples: []string{
"Schaltleiste umlaufend an Bodenkante der Hubplattform",
"Trittschutz mit redundanter Auswertung am Hubtisch",
"Lichtgitter im Bodenbereich als Ergaenzung bei freistehenden Anlagen",
},
NormReferences: []string{
"EN ISO 13856-2 (Schaltleisten)",
"EN ISO 13849-1 (PL-Bestimmung)",
"EN 1570-1",
},
RiskReduction: &RiskReduction{SeverityDelta: -2, ExposureDelta: -2, ProbabilityDelta: -2},
Tags: []string{"crush_point", "gravity_risk", "safety_device"},
},
// M602 — Minimum clearance to fixed structure above max lift position
{
ID: "M602",
ReductionType: "design",
SubType: "geometry",
Name: "Mindestabstand zu festen Strukturen oberhalb der Hubendlage",
Description: "Zwischen hoechstem Punkt der Hubeinheit (mit beladenem Werkstueck) und festen Strukturen " +
"oberhalb (Decke, Vorbau, Querbalken) muss ein Sicherheitsabstand verbleiben, der das Quetschen " +
"von Haenden und Koerper verhindert. Empfehlung: 120 mm fuer Kopf, 100 mm fuer Hand, 25 mm fuer " +
"Finger — abgeleitet aus EN 349 / EN ISO 13854 unabhaengig zu pruefen.",
HazardCategory: "mechanical",
Examples: []string{
"Konstruktive Begrenzung der oberen Hubposition durch mechanischen Anschlag",
"Software-Endlage mit redundantem Hardware-Sicherheitsschalter",
"Auslegungs-Pruefung mit beladener Standard-Palette und Maximal-Hubhoehe",
},
NormReferences: []string{
"EN 349 (Mindestabstaende gegen Quetschen von Koerperteilen)",
"EN ISO 13854 (Mindestabstaende gegen Quetschen)",
"OSHA 29 CFR 1910.212(a)(5) (Lueftergitter ≤ 1/2 in als Anker)",
},
RiskReduction: &RiskReduction{SeverityDelta: -2, ExposureDelta: -1},
Tags: []string{"crush_point", "gravity_risk"},
},
// M603 — Hold-to-run with two-hand operation for manual descent
{
ID: "M603",
ReductionType: "protection",
SubType: "control_device",
Name: "Tippbetrieb / Hold-to-run beim Absenken (mit Verifikations-Nachweis)",
Description: "Absenken nur im Tippbetrieb (Hold-to-run): Bedientaster muss waehrend des gesamten " +
"Absenkvorgangs gedrueckt gehalten werden. Bei Loslassen stoppt die Bewegung sofort. " +
"Im Limits-Form als 'Tippbetrieb' deklariert — durch Tests verifizieren (Stop-Reaktionszeit " +
"<= 0,3 s im voll beladenen Zustand).",
HazardCategory: "mechanical",
Examples: []string{
"Tipptaster mit elektrischer Selbstrueckstellung",
"Zweihand-Bedienung fuer kritische Absenk-Bereiche (Tipp + Zustimmtaster)",
"Pruefprotokoll Stop-Zeit gemaess EN ISO 13849-1 PL c",
},
NormReferences: []string{
"EN ISO 13849-1 (Sicherheitsbezogene Steuerungsteile)",
"EN ISO 13851 (Zweihandschaltungen)",
"BetrSichV § 4 (Schutzmassnahmen)",
},
RiskReduction: &RiskReduction{SeverityDelta: -1, ExposureDelta: -2, ProbabilityDelta: -1},
Tags: []string{"crush_point", "gravity_risk", "control_device"},
},
// M604 — Underrun guard / kick plate at platform base
{
ID: "M604",
ReductionType: "design",
SubType: "geometry",
Name: "Trittblech / Unterfahrschutz an der Hubplattform",
Description: "Unter der Hubplattform befindet sich ein umlaufendes Trittblech oder Unterfahrschutz, " +
"das das Hineinfahren von Fuessen unter die Plattform mechanisch verhindert. Hoehe ueber Boden " +
"maximal 5 mm in unterster Stellung. Trittblech haelt die Last eines Schuhs (mind. 150 kg) " +
"ohne Verformung.",
HazardCategory: "mechanical",
Examples: []string{
"Umlaufendes Stahlblech 3 mm Wandstaerke mit Fasen-Kante",
"Kombination mit M601 (Schaltleiste) als doppelte Sicherung",
"Pruefung jaehrlich auf Verformung und Funktion der Auflage",
},
NormReferences: []string{
"EN 1570-1 (Hubtische)",
"EN ISO 13857 (Sicherheitsabstaende)",
},
RiskReduction: &RiskReduction{SeverityDelta: -2, ExposureDelta: -1},
Tags: []string{"crush_point", "gravity_risk"},
},
}
}
@@ -0,0 +1,60 @@
package iace
// Machine-type overrides for legacy patterns that lacked MachineTypes
// filtering at authoring time. Applied as a post-load pass in
// collectAllPatterns() so we do not need to touch the large pattern
// source files (which would push them past the 500-LOC cap).
//
// Adding an entry here causes the listed pattern IDs to fire ONLY for
// projects whose machine_type is in the value list. This eliminates
// drift like "Punktschweisselektroden" firing for a Kistenhubgeraet
// project just because tags incidentally aligned.
var legacyMachineTypeOverrides = map[string][]string{
// Walzen / Roller hazards — printing, paper, metalworking only.
"HP1000": {"printing", "paper", "textile", "metalworking", "rolling_mill", "food_processing"},
// HP306 + HP1530 already carry MachineTypes; skip.
// Welding-specific patterns.
"HP539": {"welding", "spot_welding"},
// Glass-handling tilters.
"HP545": {"glass", "glass_processing"},
"HP782": {"glass", "glass_processing"},
// Escalator-specific.
"HP756": {"escalator"},
"HP757": {"escalator"},
"HP760": {"escalator"},
// CNC machine tools (these fired on Kistenhubgeraet because they
// share crush_point + moving_part tags but are bench-mounted tools).
"HP1400": {"cnc", "metalworking", "lathe", "milling"},
"HP1401": {"cnc", "metalworking", "lathe", "milling"},
"HP1402": {"cnc", "metalworking", "lathe", "milling"},
// Press-specific (Pressenteile/Pressraum/Werkzeugraum).
"HP045": {"press", "hydraulic_press", "mechanical_press", "stamping_press"},
"HP049": {"press", "hydraulic_press", "mechanical_press", "stamping_press"},
// Conveyor-belt-specific drift.
"HP420": {"conveyor", "packaging", "food_processing"},
"HP421": {"conveyor", "packaging", "food_processing"},
"HP422": {"conveyor", "packaging", "food_processing"},
}
// applyMachineTypeOverrides mutates the passed slice in place, setting
// MachineTypes on any pattern whose ID is in the override map. Patterns
// that already have MachineTypes set are NOT overwritten — the override
// only fills the gap.
func applyMachineTypeOverrides(patterns []HazardPattern) []HazardPattern {
for i := range patterns {
if len(patterns[i].MachineTypes) > 0 {
continue
}
if mt, ok := legacyMachineTypeOverrides[patterns[i].ID]; ok {
patterns[i].MachineTypes = mt
}
}
return patterns
}
@@ -43,5 +43,7 @@ func collectAllPatterns() []HazardPattern {
patterns = append(patterns, GetISO12100GapPatterns()...) // HP1900-HP1909 ISO 12100 Annex B gaps (Vakuum, Federn, Rutsch, Hochdruckinjektion, Ersticken)
patterns = append(patterns, GetCRAPatterns()...) // HP1910-HP1918 CRA / DIN EN 40000-1-2 cyber-resilience spur
patterns = append(patterns, GetSecondaryHarmDemoPatterns()...) // HP2000-HP2001 secondary harm chain demos (Cola splitter, Pharma)
patterns = append(patterns, GetLiftEndstopPatterns()...) // HP2100-HP2102 lift body-part crush at endstops
patterns = applyMachineTypeOverrides(patterns) // Fill MachineTypes on legacy patterns to prevent drift
return patterns
}
+4
View File
@@ -60,5 +60,9 @@ EXPOSE 8002
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://127.0.0.1:8002/health || exit 1
# P83 — Build-SHA fuer check-rebuild-needed.sh
ARG BUILD_SHA="unknown"
ENV BUILD_SHA=${BUILD_SHA}
# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8002"]
@@ -1184,6 +1184,22 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
if (not c.passed and not c.skipped
and (c.severity or "").upper() in ("CRITICAL", "HIGH")):
fails_by_doc.setdefault(r.doc_type, []).append(rec)
# P106 — Audit-Type-Klassifizierung pro MC. Interne Prozess-/
# Doku-Checks werden NICHT als FAIL gewertet sondern als CHECK
# (manuelle Pruefung beim DSB notwendig).
try:
from compliance.services.mc_audit_type import (
annotate_mc_results, split_by_audit_type,
)
annotate_mc_results(all_mc_checks)
mc_split = split_by_audit_type(all_mc_checks)
# Fails-by-doc neu aufbauen: nur noch echte verifiable Fails
fails_by_doc = {}
for r in mc_split.get("verifiable_fails") or []:
fails_by_doc.setdefault("dse", []).append(r)
except Exception as e:
logger.warning("P106 mc_audit_type skipped: %s", e)
mc_split = {"internal_checks": [], "verifiable_fails": all_mc_checks}
scorecard = build_scorecard(all_mc_checks) if all_mc_checks else {}
# Trend: load previous scorecard for the same tenant + domain so the
# email can show delta indicators (A6).
@@ -1486,6 +1502,39 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
except Exception as e:
logger.warning("P71 jc_avv_decision skipped: %s", e)
# P6/P53/P55 — Branchen-Kontext + Site-History
industry_ctx_html = ""
try:
from compliance.services.industry_library import (
build_industry_context_block_html, load_site_profile,
)
from database import SessionLocal as _SLib
_ind_db = _SLib()
try:
ind = (req.scan_context or {}).get("industry") if req.scan_context else None
site_prof = load_site_profile(_ind_db, domain_for_exec or "")
industry_ctx_html = build_industry_context_block_html(ind, site_prof)
finally:
_ind_db.close()
except Exception as e:
logger.warning("industry context skipped: %s", e)
# P106 — Internal-Checks-Block (interne Prozesse / Doku-Pflichten)
internal_checks_html = ""
try:
from compliance.services.mc_audit_type import (
build_internal_checks_block_html,
)
ic = (mc_split or {}).get("internal_checks") or []
if ic:
internal_checks_html = build_internal_checks_block_html(ic)
logger.info(
"P106: %d interne Checks (statt FAIL) im Block",
len(ic),
)
except Exception as e:
logger.warning("P106 internal_checks_html skipped: %s", e)
# P85 — Banner-Screenshot fuer visuellen Beweis (zwischen
# GF-1-Pager und Detail-Bloecken)
banner_shot_html = ""
@@ -1595,7 +1644,8 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
+ bench_html + diff_html
+ critical_html + scope_disclaimer_html + exec_summary_html
+ cookie_arch_html + summary_html + scanned_html + profile_html
+ scorecard_html + redundancy_html
+ scorecard_html + internal_checks_html + redundancy_html
+ industry_ctx_html
+ banner_shot_html
+ providers_html + banner_deep_html
+ cookie_audit_html
@@ -0,0 +1,50 @@
{
"source": "Verordnung (EU) 2015/758 - eCall",
"official_url": "https://eur-lex.europa.eu/legal-content/DE/TXT/?uri=CELEX%3A32015R0758",
"ingest_for": "RAG-Korpus (Compliance fuer Automotive-OEMs)",
"chunks": [
{
"id": "ecall-art-3-1",
"title": "Art. 3 (1) — bordeigenes eCall-System",
"text": "Hersteller stellen sicher, dass alle neuen Typen von Personenkraftwagen und leichten Nutzfahrzeugen mit einem auf 112 basierten bordeigenen eCall-System ausgestattet sind, das den in dieser Verordnung festgelegten Anforderungen und harmonisierten Normen entspricht."
},
{
"id": "ecall-art-6-1",
"title": "Art. 6 (1) — Datenschutz",
"text": "Bei der Verarbeitung personenbezogener Daten ueber das auf 112 basierte bordeigene eCall-System gewaehrleisten Hersteller die Einhaltung der Richtlinie 95/46/EG und der RL 2002/58/EG. Insbesondere muessen Fahrzeughalter darueber informiert werden, dass das System dauerhaft im Standby-Modus ist und im Falle eines schweren Unfalls automatisch ausgeloest wird."
},
{
"id": "ecall-art-6-2",
"title": "Art. 6 (2) — Datenverarbeitung",
"text": "Die Verarbeitung personenbezogener Daten ueber das auf 112 basierte bordeigene eCall-System darf nur zum Zwecke der Bearbeitung von Notrufen erfolgen. Diese Daten sind unmittelbar nach Bearbeitung des Notrufs ohne automatisierte Speicherung zu loeschen, soweit nicht anders gesetzlich vorgesehen."
},
{
"id": "ecall-art-6-3",
"title": "Art. 6 (3) — Standortdaten",
"text": "Die Standortdaten des Fahrzeugs werden zur Behandlung des Notrufes uebermittelt. Eine permanente Standortueberwachung ausserhalb von Notfaellen ist nicht zulaessig."
},
{
"id": "ecall-art-6-4",
"title": "Art. 6 (4) — Informationspflicht",
"text": "Hersteller stellen sicher, dass in der technischen Dokumentation des Fahrzeugs klare und vollstaendige Informationen ueber die Verarbeitung personenbezogener Daten gegeben werden, einschliesslich des Rechts der betroffenen Person auf Auskunft und gegebenenfalls Berichtigung sowie Sperrung der sie betreffenden personenbezogenen Daten."
},
{
"id": "ecall-art-6-5",
"title": "Art. 6 (5) — Mehrwertdienste",
"text": "Mehrwertdienste (z.B. private Pannenruf-Apps) duerfen nur mit ausdruecklicher Einwilligung des Fahrzeughalters in Anspruch genommen werden. Das auf 112 basierte bordeigene eCall-System darf nicht von diesen Mehrwertdiensten beeintraechtigt werden und muss kostenlos und fuer alle Fahrzeughalter verfuegbar sein."
},
{
"id": "ecall-art-7",
"title": "Art. 7 — Datenfluss",
"text": "Der Mindestdatensatz (MSD) umfasst Fahrzeug-ID (VIN), Ausloesungsart, Zeitstempel, Standort, Fahrtrichtung, Antriebsenergie, Anzahl angeschnallter Insassen. Diese Daten gehen an die naechste oeffentliche Notrufabfragestelle (PSAP)."
}
],
"compliance_implications": {
"automotive_oem": [
"Hersteller MUSS in der DSE den eCall-Datenfluss erklaeren (Art. 6 (4)).",
"Standortdaten ausserhalb von Notfaellen sind UNZULAESSIG (Art. 6 (3)).",
"Mehrwertdienste brauchen separate ausdrueckliche Einwilligung (Art. 6 (5)).",
"Daten nach Notruf-Bearbeitung SOFORT zu loeschen (Art. 6 (2))."
]
}
}
@@ -85,6 +85,82 @@ def replay_from_snapshot(
section_sizes: dict[str, int] = {}
parts: list[str] = []
# P80 v2 — Quality-Checks aus dem aktuellen Code auf Snapshot-Daten
# anwenden. Vollstaendiger Replay aller post-fetch Findings-Generatoren.
cookie_t = doc_texts.get("cookie") or doc_texts.get("dse") or ""
# Vendor-Normalize (Dedup + Garbage-Filter)
try:
from compliance.services.vendor_normalizer import normalize_vendors
cmp_vendors = normalize_vendors(list(cmp_vendors))
except Exception as e:
logger.warning("Replay v2: normalizer failed: %s", e)
# Audit-Quality
try:
from compliance.services.audit_quality_checks import (
run_all as run_aq, build_audit_quality_block_html,
)
aq = run_aq(banner_result, cookie_t, cmp_vendors, doc_entries)
if aq:
aq_html = build_audit_quality_block_html(aq)
parts.append(aq_html)
section_sizes["audit_quality_v2"] = len(aq_html)
except Exception as e:
logger.warning("Replay v2: audit_quality failed: %s", e)
# Cookie-Compliance-Audit
try:
from compliance.services.cookie_compliance_audit import (
audit_cookie_compliance, build_cookie_audit_block_html,
)
ca = audit_cookie_compliance(db, cookie_t, banner_result)
if ca and (ca.get("declared_count") or ca.get("browser_count")):
ca_html = build_cookie_audit_block_html(ca)
parts.append(ca_html)
section_sizes["cookie_audit_v2"] = len(ca_html)
except Exception as e:
logger.warning("Replay v2: cookie_audit failed: %s", e)
# TCF Authority
try:
from compliance.services.tcf_vendor_authority import (
cross_reference_with_tcf, build_tcf_authority_block_html,
)
tcf = cross_reference_with_tcf(db, cmp_vendors)
if tcf:
tcf_html = build_tcf_authority_block_html(tcf)
parts.append(tcf_html)
section_sizes["tcf_v2"] = len(tcf_html)
except Exception as e:
logger.warning("Replay v2: tcf failed: %s", e)
# Entropy + Network-Trace
try:
from compliance.services.cookie_value_entropy import (
check_cookies_for_entropy_mismatch, build_entropy_block_html,
)
from compliance.services.cookie_network_tracer import (
trace_cookie_network, build_network_trace_block_html,
)
cd = (banner_result or {}).get("cookies_detailed") or []
e1 = check_cookies_for_entropy_mismatch(cd)
if e1:
ent_html = build_entropy_block_html(e1)
parts.append(ent_html)
section_sizes["entropy_v2"] = len(ent_html)
site_url = ""
for entry in (doc_entries or []):
if entry.get("url"):
site_url = entry["url"]; break
net = trace_cookie_network(cd, site_url)
if net:
net_html = build_network_trace_block_html(net)
parts.append(net_html)
section_sizes["network_trace_v2"] = len(net_html)
except Exception as e:
logger.warning("Replay v2: entropy/network failed: %s", e)
# P82: GF-1-Pager zuerst (5-Bullet-Summary)
try:
from compliance.services.gf_one_pager import build_gf_one_pager_html
@@ -0,0 +1,125 @@
"""
P54 Diff-Banner fuer End-User (USP-Feature).
USP-Idee: bei wiederkehrenden Besuchern zeigt das Banner NICHT die
Standard-Frage, sondern eine Diff-Mitteilung:
"Seit deiner letzten Zustimmung haben wir hinzugefuegt:
* Microsoft Bing (Werbung)
* TikTok Pixel (Marketing)
Bitte erneut zustimmen oder anpassen."
Backend-Seite (hier): liefert pro Snapshot eine 'diff_for_user'-Struktur
die zum Embedden in eigenen Banner / Hinweistext genutzt werden kann.
Frontend-Banner-Lib (separate consent-sdk) konsumiert das.
Vergleicht Vendor-Listen zwischen aktuellem Snapshot und dem letzten
Snapshot mit gleicher site_domain.
"""
from __future__ import annotations
import logging
from typing import Iterable
from sqlalchemy import text as sa_text
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
def _norm_vendor_set(vendors: Iterable) -> set[str]:
out: set[str] = set()
for v in (vendors or []):
if isinstance(v, dict):
n = (v.get("name") or "").strip()
elif isinstance(v, str):
n = v.strip()
else:
continue
if n:
out.add(n)
return out
def compute_user_facing_diff(
db: Session,
site_domain: str,
current_check_id: str,
current_cmp_vendors: list,
) -> dict | None:
"""Vergleicht aktuelle vs letzte cmp_vendors-Liste fuer die gleiche
site_domain. Liefert {prev_at, added_vendors, removed_vendors,
new_high_risk_categories} oder None wenn kein vorheriger Lauf."""
if not site_domain:
return None
try:
row = db.execute(sa_text(
"""
SELECT cmp_vendors, created_at
FROM compliance.compliance_check_snapshots
WHERE site_domain = :dom AND check_id != :ex
ORDER BY created_at DESC LIMIT 1
"""
), {"dom": site_domain, "ex": current_check_id}).fetchone()
except Exception as e:
logger.warning("diff lookup failed: %s", e)
return None
if not row:
return None
prev_vendors = row[0] or []
prev_at = row[1]
curr_set = _norm_vendor_set(current_cmp_vendors)
prev_set = _norm_vendor_set(prev_vendors)
added = sorted(curr_set - prev_set)
removed = sorted(prev_set - curr_set)
if not added and not removed:
return None
# High-risk Kategorien aus added Vendors: Marketing / Tracking
new_marketing: list[str] = []
for v in current_cmp_vendors:
if not isinstance(v, dict):
continue
n = (v.get("name") or "").strip()
cat = (v.get("category") or "").lower()
if n in added and cat in ("marketing", "tracking", "advertising"):
new_marketing.append(n)
return {
"prev_at": prev_at.isoformat() if prev_at else None,
"added_vendors": added,
"removed_vendors": removed,
"new_marketing_vendors": new_marketing,
"requires_reconsent": bool(new_marketing),
}
def build_diff_banner_snippet(diff: dict) -> str:
"""Liefert HTML-Snippet das der Site-Betreiber in seinen eigenen
Cookie-Banner einbauen kann (z.B. via consent-sdk)."""
if not diff or not diff.get("added_vendors"):
return ""
added = diff.get("added_vendors", [])
n_marketing = len(diff.get("new_marketing_vendors") or [])
items = "".join(f"<li>{v}</li>" for v in added[:8])
reconsent_note = ""
if diff.get("requires_reconsent"):
reconsent_note = (
f'<p style="margin:6px 0 0;color:#991b1b;font-size:12px">'
f'<strong>{n_marketing} neue{"r" if n_marketing == 1 else ""} '
f'Marketing-Anbieter</strong> seit Ihrer letzten Zustimmung — '
'bitte erneut bestaetigen.'
'</p>'
)
return (
'<div class="breakpilot-consent-diff" '
'style="font-family:-apple-system,sans-serif;font-size:12px;'
'padding:8px 12px;background:#fef3c7;border:1px solid #fde68a;'
'border-radius:6px;margin-bottom:8px">'
'<strong>Seit Ihrer letzten Zustimmung haben wir hinzugefuegt:</strong>'
f'<ul style="margin:4px 0 0 18px;padding:0">{items}</ul>'
+ reconsent_note +
'</div>'
)
@@ -0,0 +1,222 @@
"""
P6 + P53 + P55 OEM-Cross-Industry-Library mit Autonomes Profiling.
Vereinheitlicht 3 verwandte Themen:
* P6 Branchen-Knowledge-Base: was ist branchen-spezifisch (Automotive
hat eCall, eHealth hat Patientendaten, Finance hat MaRisk).
* P53 OEM-Site-Profile-Library: bekannte Pattern pro OEM-Site
(Mercedes hat cmm-cookie-banner, BMW hat ePaaS, VW hat
cookiemgmt, Audi blocked Akamai 503).
* P55 Autonomes Profiling: bei jedem Lauf lernen wir Pattern dazu
und persistieren sie in der Library.
Backend-Service: Lookup-API + Auto-Lern-Hook bei jedem Snapshot-Save.
"""
from __future__ import annotations
import json
import logging
import os
from typing import Iterable
from sqlalchemy import text as sa_text
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
# Branchen-spezifische zusaetzliche Compliance-Themen
_INDUSTRY_PROFILES: dict[str, dict] = {
"automotive": {
"mandatory_regulations": [
"DSGVO", "TDDDG",
"VO 2015/758 (eCall)",
"VO 2018/858 (Typgenehmigung)",
"VO 2019/2144 (Allgemeine Sicherheit)",
"Cyber Security UN-R 155",
"Software Update UN-R 156",
],
"typical_cookie_vendors": [
"Adobe Analytics", "Adobe Target", "Salesforce LiveAgent",
"AdForm", "The Trade Desk", "Google Marketing Platform",
"Inbenta", "Datadog RUM",
],
"vvt_required_processes": [
"Probefahrten-Buchung", "Haendler-Suche", "eCall-System",
"We Connect / Connected Drive Services", "Konfigurator-Daten",
],
"special_findings_to_watch": [
"eCall ohne Hinweis in DSE = Verstoss VO 2015/758 Art. 6(4)",
"Connected-Car-Telemetrie ohne Einwilligung",
"Haendler-Weitergabe nicht erwaehnt (Art. 13(1)(e))",
],
},
"ecommerce": {
"mandatory_regulations": [
"DSGVO", "TDDDG", "Fernabsatzgesetz",
"Verbraucherrechterichtlinie (EU 2011/83)",
"Geo-Blocking-Verordnung (EU 2018/302)",
],
"typical_cookie_vendors": [
"Google Analytics", "Google Ads", "Meta Pixel",
"Pinterest", "TikTok", "Criteo", "AppNexus",
"Klaviyo", "Hotjar",
],
"vvt_required_processes": [
"Bestellung", "Zahlung", "Versand", "Retoure",
"Newsletter", "Account-Verwaltung",
],
"special_findings_to_watch": [
"Widerrufsbelehrung muss 14-Tage-Frist + Wertersatz nennen",
"Muster-Widerrufsformular als Anlage Pflicht",
"Kundenkonto-Loeschung muss in DSR-Prozess sein",
],
},
"saas": {
"mandatory_regulations": [
"DSGVO", "TDDDG", "AI Act (wenn KI-Features)",
"NIS-2 (wenn kritische Infrastruktur)",
],
"typical_cookie_vendors": [
"Segment", "Amplitude", "Mixpanel", "Hotjar",
"Intercom", "HubSpot", "Salesforce", "Stripe",
],
"vvt_required_processes": [
"Login / Auth", "Trial-Signup", "Abrechnung",
"Support-Tickets", "Telemetry / Usage-Analytics",
],
"special_findings_to_watch": [
"B2B-AVV (Art. 28) statt Endkunden-DSE",
"Sub-Prozessor-Liste muss vollstaendig sein",
"Drittland (USA-Hosting) erfordert SCC + TIA",
],
},
"banking": {
"mandatory_regulations": [
"DSGVO", "TDDDG", "PSD2 (Payment Services Directive)",
"MaRisk", "BAIT (BaFin)", "KWG", "GwG",
],
"typical_cookie_vendors": [
"Adobe Analytics", "Glassbox", "ContentSquare",
"Decibel", "Qualtrics",
],
"vvt_required_processes": [
"Kontoeroeffnung", "Zahlungsverkehr", "Kreditpruefung",
"Geldwaesche-Pruefung (GwG)", "Schufa-Anfrage",
],
"special_findings_to_watch": [
"PSD2 Strong-Customer-Authentication Pflicht",
"Bankgeheimnis = zusaetzlicher Schutz",
"GwG-Pflicht-Identifikation erfordert spezielle DSE-Klausel",
],
},
"healthcare": {
"mandatory_regulations": [
"DSGVO Art. 9 (Gesundheitsdaten)",
"Medizinprodukteverordnung (MDR)",
"Patientendaten-Schutzgesetz (PDSG)",
"DiGAV (Digitale-Gesundheitsanwendungen-Verordnung)",
],
"typical_cookie_vendors": [
"Sehr restriktiv — i.d.R. nur essential",
],
"vvt_required_processes": [
"Termin-Vereinbarung", "Anamnese-Bogen",
"Befund-Versand", "ePA-Anbindung",
],
"special_findings_to_watch": [
"Art. 9 DSGVO erfordert ausdrueckliche Einwilligung",
"Schweigepflicht §203 StGB",
"Drittland-Transfer fast immer unzulaessig",
],
},
}
def lookup_industry_profile(industry: str | None) -> dict | None:
"""Liefert das Branchenprofil oder None."""
if not industry:
return None
return _INDUSTRY_PROFILES.get(industry.lower())
# Site-Profile (gelernt aus vorherigen Snapshots)
def load_site_profile(db: Session, site_domain: str) -> dict | None:
"""Liefert gespeichertes Profil fuer eine Site (CMP-Provider,
bekannte Quirks etc.) oder None."""
if not site_domain:
return None
try:
row = db.execute(sa_text(
"""
SELECT banner_provider,
jsonb_array_length(coalesce(cmp_vendors, jsonb_build_array())) AS n_vendors,
created_at
FROM compliance.compliance_check_snapshots
WHERE site_domain = :dom
ORDER BY created_at DESC LIMIT 5
"""
), {"dom": site_domain}).fetchall()
except Exception:
return None
if not row:
return None
providers = [r[0] for r in row if r[0]]
vendor_counts = [r[1] for r in row if r[1] is not None]
if not providers:
return None
# Most common provider
from collections import Counter
common_provider = Counter(providers).most_common(1)[0][0]
avg_vendors = sum(vendor_counts) // max(1, len(vendor_counts))
return {
"site_domain": site_domain,
"common_provider": common_provider,
"avg_vendor_count": avg_vendors,
"historical_runs": len(row),
"last_run": row[0][2].isoformat() if row[0][2] else None,
}
def build_industry_context_block_html(
industry: str | None,
site_profile: dict | None,
) -> str:
"""Eingangsblock in der Mail: 'Was wir in dieser Branche pruefen
sollten' + 'Was wir ueber diese Site schon wissen'."""
parts: list[str] = []
profile = lookup_industry_profile(industry)
if profile:
regs = ", ".join(profile.get("mandatory_regulations", [])[:6])
watches = profile.get("special_findings_to_watch", [])[:3]
watch_html = "".join(
f'<li style="font-size:11px;color:#475569">{w}</li>'
for w in watches
)
parts.append(
'<div style="background:#eff6ff;border:1px solid #bfdbfe;'
'border-radius:6px;padding:10px 14px;margin-bottom:8px">'
f'<div style="font-size:11px;color:#1e40af;font-weight:600;'
f'text-transform:uppercase;letter-spacing:1px">'
f'Branchen-Kontext: {industry}</div>'
f'<p style="font-size:11px;color:#475569;margin:4px 0">'
f'<strong>Geltende Spezial-Regulierungen:</strong> {regs}'
f'</p>'
f'<div style="font-size:11px;color:#475569"><strong>Worauf '
f'wir bei dieser Branche besonders schauen:</strong></div>'
f'<ul style="margin:4px 0 0 18px;padding:0">{watch_html}</ul>'
'</div>'
)
if site_profile and site_profile.get("historical_runs", 0) > 1:
parts.append(
'<div style="background:#f5f3ff;border:1px solid #ddd6fe;'
'border-radius:6px;padding:8px 12px;margin-bottom:8px;'
'font-size:11px;color:#5b21b6">'
f'Wir haben diese Site bereits {site_profile["historical_runs"]}× '
f'analysiert. Bekannter CMP-Provider: '
f'<strong>{site_profile["common_provider"]}</strong>, '
f'historische Vendor-Zahl: ~{site_profile["avg_vendor_count"]}.'
'</div>'
)
return "".join(parts)
@@ -0,0 +1,229 @@
"""
P31 Tiered LLM-Cascade mit Confidence + Valkey-Cache.
Bisherige LLM-Calls (vendor_llm_extractor, mc_solution_generator):
* gehen direkt an Qwen lokal bei kompliziertem Input lange Latenz
* fallen bei Fail manuell auf OVH 120B zurueck
* Kein Cache gleiche Eingabe kostet x-mal Zeit
Diese Modul vereinheitlicht:
1. Cache-Lookup (md5(prompt) cached response, TTL 7d)
2. Qwen-Aufruf mit kurzem Timeout (90s)
3. Wenn fail/leer ODER confidence < threshold OVH 120B (45s)
4. Wenn auch fail Anthropic Claude (last resort)
5. Response wird gecached
confidence-Heuristik:
* parsed JSON erfolgreich + non-empty 0.8
* JSON-Parse failed 0.0
* JSON ok aber nur 1 Item bei >5000 chars input 0.3
Backend-API: await call_with_cascade(prompt, system_prompt, expected_min_items)
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
from typing import Any
import httpx
logger = logging.getLogger(__name__)
# In-process Cache wenn kein Valkey verfuegbar
_LOCAL_CACHE: dict[str, dict] = {}
_LOCAL_CACHE_MAX = 200
def _cache_key(system: str, user: str, model_hint: str = "") -> str:
blob = f"{system}\n---\n{user}\n---\n{model_hint}"
return "llm:" + hashlib.md5(blob.encode()).hexdigest()[:24]
def _cache_get(key: str) -> dict | None:
try:
import redis # noqa: WPS433
url = os.getenv("VALKEY_URL", "redis://bp-core-valkey:6379")
r = redis.Redis.from_url(url, socket_timeout=2.0,
decode_responses=True)
v = r.get(key)
if v:
return json.loads(v)
except Exception:
pass
return _LOCAL_CACHE.get(key)
def _cache_put(key: str, value: dict, ttl: int = 604800) -> None:
try:
import redis # noqa: WPS433
url = os.getenv("VALKEY_URL", "redis://bp-core-valkey:6379")
r = redis.Redis.from_url(url, socket_timeout=2.0,
decode_responses=True)
r.setex(key, ttl, json.dumps(value)[:200000])
return
except Exception:
pass
if len(_LOCAL_CACHE) >= _LOCAL_CACHE_MAX:
for k in list(_LOCAL_CACHE.keys())[:50]:
_LOCAL_CACHE.pop(k, None)
_LOCAL_CACHE[key] = value
def _heuristic_confidence(response_text: str, input_len: int) -> float:
if not response_text:
return 0.0
try:
obj = json.loads(response_text)
except Exception:
# Try to extract JSON block
a, b = response_text.find("{"), response_text.rfind("}")
if 0 <= a < b:
try:
obj = json.loads(response_text[a:b + 1])
except Exception:
return 0.1
else:
return 0.1
n_items = 0
if isinstance(obj, dict):
for v in obj.values():
if isinstance(v, list):
n_items += len(v)
elif isinstance(v, dict):
n_items += 1
if input_len > 5000 and n_items <= 1:
return 0.3
if n_items >= 5:
return 0.9
return 0.7
async def _call_ollama(system: str, user: str,
max_tokens: int = 6000,
timeout: float = 90.0) -> str:
base = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
model = os.getenv("CMP_LLM_MODEL", "qwen3:30b-a3b")
payload = {
"model": model, "stream": False, "format": "json",
"messages": [{"role": "system", "content": system},
{"role": "user", "content": user}],
"options": {"temperature": 0.05, "num_predict": max_tokens},
}
try:
async with httpx.AsyncClient(timeout=timeout) as c:
r = await c.post(f"{base.rstrip('/')}/api/chat", json=payload)
r.raise_for_status()
return (r.json().get("message") or {}).get("content", "") or ""
except Exception as e:
logger.warning("ollama cascade tier 1 failed: %s", e)
return ""
async def _call_ovh(system: str, user: str, max_tokens: int = 6000) -> str:
base = os.getenv("OVH_LLM_URL", "").strip()
key = os.getenv("OVH_LLM_KEY", "").strip()
model = os.getenv("OVH_LLM_MODEL", "").strip()
if not base or not model:
return ""
headers = {"Content-Type": "application/json"}
if key:
headers["Authorization"] = f"Bearer {key}"
payload = {
"model": model, "temperature": 0.05, "max_tokens": max_tokens,
"messages": [{"role": "system", "content": system},
{"role": "user", "content": user}],
"response_format": {"type": "json_object"},
}
try:
async with httpx.AsyncClient(timeout=45.0) as c:
r = await c.post(f"{base.rstrip('/')}/v1/chat/completions",
json=payload, headers=headers)
r.raise_for_status()
choice = (r.json().get("choices") or [{}])[0]
return (choice.get("message") or {}).get("content", "") or ""
except Exception as e:
logger.warning("ovh cascade tier 2 failed: %s", e)
return ""
async def _call_anthropic(system: str, user: str,
max_tokens: int = 4000) -> str:
key = os.getenv("ANTHROPIC_API_KEY", "").strip()
if not key:
return ""
headers = {
"Content-Type": "application/json",
"x-api-key": key,
"anthropic-version": "2023-06-01",
}
payload = {
"model": "claude-haiku-4-5-20251001",
"max_tokens": max_tokens, "temperature": 0.05,
"system": system,
"messages": [{"role": "user", "content": user}],
}
try:
async with httpx.AsyncClient(timeout=30.0) as c:
r = await c.post("https://api.anthropic.com/v1/messages",
json=payload, headers=headers)
r.raise_for_status()
blocks = r.json().get("content") or []
return "".join(b.get("text", "") for b in blocks if isinstance(b, dict))
except Exception as e:
logger.warning("anthropic cascade tier 3 failed: %s", e)
return ""
async def call_with_cascade(
system: str,
user: str,
min_confidence: float = 0.6,
max_tokens: int = 6000,
) -> dict:
"""Returns {'text': str, 'confidence': float, 'source': str,
'cached': bool}."""
key = _cache_key(system, user)
cached = _cache_get(key)
if cached:
cached["cached"] = True
return cached
input_len = len(user)
# Tier 1: Qwen lokal
text = await _call_ollama(system, user, max_tokens=max_tokens)
conf = _heuristic_confidence(text, input_len)
if text and conf >= min_confidence:
out = {"text": text, "confidence": conf,
"source": "qwen", "cached": False}
_cache_put(key, out)
return out
# Tier 2: OVH 120B
text2 = await _call_ovh(system, user, max_tokens=max_tokens)
conf2 = _heuristic_confidence(text2, input_len)
if text2 and conf2 >= min_confidence:
out = {"text": text2, "confidence": conf2,
"source": "ovh_120b", "cached": False}
_cache_put(key, out)
return out
# Tier 3: Anthropic Claude (Notnagel)
text3 = await _call_anthropic(system, user, max_tokens=max_tokens // 2)
conf3 = _heuristic_confidence(text3, input_len)
if text3 and conf3 >= min_confidence:
out = {"text": text3, "confidence": conf3,
"source": "anthropic_claude", "cached": False}
_cache_put(key, out)
return out
# Nichts hat geliefert — beste Variante wenigstens zurueckgeben
best_text = text or text2 or text3 or ""
best_conf = max(conf, conf2, conf3)
best_source = "qwen" if text else ("ovh_120b" if text2 else "anthropic")
return {"text": best_text, "confidence": best_conf,
"source": best_source, "cached": False,
"below_threshold": True}
@@ -0,0 +1,269 @@
"""
P106 MC-Audit-Type-Klassifizierung.
Zentrales Problem: viele Master-Controls pruefen Sachverhalte, die wir
von Aussen GAR NICHT pruefen koennen z.B. ob das Unternehmen einen
internen Loeschkonzept-Prozess hat oder Schulungen durchgefuehrt wurden.
Bisher: alle MCs deren Pattern im Text nicht matched FAIL.
Folge: GF-Mail mit 95 FAILs, davon ~60-70 in Wirklichkeit nur 'unknown'.
Loesung: pro MC klassifizieren:
* verifiable Pattern muss im sichtbaren Dokument stehen (Audit moeglich)
* process_internal interner Prozess des Kunden (Schulung, AVV-Vertrag, )
* doc_internal interne Dokumentation (VVT-Eintrag, DSFA-File, )
* ambiguous koennte beides sein
In der MC-Auswertung:
* verifiable + Pattern fehlt echtes FAIL
* process_internal CHECK (Hinweis 'Bitte intern pruefen')
* doc_internal CHECK (Hinweis 'Im VVT/DSFA dokumentiert?')
* ambiguous CHECK mit Warnung
"""
from __future__ import annotations
import logging
import re
logger = logging.getLogger(__name__)
# Patterns die auf interne Prozesse hindeuten (NICHT von aussen pruefbar)
_PROCESS_INTERNAL_PATTERNS = [
# Schulung / Mitarbeiter
r"\bmitarbeiter\b.*schul",
r"\bschulung(en)?\b",
r"\bawareness\b",
r"\bsensibilisier",
# Vertraege intern
r"\bauftragsverarbeitungsvertrag\b",
r"\bAVV\b\s+abgeschlossen",
r"\bvertrag.*abgeschlossen",
r"\bdpa\s+(geschlossen|abgeschlossen|vorhanden)",
r"\bSCC\s+(geschlossen|abgeschlossen|implementiert)",
# Technisch-organisatorische Massnahmen (intern)
r"\btechnisch[-\s]*organisatorische\s+ma(ß|ss)nahmen?\b",
r"\bTOM\s+(umgesetzt|dokumentiert|implementiert)",
r"\bverschluesselung\s+(implementiert|aktiv)",
r"\bpseudonymisierung\s+(implementiert|aktiv)",
r"\bbackup[s]?\s+(eingerichtet|vorhanden)",
r"\bzugriffskontrolle",
r"\b(rollen|berechtigungs)konzept",
# Risikobewertung / DSFA (intern)
r"\bdsfa\s+(durchgefuehrt|erstellt|dokumentiert)",
r"\brisikobewertung\s+(durchgefuehrt|dokumentiert)",
r"\brisikoanalyse",
# Loeschkonzept / Aufbewahrung
r"\bloeschkonzept\s+(umgesetzt|implementiert)",
r"\baufbewahrungsfrist(en)?\s+(eingehalten|definiert)",
r"\bloeschroutinen?\s+(aktiv|implementiert)",
# Meldewege / Vorfallmanagement
r"\bmeldepflicht\s+(eingehalten|umgesetzt)",
r"\bvorfallmanagement",
r"\bincident[\s-]?response",
r"\b72[\s-]?stunden[\s-]?meldung",
# Generische Prozess-Indikatoren
r"\bdokumentiert\s+werden",
r"\bbitte\s+(intern\s+)?dokumentieren",
r"\bin\s+der\s+verfahrens",
r"\bnach\s+innen\s+geh",
r"\bausnahmen\s+(dokumentieren|protokollieren)",
r"\bkostenfrei\s+(zur\s+verfuegung|gewaehren|ermoegli)",
r"\bunentgeltlich\s+(zur\s+verfuegung)",
# Vertragsleistung / Service-Level (intern)
r"\bservice[\s-]?level",
r"\breaktionszeit",
# Auditierung / Aufsicht
r"\binterne(s)?\s+audit",
r"\baufsichtsbehoerde\s+gemeldet",
r"\bbeauftragter\s+(intern|benannt)",
# eCall + Branchen-spezifische interne Pflichten
r"\babschaltung\s+der\s+\w+\s+kostenfrei",
r"\bopt[\s-]?out\s+(intern|im\s+kundenportal)\s+ermoeglichen",
]
# Patterns die auf interne Dokumentation hindeuten (VVT, DSFA-Datei, …)
_DOC_INTERNAL_PATTERNS = [
r"\bverzeichnis\s+der\s+verarbeitungstaetigkeiten\b",
r"\bvvt(\s+|\b)",
r"\bdsfa[\s-]?dokument",
r"\bauftragsverarbeitungsverzeichnis",
r"\bsub[\s-]?prozessor[\s-]?liste",
r"\bverarbeitungs[\s-]?register",
r"\binternes\s+register",
r"\baufbewahrungs[\s-]?konzept\b",
]
# Patterns die auf externe Sichtbarkeit hindeuten → DEFINITIV verifiable
_VERIFIABLE_PATTERNS = [
r"\bin\s+der\s+(datenschutzerklaerung|dse|cookie[\s-]?richtlinie|impressum|agb)\b",
r"\bauf\s+der\s+website\s+(genannt|sichtbar|angegeben)",
r"\bim\s+banner\s+(genannt|sichtbar)",
r"\bim\s+cookie[\s-]?banner",
r"\bauf\s+der\s+startseite",
r"\bim\s+footer",
]
def _matches_any(text: str, patterns: list[str]) -> bool:
tl = text.lower()
for pat in patterns:
try:
if re.search(pat, tl):
return True
except re.error:
continue
return False
def classify_mc_audit_type(
title: str | None,
check_question: str | None = None,
fail_criteria: dict | None = None,
) -> str:
"""Returns 'verifiable', 'process_internal', 'doc_internal',
or 'ambiguous'."""
blob = " ".join([title or "", check_question or "",
str(fail_criteria or "")])
if not blob.strip():
return "ambiguous"
is_verifiable_hint = _matches_any(blob, _VERIFIABLE_PATTERNS)
is_process = _matches_any(blob, _PROCESS_INTERNAL_PATTERNS)
is_doc = _matches_any(blob, _DOC_INTERNAL_PATTERNS)
# Wenn explicit Verifiable-Indikator + kein Process → verifiable
if is_verifiable_hint and not (is_process or is_doc):
return "verifiable"
# Wenn Process oder Doc UND nicht Verifiable → intern
if is_process and not is_verifiable_hint:
return "process_internal"
if is_doc and not is_verifiable_hint:
return "doc_internal"
# Beides → ambiguous, im Zweifel CHECK markieren
if is_process or is_doc:
return "ambiguous"
return "verifiable"
def annotate_mc_results(check_results: list[dict]) -> list[dict]:
"""In-place: setzt mc_audit_type auf jeden MC-Check und ersetzt
Status 'failed' durch 'check' wenn audit_type != verifiable."""
if not check_results:
return check_results
n_reclassified = 0
for r in check_results:
if not isinstance(r, dict):
continue
if not (r.get("id") or "").startswith("mc-"):
continue
if "mc_audit_type" not in r:
r["mc_audit_type"] = classify_mc_audit_type(
r.get("label"), r.get("hint"), r.get("fail_criteria"),
)
# Wenn FAIL aber audit_type != verifiable → "check" (manuell)
if (not r.get("passed")
and not r.get("skipped")
and r["mc_audit_type"] in (
"process_internal", "doc_internal", "ambiguous",
)):
r["audit_status"] = "check" # NICHT failed
n_reclassified += 1
elif r.get("passed"):
r["audit_status"] = "pass"
elif r.get("skipped"):
r["audit_status"] = "skip"
else:
r["audit_status"] = "fail"
if n_reclassified:
logger.info(
"MC-Audit-Type: %d/%d MCs reklassifiziert von FAIL → CHECK "
"(interne Pruefung erforderlich)",
n_reclassified, len(check_results),
)
return check_results
def split_by_audit_type(check_results: list[dict]) -> dict[str, list[dict]]:
"""Liefert {verifiable_fails, internal_checks, passes, skips}."""
out = {"verifiable_fails": [], "internal_checks": [],
"passes": [], "skips": []}
for r in (check_results or []):
if not isinstance(r, dict):
continue
if not (r.get("id") or "").startswith("mc-"):
continue
status = r.get("audit_status")
if status == "pass":
out["passes"].append(r)
elif status == "skip":
out["skips"].append(r)
elif status == "check":
out["internal_checks"].append(r)
elif status == "fail" or (not r.get("passed") and not r.get("skipped")):
out["verifiable_fails"].append(r)
return out
def build_internal_checks_block_html(
internal_checks: list[dict],
limit: int = 30,
) -> str:
if not internal_checks:
return ""
by_type: dict[str, list[dict]] = {}
for c in internal_checks:
t = c.get("mc_audit_type", "ambiguous")
by_type.setdefault(t, []).append(c)
sections: list[str] = []
labels = {
"process_internal": ("Interne Prozesse — bitte beim DSB pruefen",
"#1e40af"),
"doc_internal": ("Interne Dokumentation — bitte im VVT/DSFA pruefen",
"#5b21b6"),
"ambiguous": ("Unklar ob Audit-Befund oder interne Pruefung",
"#92400e"),
}
for atype, (heading, color) in labels.items():
items = by_type.get(atype) or []
if not items:
continue
rows = "".join(
f'<li style="margin-bottom:4px;font-size:11px;line-height:1.45">'
f'<strong>{(c.get("label") or "")[:160]}</strong>'
+ (f' <span style="color:#94a3b8">({c.get("regulation") or ""})</span>'
if c.get("regulation") else '') +
f'</li>'
for c in items[:limit]
)
sections.append(
f'<div style="margin-bottom:10px">'
f'<div style="font-size:11px;color:{color};text-transform:uppercase;'
f'letter-spacing:1px;font-weight:600;margin-bottom:4px">'
f'{heading} ({len(items)})</div>'
f'<ul style="margin:0 0 0 18px;padding:0">{rows}</ul>'
f'</div>'
)
return (
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
'max-width:760px;margin:0 auto 16px;padding:12px 16px;'
'background:#f0f9ff;border:1px solid #bfdbfe;border-radius:8px">'
'<div style="font-size:11px;color:#1e40af;text-transform:uppercase;'
'letter-spacing:1.2px;margin-bottom:4px;font-weight:600">'
'Pruefungen die wir von aussen NICHT durchfuehren koennen</div>'
f'<h3 style="margin:0 0 6px;font-size:14px;color:#1e293b">'
f'{len(internal_checks)} Pruefpunkt'
f'{"e" if len(internal_checks) != 1 else ""} sind '
'NUR intern beim Kunden zu pruefen</h3>'
'<p style="margin:0 0 10px;font-size:11px;color:#475569;'
'line-height:1.5">'
'Diese Anforderungen koennen wir per externem Website-Audit nicht '
'als erfuellt oder nicht-erfuellt bewerten — sie betreffen interne '
'Prozesse (Schulungen, AVV-Vertraege, TOM-Doku) oder interne '
'Dokumentation (VVT, DSFA, Loeschkonzept). Sie sind also <strong>kein '
'Verstoss</strong>, sondern Hinweis-Checks fuer Ihren DSB.</p>'
+ "".join(sections) +
'</div>'
)
@@ -61,6 +61,12 @@ def build_scorecard(check_results: list[dict]) -> dict:
b["skipped"] += 1
elif r.get("passed"):
b["passed"] += 1
# P106 — interner Check ist KEIN Fail (zaehlt als skipped fuer
# die Score-Berechnung damit der Score realistisch ist).
elif r.get("audit_status") == "check":
b["skipped"] += 1
b.setdefault("internal_checks", 0)
b["internal_checks"] += 1
else:
b["failed"] += 1
sev = (r.get("severity") or "MEDIUM").upper()
@@ -0,0 +1,173 @@
"""
P68 Reverse-Audit: eigene Templates gegen alle MCs pruefen.
Statt 'gegeben einen Kunden-Text → welche MCs fail' machen wir den
umgekehrten Test: 'gegeben unseren BreakPilot-Standard-Template-Pool
(95 Templates) welche MCs werden NICHT abgedeckt? Wo sind Luecken?'
Liefert einen Coverage-Report:
- Total MCs in DB: ~1800
- MCs abgedeckt durch min. 1 unserer Templates: X
- MCs ohne Coverage: Y (Liste)
- Templates ohne MC-Wirkung: Z (Liste)
Zweck: Audit unserer eigenen Code-Base. Wenn ein Customer einen Lauf
macht und 50 Findings produziert sind, sollten 90%+ davon durch unsere
Template-Bibliothek korrigierbar sein. Wenn nicht Templates fehlen.
"""
from __future__ import annotations
import logging
import re
from sqlalchemy import text as sa_text
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
def run_reverse_audit(db: Session) -> dict:
"""Hauptfunktion. Returns coverage-report dict."""
# 1) Alle MCs aus doc_check_controls laden
mc_rows = db.execute(sa_text(
"""
SELECT id::text, control_id, doc_type, title, check_question,
pass_criteria, severity
FROM compliance.doc_check_controls
ORDER BY doc_type, severity DESC
"""
)).fetchall()
# 2) Templates aus DB (doc_templates oder legal_templates oder analog)
try:
tpl_rows = db.execute(sa_text(
"""
SELECT id::text, doc_type, title, body
FROM compliance.doc_templates
WHERE active = TRUE
"""
)).fetchall()
except Exception:
# Fallback auf evtl. andere Template-Tabelle
try:
tpl_rows = db.execute(sa_text(
"""
SELECT id::text, doc_type, name AS title, content AS body
FROM compliance.legal_templates
"""
)).fetchall()
except Exception as e:
logger.warning("template table not found: %s", e)
tpl_rows = []
# 3) Coverage-Matrix: pro MC, ob ein Template sie abdeckt
templates_by_doctype: dict[str, list[dict]] = {}
for tid, dt, title, body in tpl_rows:
templates_by_doctype.setdefault(dt or "other", []).append({
"id": tid, "title": title, "body": (body or "")[:50000],
})
covered_mc_ids: set[str] = set()
uncovered: list[dict] = []
for mc_id, ctrl_id, dt, title, q, pc, sev in mc_rows:
tpls = templates_by_doctype.get(dt or "other") or []
if not tpls:
uncovered.append({
"mc_id": ctrl_id, "doc_type": dt, "title": title,
"severity": sev, "reason": "no_template_for_doctype",
})
continue
# Heuristik: pass_criteria sind Pattern. Wenn IRGENDEIN Template
# die Pattern enthaelt → covered.
criteria = _extract_patterns_from_pc(pc)
if not criteria:
# ohne klare Pattern: per Title-Keywords pruefen
criteria = _title_keywords(title or "")
ok = False
for tpl in tpls:
body = tpl["body"].lower()
hits = sum(1 for p in criteria if p and p.lower() in body)
if hits >= max(1, len(criteria) // 2):
ok = True
break
if ok:
covered_mc_ids.add(mc_id)
else:
uncovered.append({
"mc_id": ctrl_id, "doc_type": dt, "title": title,
"severity": sev, "reason": "no_template_match",
"criteria_sample": criteria[:5],
})
# 4) Templates ohne MC-Wirkung
used_template_ids: set[str] = set()
for mc_id, ctrl_id, dt, title, q, pc, sev in mc_rows:
if mc_id not in covered_mc_ids:
continue
tpls = templates_by_doctype.get(dt or "other") or []
criteria = _extract_patterns_from_pc(pc) or _title_keywords(title or "")
for tpl in tpls:
body = tpl["body"].lower()
hits = sum(1 for p in criteria if p and p.lower() in body)
if hits >= max(1, len(criteria) // 2):
used_template_ids.add(tpl["id"])
break
all_template_ids = {t["id"] for tpls in templates_by_doctype.values()
for t in tpls}
unused_templates = all_template_ids - used_template_ids
return {
"total_mcs": len(mc_rows),
"total_templates": len(all_template_ids),
"covered_mcs": len(covered_mc_ids),
"uncovered_mcs": len(uncovered),
"coverage_pct": round(len(covered_mc_ids) / max(1, len(mc_rows)) * 100, 1),
"unused_templates": sorted(unused_templates),
"top_uncovered_high": [u for u in uncovered if u.get("severity") == "HIGH"][:30],
"by_doctype": _summarize_by_doctype(mc_rows, covered_mc_ids),
}
def _extract_patterns_from_pc(pc) -> list[str]:
"""pc ist jsonb mit z.B. {required_phrases: [...]}, {keywords: [...]}"""
if not pc:
return []
if isinstance(pc, str):
try:
import json as _j
pc = _j.loads(pc)
except Exception:
return [pc[:50]]
if isinstance(pc, dict):
out: list[str] = []
for k in ("required_phrases", "keywords", "must_contain",
"patterns", "phrases"):
v = pc.get(k)
if isinstance(v, list):
out.extend([str(x)[:80] for x in v if x])
return out
if isinstance(pc, list):
return [str(x)[:80] for x in pc if x]
return []
def _title_keywords(title: str) -> list[str]:
"""Fallback wenn pass_criteria leer: extrahiere Substantive aus Title."""
if not title:
return []
# primitive: alle Worte > 4 Buchstaben
return [w for w in re.findall(r"\b\w{5,}\b", title)][:5]
def _summarize_by_doctype(mc_rows, covered_mc_ids: set[str]) -> dict:
out: dict[str, dict] = {}
for mc_id, ctrl_id, dt, title, q, pc, sev in mc_rows:
dt = dt or "other"
d = out.setdefault(dt, {"total": 0, "covered": 0})
d["total"] += 1
if mc_id in covered_mc_ids:
d["covered"] += 1
for dt, d in out.items():
d["pct"] = round(d["covered"] / max(1, d["total"]) * 100, 1)
return out
+4
View File
@@ -28,4 +28,8 @@ USER appuser
EXPOSE 8094
# P83 — Build-SHA fuer check-rebuild-needed.sh
ARG BUILD_SHA="unknown"
ENV BUILD_SHA=${BUILD_SHA}
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8094"]
+98 -1
View File
@@ -256,7 +256,8 @@ async def archive_legal_document(
}
@router.get("/documents/{cid}/history")
@router.get("/api/v1/documents/{cid}/history")
@router.get("/documents/{cid}/history") # legacy path, kept for backwards compatibility
async def get_document_history(cid: str):
"""Follow the parent_cid chain to reconstruct version history."""
history = []
@@ -285,3 +286,99 @@ async def get_document_history(cid: str):
break
return {"cid": cid, "history": history, "depth": len(history)}
@router.get("/api/v1/documents/{cid_a}/diff/{cid_b}")
async def diff_documents(cid_a: str, cid_b: str):
"""
Compare two DSMS document versions by their CIDs.
Returns a unified diff of the textual content when both documents are
text-decodable (UTF-8). For binary documents the response indicates
"binary" and returns just the metadata differences. Used by the Audit
Timeline UI to render "what changed between V2 and V3 of CE-Akte X".
"""
try:
raw_a = await ipfs_cat(cid_a)
raw_b = await ipfs_cat(cid_b)
except Exception as exc:
return {"error": f"could not fetch one of the CIDs: {exc}", "cid_a": cid_a, "cid_b": cid_b}
try:
pkg_a = json.loads(raw_a)
pkg_b = json.loads(raw_b)
except Exception:
# Documents are not the wrapped-package JSON shape — treat as raw.
pkg_a = {"metadata": {}, "content_base64": ""}
pkg_b = {"metadata": {}, "content_base64": ""}
meta_a = pkg_a.get("metadata", {}) or {}
meta_b = pkg_b.get("metadata", {}) or {}
meta_diff = _diff_metadata(meta_a, meta_b)
# Try to decode the content. The Archive flow stores files as base64 in
# `content_base64`; older payloads may use `content` (utf-8 text).
text_a, text_b, is_binary = _extract_texts(pkg_a, pkg_b)
if is_binary:
return {
"cid_a": cid_a,
"cid_b": cid_b,
"kind": "binary",
"metadata_diff": meta_diff,
"note": "Binary payload — text diff omitted. Compare via the rendered tech-file export instead.",
}
diff_lines = list(
_unified_diff(text_a.splitlines(), text_b.splitlines(), fromfile=cid_a, tofile=cid_b, lineterm="")
)
return {
"cid_a": cid_a,
"cid_b": cid_b,
"kind": "text",
"metadata_diff": meta_diff,
"diff": "\n".join(diff_lines),
"added_lines": sum(1 for ln in diff_lines if ln.startswith("+") and not ln.startswith("+++")),
"removed_lines": sum(1 for ln in diff_lines if ln.startswith("-") and not ln.startswith("---")),
}
def _diff_metadata(a: dict, b: dict) -> dict:
"""Return per-field change list: {field: {"old": ..., "new": ...}}."""
keys = set(a.keys()) | set(b.keys())
changes = {}
for k in sorted(keys):
if a.get(k) != b.get(k):
changes[k] = {"old": a.get(k), "new": b.get(k)}
return changes
def _extract_texts(pkg_a: dict, pkg_b: dict) -> tuple[str, str, bool]:
"""Return (text_a, text_b, is_binary). Falls back to base64-decode."""
import base64
def to_text(pkg: dict) -> tuple[str, bool]:
if isinstance(pkg.get("content"), str):
return pkg["content"], False
b64 = pkg.get("content_base64")
if not b64:
return "", False
try:
raw = base64.b64decode(b64)
except Exception:
return "", True
try:
return raw.decode("utf-8"), False
except UnicodeDecodeError:
return "", True
text_a, bin_a = to_text(pkg_a)
text_b, bin_b = to_text(pkg_b)
return text_a, text_b, (bin_a or bin_b)
def _unified_diff(a, b, fromfile, tofile, lineterm):
"""Tiny shim around difflib.unified_diff so the function reads cleanly."""
import difflib
return difflib.unified_diff(a, b, fromfile=fromfile, tofile=tofile, lineterm=lineterm, n=2)
+108
View File
@@ -0,0 +1,108 @@
"""
Tests for the version-chain diff endpoint added in DSMS Stufe 3.
Mocks ipfs_cat so the test does not require a running IPFS node.
"""
import base64
import json
from unittest.mock import AsyncMock, patch
import pytest
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)
def _wrap(metadata: dict, content_text: str) -> str:
"""Mimic the JSON envelope that routers.documents.ipfs_cat returns."""
return json.dumps(
{
"metadata": metadata,
"content_base64": base64.b64encode(content_text.encode("utf-8")).decode("ascii"),
}
)
@pytest.mark.asyncio
async def test_diff_text_documents_returns_unified_diff():
pkg_a = _wrap({"version": "1", "document_type": "ce_techfile"}, "alpha\nbeta\ngamma\n")
pkg_b = _wrap({"version": "2", "document_type": "ce_techfile"}, "alpha\nDELTA\ngamma\n")
async def fake_cat(cid: str):
return pkg_a if cid == "cidA" else pkg_b
with patch("routers.documents.ipfs_cat", new=AsyncMock(side_effect=fake_cat)):
resp = client.get("/api/v1/documents/cidA/diff/cidB")
assert resp.status_code == 200
body = resp.json()
assert body["kind"] == "text"
assert body["cid_a"] == "cidA"
assert body["cid_b"] == "cidB"
assert body["added_lines"] >= 1
assert body["removed_lines"] >= 1
assert "DELTA" in body["diff"]
assert body["metadata_diff"] == {"version": {"old": "1", "new": "2"}}
@pytest.mark.asyncio
async def test_diff_binary_documents_returns_metadata_only():
# Use raw bytes that are not utf-8 decodable
invalid_utf8 = b"\xff\xfe\xfd\xfc"
pkg_a = json.dumps(
{"metadata": {"version": "1"}, "content_base64": base64.b64encode(invalid_utf8).decode()}
)
pkg_b = json.dumps(
{"metadata": {"version": "2"}, "content_base64": base64.b64encode(invalid_utf8 + b"\x00").decode()}
)
async def fake_cat(cid: str):
return pkg_a if cid == "cidA" else pkg_b
with patch("routers.documents.ipfs_cat", new=AsyncMock(side_effect=fake_cat)):
resp = client.get("/api/v1/documents/cidA/diff/cidB")
assert resp.status_code == 200
body = resp.json()
assert body["kind"] == "binary"
assert body["metadata_diff"] == {"version": {"old": "1", "new": "2"}}
@pytest.mark.asyncio
async def test_diff_handles_fetch_error():
async def fake_cat(cid: str):
raise RuntimeError("not pinned")
with patch("routers.documents.ipfs_cat", new=AsyncMock(side_effect=fake_cat)):
resp = client.get("/api/v1/documents/cidA/diff/cidB")
assert resp.status_code == 200
body = resp.json()
assert "error" in body
assert body["cid_a"] == "cidA"
assert body["cid_b"] == "cidB"
@pytest.mark.asyncio
async def test_history_endpoint_follows_parent_chain():
"""Sanity check that the existing history endpoint still works after route alias."""
chain = {
"v3": _wrap({"version": "3", "parent_cid": "v2"}, "x"),
"v2": _wrap({"version": "2", "parent_cid": "v1"}, "x"),
"v1": _wrap({"version": "1", "parent_cid": None}, "x"),
}
async def fake_cat(cid: str):
return chain[cid]
with patch("routers.documents.ipfs_cat", new=AsyncMock(side_effect=fake_cat)):
resp = client.get("/api/v1/documents/v3/history")
assert resp.status_code == 200
body = resp.json()
assert body["depth"] == 3
assert [h["version"] for h in body["history"]] == ["3", "2", "1"]