Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d21e1247c9 | |||
| e1b270c36e | |||
| 48e39423e6 | |||
| 31222885b3 | |||
| 188bb787d2 | |||
| d9d04deb00 | |||
| 2645b5b043 | |||
| 6b7950f428 | |||
| ba6f1bd1f6 | |||
| c1ea9458a7 | |||
| 0631a98bdd | |||
| c3542f7dfe | |||
| 7ec29999a2 | |||
| 402a42d30d |
@@ -128,5 +128,51 @@ func GetWarewashingPatterns() []HazardPattern {
|
||||
ISO12100Section: "6.3.5.6",
|
||||
DefaultSeverity: 2, DefaultExposure: 3,
|
||||
},
|
||||
{
|
||||
ID: "HP2207", NameDE: "Rueckfluss / Kontamination des Trinkwassers", NameEN: "Backflow / potable-water contamination",
|
||||
RequiredComponentTags: []string{"dom_warewashing", "backflow_risk"},
|
||||
GeneratedHazardCats: []string{"material_environmental"},
|
||||
SuggestedMeasureIDs: []string{"M2209"},
|
||||
Priority: 84,
|
||||
ApplicableLifecycles: []string{"normal_operation"},
|
||||
ScenarioDE: "Verschmutztes Spuel- oder Chemiewasser wird ueber den Frischwasseranschluss in das Trinkwassernetz zurueckgesaugt und kontaminiert es (Ruecksaugen bei Unterdruck im Netz).",
|
||||
TriggerDE: "Fehlender oder defekter Rueckflussverhinderer/Systemtrenner; Unterdruck im Trinkwassernetz; kein freier Auslauf.",
|
||||
HarmDE: "Gesundheitsgefaehrdung Dritter durch kontaminiertes Trinkwasser (Chemie, Keime).",
|
||||
AffectedDE: "Verbraucher am selben Trinkwassernetz, Betreiber",
|
||||
ZoneDE: "Frischwasseranschluss, Wasserzulauf",
|
||||
ISO12100Section: "6.2.4",
|
||||
DefaultSeverity: 3, DefaultExposure: 2,
|
||||
},
|
||||
{
|
||||
ID: "HP2208", NameDE: "Schnittverletzung an scharfen Kanten/Sieben", NameEN: "Cut injury on sharp edges/screens",
|
||||
RequiredComponentTags: []string{"dom_warewashing", "sharp_edge"},
|
||||
GeneratedHazardCats: []string{"mechanical_hazard"},
|
||||
SuggestedMeasureIDs: []string{"M003"},
|
||||
Priority: 74,
|
||||
ApplicableLifecycles: []string{"cleaning", "maintenance"},
|
||||
ScenarioDE: "Schneiden an scharfen Blechkanten, Sieben oder dem Ablaufpumpen-Laufrad beim Reinigen oder Eingreifen in die Spuelkammer.",
|
||||
TriggerDE: "Entnehmen/Reinigen der Siebe; Eingreifen an scharfen Kanten ohne Schutzhandschuhe.",
|
||||
HarmDE: "Schnittwunden an Haenden und Fingern.",
|
||||
AffectedDE: "Reinigungspersonal, Bedienpersonal",
|
||||
ZoneDE: "Zugaengliche Kanten, Siebe, Spuelkammer, Ablaufpumpe",
|
||||
ISO12100Section: "6.2.2.1",
|
||||
DefaultSeverity: 1, DefaultExposure: 3,
|
||||
},
|
||||
{
|
||||
ID: "HP2209", NameDE: "Unerwarteter Wiederanlauf bei Wartung/Reinigung", NameEN: "Unexpected restart during maintenance/cleaning",
|
||||
RequiredComponentTags: []string{"dom_warewashing", "programmable"},
|
||||
RequiredLifecycles: []string{"maintenance", "cleaning", "fault_clearing"},
|
||||
GeneratedHazardCats: []string{"safety_function_failure"},
|
||||
SuggestedMeasureIDs: []string{"M042"},
|
||||
Priority: 80,
|
||||
ApplicableLifecycles: []string{"maintenance", "cleaning"},
|
||||
ScenarioDE: "Waehrend Wartung oder Reinigung laeuft die Maschine durch fehlende Freischaltung (LOTO) oder automatischen Wiederanlauf unerwartet an (Pumpe, Spuelgang).",
|
||||
TriggerDE: "Kein Freischalten/Sichern gegen Wiedereinschalten; automatischer Wiederanlauf nach Netzunterbrechung.",
|
||||
HarmDE: "Verbruehung, Quetschen oder elektrischer Schlag durch unerwartet anlaufende Maschine.",
|
||||
AffectedDE: "Wartungspersonal, Reinigungspersonal",
|
||||
ZoneDE: "Gesamte Maschine, Pumpe, Antriebe",
|
||||
ISO12100Section: "6.2.11.4",
|
||||
DefaultSeverity: 3, DefaultExposure: 2,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,6 +107,9 @@ func GetKeywordDictionary() []KeywordEntry {
|
||||
// honest (generic hygiene; surfaced by the warewashing GT).
|
||||
{Keywords: []string{"spuelarm", "spuelfeld", "wascharm", "spruehfeld"}, ExtraTags: []string{"rotating_part"}},
|
||||
{Keywords: []string{"spuelkammer", "spueltuer", "geraetetuer", "haubentuer", "klapptuer"}, ExtraTags: []string{"access_door"}},
|
||||
// Frischwasseranschluss an das Trinkwassernetz -> Rueckfluss/Ruecksaug-Risiko (EN 1717).
|
||||
{Keywords: []string{"rueckfluss", "rueckflussverhinderer", "ruecksaug", "trinkwasser", "frischwasseranschluss", "systemtrenner"}, ExtraTags: []string{"backflow_risk"}},
|
||||
{Keywords: []string{"scharfe kante", "scharfkant", "blechkante", "scharfe blechkante", "sieb", "siebe"}, ExtraTags: []string{"sharp_edge"}},
|
||||
// Ghost-Closure (Emit-Seite): macht die 34 toten Required-Tags
|
||||
// emittierbar, jeweils NUR via domaenenspezifische Keywords -> die 120
|
||||
// Ghost-Patterns feuern wieder, aber nur fuer ihre echte Maschine (kein
|
||||
|
||||
@@ -65,5 +65,11 @@ func getWarewashingMeasures() []ProtectiveMeasureEntry {
|
||||
HazardCategory: "general",
|
||||
Examples: []string{"Warnpiktogramm 'Heisser Dampf' an der Tuer", "BA-Hinweis 'Tuer nach Programmende langsam oeffnen'"},
|
||||
NormReferences: []string{"ISO 7010", "EN 60335-2-58"}},
|
||||
{ID: "M2209", ReductionType: "design", SubType: "containment",
|
||||
Name: "Rueckflussverhinderer / Systemtrenner nach EN 1717",
|
||||
Description: "Der Frischwasseranschluss ist durch einen Rueckflussverhinderer bzw. Systemtrenner der passenden Schutzklasse oder durch einen freien Auslauf gegen Ruecksaugen verschmutzten Wassers in das Trinkwassernetz gesichert.",
|
||||
HazardCategory: "material_environmental",
|
||||
Examples: []string{"Systemtrenner Typ BA nach EN 1717", "Freier Auslauf Typ AB ueber dem hoechsten Wasserstand"},
|
||||
NormReferences: []string{"EN 1717", "EN 60335-2-58"}},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,14 @@ func classifyAuthority(r LegalSearchResult) authorityInfo {
|
||||
if jur == "" {
|
||||
jur = inferJurisdiction(r)
|
||||
}
|
||||
hay := r.ArticleLabel + " " + r.RegulationShort + " " + r.RegulationName + " " + r.RegulationCode
|
||||
// A recognised standard NAME (NIST/OWASP/ISO 27001/CIS/CSA CCM/Grundschutz) is authoritative
|
||||
// even when the corpus mis-tagged the chunk as supervisory_guidance (weight 70) — many
|
||||
// standards were ingested with a generic guidance source_class. The name wins, so they
|
||||
// classify (and rank) as technical_standard / control_standard. binding_law is preserved.
|
||||
if r.SourceClass != "binding_law" && containsAny(hay, standardMarkers) {
|
||||
return authorityInfo{weight: 80, sourceClass: "technical_standard", jurisdiction: jur}
|
||||
}
|
||||
if r.SourceClass != "" {
|
||||
w := r.AuthorityWeight
|
||||
if w == 0 && r.SourceClass == "binding_law" {
|
||||
@@ -50,7 +58,6 @@ func classifyAuthority(r LegalSearchResult) authorityInfo {
|
||||
if r.AuthorityWeight > 0 {
|
||||
return authorityInfo{weight: r.AuthorityWeight, sourceClass: sourceClassFromWeight(r.AuthorityWeight), jurisdiction: jur}
|
||||
}
|
||||
hay := r.ArticleLabel + " " + r.RegulationShort + " " + r.RegulationName + " " + r.RegulationCode
|
||||
switch {
|
||||
case containsAny(hay, foreignMarkers):
|
||||
return authorityInfo{weight: 0, sourceClass: "foreign_law", jurisdiction: "CH"}
|
||||
|
||||
@@ -15,6 +15,7 @@ func TestClassifyAuthority(t *testing.T) {
|
||||
{"tagged foreign CH", LegalSearchResult{AuthorityWeight: 0, SourceClass: "foreign_law", Jurisdiction: "CH"}, 0, "foreign_law", "CH"},
|
||||
{"untagged ENISA guidance", LegalSearchResult{RegulationShort: "ENISA", ArticleLabel: "ENISA CRA Standards Mapping"}, 70, "supervisory_guidance", "EU"},
|
||||
{"untagged NIST standard", LegalSearchResult{RegulationShort: "NIST SP 800-82r3", ArticleLabel: "AU-8"}, 80, "technical_standard", "EU"},
|
||||
{"mis-tagged NIST guidance -> standard by name", LegalSearchResult{SourceClass: "supervisory_guidance", AuthorityWeight: 70, RegulationShort: "NIST SP 800-82r3", ArticleLabel: "NIST SP 800-82r3"}, 80, "technical_standard", "EU"},
|
||||
{"BSI Grundschutz standard beats BSI guidance", LegalSearchResult{RegulationShort: "BSI Grundschutz", ArticleLabel: "BSI Grundschutz Baustein"}, 80, "technical_standard", "DE"},
|
||||
{"weight-only 85 TRGS standard", LegalSearchResult{AuthorityWeight: 85, RegulationShort: "TRGS 529"}, 85, "technical_standard", "EU"},
|
||||
{"tagged technical_standard", LegalSearchResult{AuthorityWeight: 80, SourceClass: "technical_standard", Jurisdiction: "EU"}, 80, "technical_standard", "EU"},
|
||||
|
||||
@@ -121,3 +121,54 @@ func controlRoleOf(payload map[string]interface{}) string {
|
||||
IsRecital: getBool(payload, "is_recital"),
|
||||
})
|
||||
}
|
||||
|
||||
// ensureControlDiversity guarantees that the returned top-K of a control question surfaces at
|
||||
// least one operational_requirement and one control_standard WHEN the pool contains them —
|
||||
// without forcing them to Top-1. implementation_guidance (e.g. ENISA good practices) keeps its
|
||||
// earned semantic lead; the rule only promotes the best hit of a missing control role into the
|
||||
// top-K by overwriting the lowest-ranked redundant guidance slot. So an implementation question
|
||||
// shows the relevant source ROLES (binding requirement + standard + guidance) side by side
|
||||
// instead of one role flooding the list. The promoted hit's original (now duplicate) position
|
||||
// stays in the tail and is dropped by the caller's truncation to topK.
|
||||
func ensureControlDiversity(results []LegalSearchResult, topK int) []LegalSearchResult {
|
||||
if topK <= 0 || topK >= len(results) {
|
||||
return results // everything is already returned — nothing to promote
|
||||
}
|
||||
roleAt := make([]string, len(results))
|
||||
for i := range results {
|
||||
roleAt[i] = classifyRole(results[i])
|
||||
}
|
||||
present := make(map[string]bool, topK)
|
||||
for i := 0; i < topK; i++ {
|
||||
present[roleAt[i]] = true
|
||||
}
|
||||
for _, want := range []string{roleOperationalReq, roleControlStandard} {
|
||||
if present[want] {
|
||||
continue
|
||||
}
|
||||
src := -1
|
||||
for i := topK; i < len(results); i++ {
|
||||
if roleAt[i] == want {
|
||||
src = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if src < 0 {
|
||||
continue // role absent from the whole pool — nothing to promote
|
||||
}
|
||||
dst := -1
|
||||
for j := topK - 1; j >= 0; j-- {
|
||||
if roleAt[j] == roleImplGuidance {
|
||||
dst = j
|
||||
break
|
||||
}
|
||||
}
|
||||
if dst < 0 {
|
||||
continue // no redundant guidance to sacrifice — leave the head untouched
|
||||
}
|
||||
results[dst] = results[src]
|
||||
roleAt[dst] = want
|
||||
present[want] = true
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
@@ -77,3 +77,58 @@ func TestControlRoleOf_Payload(t *testing.T) {
|
||||
t.Errorf("DORA abstract article role = %q must be excluded from the control-pool", got)
|
||||
}
|
||||
}
|
||||
|
||||
func headHasRole(head []LegalSearchResult, role string) bool {
|
||||
for _, r := range head {
|
||||
if classifyRole(r) == role {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func TestEnsureControlDiversity(t *testing.T) {
|
||||
ig := func(n string) LegalSearchResult {
|
||||
return LegalSearchResult{RegulationShort: "ENISA " + n + " Good Practices"}
|
||||
}
|
||||
opReq := LegalSearchResult{RegulationShort: "CRA", ArticleLabel: "CRA Anhang I", Category: "regulation"}
|
||||
std := LegalSearchResult{RegulationShort: "NIST SP 800-53"}
|
||||
|
||||
t.Run("injects missing op_req + control_standard, guidance keeps Top-1", func(t *testing.T) {
|
||||
out := ensureControlDiversity([]LegalSearchResult{ig("A"), ig("B"), ig("C"), std, opReq}, 3)
|
||||
head := out[:3]
|
||||
if classifyRole(head[0]) != roleImplGuidance {
|
||||
t.Errorf("Top-1 should stay implementation_guidance, got %q", classifyRole(head[0]))
|
||||
}
|
||||
if !headHasRole(head, roleOperationalReq) {
|
||||
t.Error("top-K must contain an operational_requirement after diversity")
|
||||
}
|
||||
if !headHasRole(head, roleControlStandard) {
|
||||
t.Error("top-K must contain a control_standard after diversity")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("no-op when both roles already present", func(t *testing.T) {
|
||||
out := ensureControlDiversity([]LegalSearchResult{opReq, std, ig("A"), ig("B")}, 3)
|
||||
if classifyRole(out[0]) != roleOperationalReq || classifyRole(out[1]) != roleControlStandard {
|
||||
t.Error("already-diverse top-K must be left untouched")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("absent role is not forced (no panic)", func(t *testing.T) {
|
||||
out := ensureControlDiversity([]LegalSearchResult{ig("A"), ig("B"), ig("C"), std}, 3)
|
||||
if !headHasRole(out[:3], roleControlStandard) {
|
||||
t.Error("present control_standard should be injected")
|
||||
}
|
||||
if headHasRole(out[:3], roleOperationalReq) {
|
||||
t.Error("operational_requirement absent from the pool must NOT appear")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("topK covering the whole pool is unchanged", func(t *testing.T) {
|
||||
out := ensureControlDiversity([]LegalSearchResult{ig("A"), opReq}, 5)
|
||||
if len(out) != 2 || classifyRole(out[0]) != roleImplGuidance {
|
||||
t.Error("topK >= len must return results unchanged")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -166,6 +166,15 @@ func (c *LegalRAGClient) searchInternal(ctx context.Context, collection string,
|
||||
// Response-Schema unveraendert. Score traegt den Authority-Score, damit nachgelagerte
|
||||
// Multi-Collection-Merges (Advisor) die Ordnung bewahren.
|
||||
results = rerankByAuthority(query, results)
|
||||
|
||||
// Control-Diversity: auf einer Umsetzungsfrage darf impl_guidance (ENISA) Top-1 bleiben,
|
||||
// aber die Top-K soll mindestens eine binding operational_requirement (CRA Anhang I) und
|
||||
// einen control_standard (NIST/ISO) zeigen, falls im Pool — Quellenarten sichtbar machen
|
||||
// statt sie kuenstlich auf Top-1 zu heben. Nur Reihenfolge, vor der Truncation.
|
||||
if queryWantsControls(query) {
|
||||
results = ensureControlDiversity(results, topK)
|
||||
}
|
||||
|
||||
if topK > 0 && len(results) > topK {
|
||||
results = results[:topK]
|
||||
}
|
||||
|
||||
@@ -0,0 +1,179 @@
|
||||
"""Obligation Aggregation Engine — Ausführung des Legal Obligation Layer v1.
|
||||
|
||||
Aggregiert Bewertungen auf KRITERIUM-Ebene (pro Control) zu Ergebnissen auf
|
||||
OBLIGATION-Ebene. Das ist die erstmalige Ausführung des Modells
|
||||
|
||||
Regulation → Legal Obligation → Control → Criterion
|
||||
|
||||
— das Finding entsteht auf der OBLIGATION, nicht pro Control. Damit kollabiert
|
||||
die im Katalog gemessene Redundanz (portability 11×, recipients 14×): N Controls,
|
||||
die dieselbe Pflicht prüfen, ergeben EIN Obligation-Finding statt N Control-Findings.
|
||||
|
||||
Regulierungs-agnostisch: kennt nur obligation_id, tier, met, legal_basis,
|
||||
conditional. DSGVO/CRA/NIS2/DORA/MaschVO/AI-Act speisen dieselbe Funktion.
|
||||
|
||||
Fail-safe (docs-src/development/legal_obligation_layer_v1.md, §Aggregation):
|
||||
LEGAL_MINIMUM-Obligation:
|
||||
applicable=false → NA (kein Finding)
|
||||
keine LM-Anforderung erfüllt → FAILED (Pflicht-Lücke)
|
||||
alle LM-Anforderungen erfüllt → MET
|
||||
nur ein Teil erfüllt → PARTIAL
|
||||
LM nicht bewertbar (Prüfer down) → UNDETERMINED (Aufrufer behält Legacy)
|
||||
BEST_PRACTICE/OPTIONAL-Obligation (kein LM):
|
||||
mind. ein Kriterium erfüllt → MET (abgedeckt)
|
||||
keines → OPEN (nur Empfehlung, NIE FAILED)
|
||||
|
||||
Redundanz-Kollaps: LM-Kriterien EINER Obligation werden zu „Anforderungen" nach
|
||||
`legal_basis` gruppiert; eine Anforderung gilt als erfüllt, sobald IRGENDEIN Control
|
||||
sie bestätigt (OR). 9× recipients_disclosed (alle Art 13(1)(e)) = eine Anforderung.
|
||||
PARTIAL entsteht nur bei mehreren DISTINKTEN LM-Anforderungen (verschiedene
|
||||
legal_basis) innerhalb einer Obligation.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import Counter, defaultdict
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Callable, Optional
|
||||
|
||||
LM, BP, OPT = "LEGAL_MINIMUM", "BEST_PRACTICE", "OPTIONAL"
|
||||
MET, PARTIAL, FAILED = "MET", "PARTIAL", "FAILED"
|
||||
NA, UNDETERMINED, OPEN = "NA", "UNDETERMINED", "OPEN"
|
||||
PFLICHT, EMPFEHLUNG, NICHT_ANWENDBAR = "PFLICHT", "EMPFEHLUNG", "NICHT_ANWENDBAR"
|
||||
|
||||
# Predikat-Hook: (conditional, doc_text) → True (anwendbar) / False (→ NA) / None (unbekannt → anwendbar)
|
||||
ApplicableFn = Callable[[str, str], Optional[bool]]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class CriterionEval:
|
||||
"""Eine Kriteriums-Bewertung eines Controls, einer Obligation zugeordnet."""
|
||||
obligation_id: str
|
||||
tier: str # LEGAL_MINIMUM / BEST_PRACTICE / OPTIONAL
|
||||
met: Optional[bool] # True erfüllt · False fehlt · None unbestimmt
|
||||
control_id: str
|
||||
legal_basis: str = ""
|
||||
criterion: str = ""
|
||||
conditional: Optional[str] = None # Applicability-Prädikat der Obligation
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObligationResult:
|
||||
obligation_id: str
|
||||
status: str # MET / PARTIAL / FAILED / NA / UNDETERMINED / OPEN
|
||||
bucket: str # PFLICHT / EMPFEHLUNG / NICHT_ANWENDBAR
|
||||
tier: str # bestimmende Tier der Obligation
|
||||
applicable: bool
|
||||
evidence: list[str] # beitragende control_ids
|
||||
lm_met: int # erfüllte LM-Anforderungen
|
||||
lm_total: int # distinkte LM-Anforderungen (bewertbar)
|
||||
recommendations: list[dict] = field(default_factory=list)
|
||||
|
||||
|
||||
def _governing_tier(evals: list[CriterionEval]) -> str:
|
||||
tiers = {e.tier for e in evals}
|
||||
if LM in tiers:
|
||||
return LM
|
||||
return BP if BP in tiers else OPT
|
||||
|
||||
|
||||
def _requirement_state(evals: list[CriterionEval]) -> Optional[bool]:
|
||||
"""Zustand EINER LM-Anforderung über alle prüfenden Controls (OR/Redundanz):
|
||||
True (irgendwer bestätigt) · None (alle unbestimmt) · False (bewertet, fehlt)."""
|
||||
if any(e.met is True for e in evals):
|
||||
return True
|
||||
if all(e.met is None for e in evals):
|
||||
return None
|
||||
return False
|
||||
|
||||
|
||||
def _recommendations(evals: list[CriterionEval]) -> list[dict]:
|
||||
"""Nicht erfüllte BEST_PRACTICE/OPTIONAL-Kriterien → Empfehlungen."""
|
||||
return [{"criterion": e.criterion, "tier": e.tier, "legal_basis": e.legal_basis,
|
||||
"control_id": e.control_id}
|
||||
for e in evals if e.tier in (BP, OPT) and e.met is False]
|
||||
|
||||
|
||||
def aggregate_obligation(obligation_id: str, evals: list[CriterionEval], *,
|
||||
applicable_fn: Optional[ApplicableFn] = None,
|
||||
doc_text: str = "") -> ObligationResult:
|
||||
evidence = sorted({e.control_id for e in evals if e.control_id})
|
||||
conditional = next((e.conditional for e in evals if e.conditional), None)
|
||||
tier = _governing_tier(evals)
|
||||
recs = _recommendations(evals)
|
||||
|
||||
applicable = True
|
||||
if applicable_fn is not None and conditional:
|
||||
verdict = applicable_fn(conditional, doc_text)
|
||||
applicable = True if verdict is None else bool(verdict)
|
||||
if not applicable:
|
||||
return ObligationResult(obligation_id, NA, NICHT_ANWENDBAR, tier, False,
|
||||
evidence, 0, 0, recs)
|
||||
|
||||
lm_evals = [e for e in evals if e.tier == LM]
|
||||
if lm_evals:
|
||||
reqs: dict[str, list[CriterionEval]] = defaultdict(list)
|
||||
for e in lm_evals:
|
||||
reqs[e.legal_basis or obligation_id].append(e)
|
||||
states = [_requirement_state(v) for v in reqs.values()]
|
||||
determinable = [s for s in states if s is not None]
|
||||
if not determinable:
|
||||
return ObligationResult(obligation_id, UNDETERMINED, PFLICHT, LM, True,
|
||||
evidence, 0, len(states), recs)
|
||||
met = sum(1 for s in determinable if s)
|
||||
total = len(determinable)
|
||||
status = MET if met == total else (FAILED if met == 0 else PARTIAL)
|
||||
return ObligationResult(obligation_id, status, PFLICHT, LM, True,
|
||||
evidence, met, total, recs)
|
||||
|
||||
# Reine BEST_PRACTICE/OPTIONAL-Obligation: nie Pflicht, nie FAILED.
|
||||
covered = any(e.met is True for e in evals)
|
||||
return ObligationResult(obligation_id, MET if covered else OPEN, EMPFEHLUNG,
|
||||
tier, True, evidence, 0, 0, recs)
|
||||
|
||||
|
||||
def aggregate_obligations(evals: list[CriterionEval], *,
|
||||
applicable_fn: Optional[ApplicableFn] = None,
|
||||
doc_text: str = "") -> list[ObligationResult]:
|
||||
"""Flache Kriteriums-Liste → ein ObligationResult je obligation_id."""
|
||||
groups: dict[str, list[CriterionEval]] = defaultdict(list)
|
||||
for e in evals:
|
||||
if e.obligation_id:
|
||||
groups[e.obligation_id].append(e)
|
||||
return [aggregate_obligation(oid, g, applicable_fn=applicable_fn, doc_text=doc_text)
|
||||
for oid, g in groups.items()]
|
||||
|
||||
|
||||
def evals_from_tiered(control_id: str, tiered_criteria: list[dict],
|
||||
detail: list[dict], conditional: Optional[str] = None
|
||||
) -> list[CriterionEval]:
|
||||
"""Adapter: tiered_criteria (obligation_id/tier/legal_basis) + das
|
||||
evaluate_tiered-`detail` (met pro Index, gleiche Reihenfolge) → CriterionEvals.
|
||||
`conditional` kommt aus der Control-`applicability` (gilt für die Obligation)."""
|
||||
out: list[CriterionEval] = []
|
||||
for i, c in enumerate(tiered_criteria or []):
|
||||
oid = c.get("obligation_id")
|
||||
if not oid:
|
||||
continue
|
||||
d = detail[i] if i < len(detail) else {}
|
||||
out.append(CriterionEval(
|
||||
obligation_id=oid,
|
||||
tier=(c.get("compliance_tier") or "").upper(),
|
||||
met=d.get("met"),
|
||||
control_id=control_id,
|
||||
legal_basis=c.get("legal_basis") or "",
|
||||
criterion=c.get("criterion") or "",
|
||||
conditional=conditional,
|
||||
))
|
||||
return out
|
||||
|
||||
|
||||
def summarize(results: list[ObligationResult]) -> dict:
|
||||
"""Phase-C-Kennzahlen: Obligation-Anzahl + Verteilung nach Bucket/Status."""
|
||||
return {
|
||||
"obligations": len(results),
|
||||
"buckets": dict(Counter(r.bucket for r in results)),
|
||||
"statuses": dict(Counter(r.status for r in results)),
|
||||
"pflicht_failed": sum(1 for r in results if r.bucket == PFLICHT and r.status == FAILED),
|
||||
"pflicht_partial": sum(1 for r in results if r.bucket == PFLICHT and r.status == PARTIAL),
|
||||
"recommendations": sum(len(r.recommendations) for r in results),
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
"""Applicability-Prädikate (minimal) für die Obligation Aggregation Engine.
|
||||
|
||||
Jedes Prädikat entscheidet aus dem Dokumenttext, ob eine BEDINGTE Obligation
|
||||
anwendbar ist:
|
||||
True → anwendbar (normal bewerten)
|
||||
False → NICHT anwendbar (→ NA statt FEHLT)
|
||||
None → Prädikat unbekannt → Aufrufer behält Default=anwendbar (fail-safe,
|
||||
KEINE stille NA)
|
||||
|
||||
Bewusst KLEIN gehalten: nur die bereits modellierten Bedingungen
|
||||
has_third_country_transfer · uses_legitimate_interest · direct_marketing
|
||||
(+ legitimate_interest_or_public_task, weil objection_general_art21_1 dieselbe
|
||||
Rechtsgrundlage als Anknüpfung nutzt). profiling/employment/telecom/health/
|
||||
data_act folgen in der nächsten Charge — bis dahin → None → anwendbar.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
|
||||
_THIRD_COUNTRY = (
|
||||
"drittland", "drittstaat", "drittländ", "third countr", "außerhalb der eu",
|
||||
"ausserhalb der eu", "außerhalb des ewr", "ausserhalb des ewr",
|
||||
"angemessenheitsbeschluss", "standardvertragsklausel", "standarddatenschutzklausel",
|
||||
"binding corporate rules", "verbindliche interne datenschutzvorschriften",
|
||||
"data privacy framework", "privacy shield", "in die usa", "in den usa",
|
||||
"vereinigte staaten", "international transfer", "internationale übermittlung",
|
||||
"art. 44", "art. 46",
|
||||
)
|
||||
_LEGIT = (
|
||||
"berechtigtes interesse", "berechtigten interesse", "berechtigte interesse",
|
||||
"legitimate interest", "art. 6 abs. 1 lit. f", "art. 6 abs. 1 f",
|
||||
"art. 6 (1) (f)", "abs. 1 buchstabe f", "interessenabwägung",
|
||||
)
|
||||
_PUBLIC_TASK = (
|
||||
"öffentliche aufgabe", "öffentlichen aufgabe", "im öffentlichen interesse",
|
||||
"art. 6 abs. 1 lit. e", "ausübung öffentlicher gewalt", "official authority",
|
||||
)
|
||||
_DIRECT_MKT = (
|
||||
"direktwerbung", "direktmarketing", "direkt-werbung", "werbe-e-mail", "werbe-mail",
|
||||
"newsletter", "werbliche", "marketingzweck", "marketing-zweck", "zwecke der werbung",
|
||||
"zu werbezwecken", "e-mail-marketing", "postwerbung", "telefonwerbung",
|
||||
)
|
||||
|
||||
|
||||
def _has(text: str, kws: tuple[str, ...]) -> bool:
|
||||
return any(k in text for k in kws)
|
||||
|
||||
|
||||
def has_third_country_transfer(text: str) -> bool:
|
||||
return _has(text, _THIRD_COUNTRY)
|
||||
|
||||
|
||||
def uses_legitimate_interest(text: str) -> bool:
|
||||
return _has(text, _LEGIT)
|
||||
|
||||
|
||||
def direct_marketing(text: str) -> bool:
|
||||
return _has(text, _DIRECT_MKT)
|
||||
|
||||
|
||||
_PREDICATES = {
|
||||
"has_third_country_transfer": has_third_country_transfer,
|
||||
"uses_legitimate_interest": uses_legitimate_interest,
|
||||
"legitimate_interest_or_public_task":
|
||||
lambda t: _has(t, _LEGIT) or _has(t, _PUBLIC_TASK),
|
||||
"direct_marketing": direct_marketing,
|
||||
}
|
||||
|
||||
|
||||
def applicable(conditional: str, doc_text: str) -> Optional[bool]:
|
||||
"""applicable_fn-Hook für `aggregate_obligations`. Unbekanntes Prädikat → None
|
||||
(Aufrufer behält Default=anwendbar; NIE stille NA)."""
|
||||
fn = _PREDICATES.get(conditional)
|
||||
if fn is None:
|
||||
return None
|
||||
return fn((doc_text or "").lower())
|
||||
@@ -0,0 +1,26 @@
|
||||
"""Obligation-Taxonomie-Registry — versioniertes Artefakt bis zur DB-Owner-Tabelle
|
||||
(Legal Obligation Layer v1, docs-src/development/legal_obligation_layer_v1.md).
|
||||
|
||||
Hält Metadaten auf OBLIGATION-Ebene, die (noch) keine eigene DB-Tabelle haben.
|
||||
|
||||
`decision_method_required`: Obligations, deren Erkennung Keyword/Embedding
|
||||
NACHWEISLICH nicht zuverlässig leistet (kompakte/synonymreiche Offenlegung) und
|
||||
die CONTENT/LLM brauchen. Empirisch belegt am TeamViewer-Recall-Defekt: 0/22
|
||||
recipients+international_transfer Controls trafen, obwohl die Pflicht erfüllt war
|
||||
(„…außerhalb EU/EWR … Standardvertragsklauseln/Schutzmaßnahmen"); Embedding cos
|
||||
0.49–0.57 < 0.62, teils falscher Chunk → kein Schwellen-Fix, sondern LLM-Klasse.
|
||||
|
||||
Wirkung: der Shadow zählt ein FAILED solcher Obligations NICHT als „echte Lücke",
|
||||
sondern als RECALL_LIMITED (Prüfer kann sie mit aktueller Methode nicht verifizieren).
|
||||
"""
|
||||
OBLIGATION_META: dict[str, dict] = {
|
||||
"recipients_disclosed": {"decision_method_required": "LLM"},
|
||||
"third_country_transfer_disclosed": {"decision_method_required": "LLM"},
|
||||
"safeguards_disclosed": {"decision_method_required": "LLM"},
|
||||
"safeguards_accessible": {"decision_method_required": "LLM"},
|
||||
}
|
||||
|
||||
|
||||
def requires_llm(obligation_id: str) -> bool:
|
||||
"""True, wenn diese Obligation CONTENT/LLM braucht (Keyword/Embedding-Recall belegt unzureichend)."""
|
||||
return OBLIGATION_META.get(obligation_id, {}).get("decision_method_required") == "LLM"
|
||||
@@ -0,0 +1,130 @@
|
||||
"""DSE Shadow-Verdrahtung der Obligation Aggregation Engine.
|
||||
|
||||
Erzeugt aus den v3-`results` zusätzlich Obligation-Ergebnisse — AUSSCHLIESSLICH
|
||||
für die Telemetrie (Shadow Mode). Ändert KEINE nutzer-sichtbaren Findings.
|
||||
|
||||
Mapping control-level über generation_metadata.legal_obligations +
|
||||
applicability.conditional; das `met`-Signal ist das Legacy-`passed` des Controls
|
||||
(kein zusätzlicher Prüfer-Call, kein Key). Liefert die Vergleichszahlen, mit denen
|
||||
sich der Umschalt-Entscheid später absichern lässt:
|
||||
legacy_control_findings · obligation_shadow_results · collapse_factor ·
|
||||
na_count · met_failed_delta · top_collapsed_obligations
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def fetch_obligation_markers(cids: list[str], db_url: str = "") -> dict[str, dict]:
|
||||
"""legal_obligations + applicability.conditional der Controls laden.
|
||||
Leeres Dict bei Fehler/keiner DB (Shadow fällt still aus)."""
|
||||
cids = [c for c in cids if c]
|
||||
if not cids:
|
||||
return {}
|
||||
import json
|
||||
dsn = db_url or os.getenv("DATABASE_URL") or os.getenv("COMPLIANCE_DATABASE_URL")
|
||||
if not dsn:
|
||||
return {}
|
||||
try:
|
||||
import asyncpg
|
||||
conn = await asyncpg.connect(dsn)
|
||||
rows = await conn.fetch(
|
||||
"select control_id, generation_metadata->'legal_obligations' obl, "
|
||||
"generation_metadata->'applicability'->>'conditional' cond "
|
||||
"from compliance.canonical_controls "
|
||||
"where control_id = any($1::text[]) "
|
||||
"and generation_metadata ? 'legal_obligations'", cids)
|
||||
await conn.close()
|
||||
except Exception as e:
|
||||
logger.warning("fetch_obligation_markers failed: %s", e)
|
||||
return {}
|
||||
out: dict[str, dict] = {}
|
||||
for r in rows:
|
||||
obl = r["obl"]
|
||||
obl = json.loads(obl) if isinstance(obl, str) else obl
|
||||
if obl:
|
||||
out[r["control_id"]] = {"obl": obl, "cond": r["cond"]}
|
||||
return out
|
||||
|
||||
|
||||
def compute_obligation_shadow(results: list[dict], text: str,
|
||||
markers: dict[str, dict]) -> dict[str, Any]:
|
||||
"""Reiner Shadow-Vergleich (keine DB, keine Seiteneffekte). `markers`:
|
||||
{control_id: {obl:[...], cond:str|None}}. `met` = Legacy-`passed`."""
|
||||
from compliance.services.obligation_aggregation import (
|
||||
FAILED, LM, MET, NA, PARTIAL, CriterionEval, aggregate_obligations,
|
||||
)
|
||||
from compliance.services.obligation_applicability import applicable
|
||||
from compliance.services.obligation_taxonomy import requires_llm
|
||||
|
||||
legacy = 0
|
||||
evals: list[Any] = []
|
||||
contrib: dict[str, list] = {}
|
||||
for r in results:
|
||||
cid = r.get("control_id")
|
||||
m = markers.get(cid)
|
||||
if not m:
|
||||
continue
|
||||
passed = bool(r.get("passed"))
|
||||
if not passed:
|
||||
legacy += 1
|
||||
for ob in m["obl"]:
|
||||
evals.append(CriterionEval(ob, LM, passed, cid, "", "", m.get("cond")))
|
||||
contrib.setdefault(ob, []).append((cid, passed))
|
||||
if not evals:
|
||||
return {"status": "no obligation markers on result controls"}
|
||||
|
||||
obls = aggregate_obligations(evals, applicable_fn=applicable, doc_text=text)
|
||||
# FAILED/PARTIAL ehrlich trennen: echte Lücke (failed_by_current_checker) vs
|
||||
# RECALL_LIMITED (Obligation braucht LLM, aktueller Prüfer kann sie nicht verifizieren).
|
||||
findings = failed_current = recall_limited = na = 0
|
||||
for o in obls:
|
||||
if o.status == NA:
|
||||
na += 1
|
||||
elif o.status in (FAILED, PARTIAL):
|
||||
findings += 1
|
||||
if requires_llm(o.obligation_id):
|
||||
recall_limited += 1
|
||||
else:
|
||||
failed_current += 1
|
||||
top = []
|
||||
for o in obls:
|
||||
cs = contrib.get(o.obligation_id, [])
|
||||
fehlt = sum(1 for _, p in cs if not p)
|
||||
if fehlt >= 2:
|
||||
top.append({"obligation": o.obligation_id, "fehlt": fehlt,
|
||||
"total": len(cs), "status": o.status,
|
||||
"recall_limited": bool(requires_llm(o.obligation_id)
|
||||
and o.status in (FAILED, PARTIAL))})
|
||||
top.sort(key=lambda x: -x["fehlt"])
|
||||
met_count = sum(1 for o in obls if o.status == MET)
|
||||
recall_limited_obls = sorted({o.obligation_id for o in obls
|
||||
if o.status in (FAILED, PARTIAL)
|
||||
and requires_llm(o.obligation_id)})
|
||||
return {
|
||||
"legacy_control_findings": legacy,
|
||||
"obligation_shadow_results": len(obls),
|
||||
"obligation_findings": findings,
|
||||
"failed_by_current_checker": failed_current,
|
||||
"recall_limited": recall_limited,
|
||||
"met_count": met_count,
|
||||
"collapse_factor": round(legacy / findings, 2) if findings else None,
|
||||
"na_count": na,
|
||||
"met_failed_delta": legacy - findings,
|
||||
"top_collapsed_obligations": top[:10],
|
||||
"recall_limited_obligations": recall_limited_obls,
|
||||
}
|
||||
|
||||
|
||||
async def build_obligation_shadow(results: list[dict], text: str,
|
||||
db_url: str = "") -> dict[str, Any]:
|
||||
"""Async-Wrapper: Marker laden, dann Shadow rechnen. NIE in `results` schreiben."""
|
||||
cids = [r.get("control_id") for r in results if r.get("control_id")]
|
||||
markers = await fetch_obligation_markers(cids, db_url)
|
||||
if not markers:
|
||||
return {"status": "no markers"}
|
||||
return compute_obligation_shadow(results, text, markers)
|
||||
@@ -158,6 +158,17 @@ async def run_v3_pipeline(
|
||||
except Exception as e:
|
||||
logger.warning("dse tiered eval skipped: %s", e)
|
||||
|
||||
# Layer 4 (SHADOW): Obligation-Aggregation NUR in die Telemetrie. Greift NICHT
|
||||
# in `results` ein — nutzer-sichtbare Findings bleiben unverändert. Liefert die
|
||||
# Vergleichszahlen für den späteren Umschalt-Entscheid (collapse_factor etc.).
|
||||
obligation_shadow: dict[str, Any] = {}
|
||||
try:
|
||||
from ._obligation_shadow import build_obligation_shadow
|
||||
obligation_shadow = await build_obligation_shadow(results, text, db_url)
|
||||
except Exception as e:
|
||||
logger.warning("dse obligation shadow skipped: %s", e)
|
||||
obligation_shadow = {"error": str(e)}
|
||||
|
||||
telemetry = {
|
||||
"layer_0_field_hits": len(boost_field_ids),
|
||||
"layer_0_field_ids": boost_field_ids,
|
||||
@@ -169,6 +180,7 @@ async def run_v3_pipeline(
|
||||
"offtopic_dropped": drop_stats.get("offtopic_dropped", 0),
|
||||
"gate_excluded": len(organizational),
|
||||
"organizational_checklist": organizational,
|
||||
"obligation_shadow": obligation_shadow,
|
||||
}
|
||||
logger.info("dse v3 telemetry: %s", telemetry)
|
||||
return results, telemetry
|
||||
|
||||
@@ -0,0 +1,153 @@
|
||||
"""Unit-Tests Obligation Aggregation Engine (Legal Obligation Layer v1).
|
||||
|
||||
Deckt die fail-safe Regeln + den Redundanz-Kollaps ab (echte DSE-Szenarien:
|
||||
recipients 9×, objection LM+BP, portability OPTIONAL-Format)."""
|
||||
from compliance.services.obligation_aggregation import (
|
||||
BP, LM, OPT, CriterionEval, aggregate_obligation, aggregate_obligations,
|
||||
evals_from_tiered, summarize,
|
||||
)
|
||||
|
||||
|
||||
def _ce(oid, tier, met, cid, basis="", crit="", cond=None):
|
||||
return CriterionEval(oid, tier, met, cid, basis, crit, cond)
|
||||
|
||||
|
||||
class TestRedundancyCollapse:
|
||||
def test_nine_controls_one_confirms_collapses_to_one_met(self):
|
||||
# recipients_disclosed: 9 Controls, gleiche Anforderung (Art 13(1)(e))
|
||||
evals = [_ce("recipients_disclosed", LM, i == 4, f"DATA-{i}", "Art. 13(1)(e)")
|
||||
for i in range(9)]
|
||||
res = aggregate_obligation("recipients_disclosed", evals)
|
||||
assert res.status == "MET"
|
||||
assert res.lm_met == 1 and res.lm_total == 1 # 9 → 1 Anforderung
|
||||
assert len(res.evidence) == 9
|
||||
|
||||
def test_all_nine_absent_fails_once(self):
|
||||
evals = [_ce("recipients_disclosed", LM, False, f"DATA-{i}", "Art. 13(1)(e)")
|
||||
for i in range(9)]
|
||||
res = aggregate_obligation("recipients_disclosed", evals)
|
||||
assert res.status == "FAILED"
|
||||
assert res.bucket == "PFLICHT"
|
||||
|
||||
|
||||
class TestPartialMultiFacet:
|
||||
def test_two_distinct_lm_requirements_one_met_is_partial(self):
|
||||
evals = [
|
||||
_ce("transfer", LM, True, "C1", "Art. 13(1)(f)"), # erfüllt
|
||||
_ce("transfer", LM, False, "C2", "Art. 46"), # fehlt → distinkt
|
||||
]
|
||||
res = aggregate_obligation("transfer", evals)
|
||||
assert res.status == "PARTIAL"
|
||||
assert res.lm_met == 1 and res.lm_total == 2
|
||||
|
||||
def test_both_distinct_requirements_met(self):
|
||||
evals = [
|
||||
_ce("transfer", LM, True, "C1", "Art. 13(1)(f)"),
|
||||
_ce("transfer", LM, True, "C2", "Art. 46"),
|
||||
]
|
||||
assert aggregate_obligation("transfer", evals).status == "MET"
|
||||
|
||||
|
||||
class TestApplicability:
|
||||
def test_conditional_false_is_na(self):
|
||||
evals = [_ce("transfer", LM, False, "C1", "Art. 44", cond="has_third_country_transfer")]
|
||||
res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: False)
|
||||
assert res.status == "NA"
|
||||
assert res.bucket == "NICHT_ANWENDBAR"
|
||||
assert res.applicable is False
|
||||
|
||||
def test_conditional_true_evaluates_normally(self):
|
||||
evals = [_ce("transfer", LM, False, "C1", "Art. 44", cond="has_third_country_transfer")]
|
||||
res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: True)
|
||||
assert res.status == "FAILED"
|
||||
|
||||
def test_conditional_unknown_defaults_applicable(self):
|
||||
evals = [_ce("transfer", LM, True, "C1", "Art. 44", cond="x")]
|
||||
res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: None)
|
||||
assert res.applicable is True and res.status == "MET"
|
||||
|
||||
def test_no_predicate_means_applicable(self):
|
||||
evals = [_ce("transfer", LM, True, "C1", cond="x")]
|
||||
assert aggregate_obligation("transfer", evals).applicable is True
|
||||
|
||||
|
||||
class TestUndetermined:
|
||||
def test_all_lm_none_is_undetermined(self):
|
||||
evals = [_ce("ob", LM, None, "C1", "b"), _ce("ob", LM, None, "C2", "b")]
|
||||
res = aggregate_obligation("ob", evals)
|
||||
assert res.status == "UNDETERMINED"
|
||||
assert res.bucket == "PFLICHT"
|
||||
|
||||
def test_one_determinable_requirement_decides(self):
|
||||
# eine Anforderung unbestimmt, die andere klar erfüllt → MET über die bewertbare
|
||||
evals = [_ce("ob", LM, None, "C1", "b1"), _ce("ob", LM, True, "C2", "b2")]
|
||||
res = aggregate_obligation("ob", evals)
|
||||
assert res.status == "MET"
|
||||
assert res.lm_total == 1 # nur die bewertbare Anforderung zählt
|
||||
|
||||
|
||||
class TestBestPracticeOnly:
|
||||
def test_pure_bp_covered_is_met_recommendation_bucket(self):
|
||||
evals = [_ce("art20_format", OPT, True, "C1")]
|
||||
res = aggregate_obligation("art20_format", evals)
|
||||
assert res.status == "MET"
|
||||
assert res.bucket == "EMPFEHLUNG"
|
||||
|
||||
def test_pure_bp_not_covered_is_open_never_failed(self):
|
||||
evals = [_ce("art20_format", OPT, False, "C1", crit="JSON/CSV")]
|
||||
res = aggregate_obligation("art20_format", evals)
|
||||
assert res.status == "OPEN"
|
||||
assert res.bucket == "EMPFEHLUNG"
|
||||
assert len(res.recommendations) == 1
|
||||
|
||||
|
||||
class TestRecommendationsWithinLm:
|
||||
def test_unmet_bp_in_lm_obligation_becomes_recommendation(self):
|
||||
# objection_direct_marketing: LM erfüllt + 3 BP teils offen
|
||||
evals = [
|
||||
_ce("obj_dm", LM, True, "SEC-8410", "Art. 21(2)", "Recht"),
|
||||
_ce("obj_dm", BP, False, "SEC-8410", "", "Kontaktweg"),
|
||||
_ce("obj_dm", BP, True, "SEC-8410", "", "kostenlos"),
|
||||
]
|
||||
res = aggregate_obligation("obj_dm", evals)
|
||||
assert res.status == "MET" and res.bucket == "PFLICHT"
|
||||
assert len(res.recommendations) == 1
|
||||
assert res.recommendations[0]["criterion"] == "Kontaktweg"
|
||||
|
||||
|
||||
class TestAdapterAndSummary:
|
||||
def test_evals_from_tiered_zips_and_skips_no_obligation(self):
|
||||
tc = [
|
||||
{"criterion": "Recht", "compliance_tier": "LEGAL_MINIMUM",
|
||||
"legal_basis": "Art. 21(1)", "obligation_id": "obj_gen"},
|
||||
{"criterion": "Weg", "compliance_tier": "BEST_PRACTICE",
|
||||
"legal_basis": "", "obligation_id": "obj_gen"},
|
||||
{"criterion": "ohne", "compliance_tier": "OPTIONAL"}, # kein obligation_id → skip
|
||||
]
|
||||
detail = [{"met": True}, {"met": False}, {"met": True}]
|
||||
evals = evals_from_tiered("AUTH-2051", tc, detail, conditional="x")
|
||||
assert len(evals) == 2
|
||||
assert evals[0].met is True and evals[0].conditional == "x"
|
||||
assert evals[1].tier == BP and evals[1].met is False
|
||||
|
||||
def test_aggregate_obligations_groups_by_id(self):
|
||||
evals = [
|
||||
_ce("a", LM, True, "C1", "b"),
|
||||
_ce("a", LM, True, "C2", "b"),
|
||||
_ce("b", LM, False, "C3", "b"),
|
||||
]
|
||||
results = {r.obligation_id: r for r in aggregate_obligations(evals)}
|
||||
assert set(results) == {"a", "b"}
|
||||
assert results["a"].status == "MET"
|
||||
assert results["b"].status == "FAILED"
|
||||
|
||||
def test_summarize_counts_buckets_and_failures(self):
|
||||
evals = [
|
||||
_ce("a", LM, False, "C1", "b"), # FAILED Pflicht
|
||||
_ce("c", OPT, False, "C3", crit="x"), # OPEN Empfehlung
|
||||
]
|
||||
s = summarize(aggregate_obligations(evals))
|
||||
assert s["obligations"] == 2
|
||||
assert s["pflicht_failed"] == 1
|
||||
assert s["buckets"]["PFLICHT"] == 1
|
||||
assert s["buckets"]["EMPFEHLUNG"] == 1
|
||||
@@ -0,0 +1,57 @@
|
||||
"""Unit-Tests für die minimalen Applicability-Prädikate."""
|
||||
from compliance.services.obligation_applicability import (
|
||||
applicable, direct_marketing, has_third_country_transfer,
|
||||
uses_legitimate_interest,
|
||||
)
|
||||
|
||||
|
||||
class TestThirdCountry:
|
||||
def test_drittland_present(self):
|
||||
assert has_third_country_transfer("übermittlung in ein drittland erfolgt") is True
|
||||
|
||||
def test_scc_present(self):
|
||||
assert has_third_country_transfer("auf basis der standardvertragsklauseln") is True
|
||||
|
||||
def test_absent(self):
|
||||
assert has_third_country_transfer("verarbeitung nur innerhalb deutschlands") is False
|
||||
|
||||
|
||||
class TestLegitimateInterest:
|
||||
def test_present(self):
|
||||
assert uses_legitimate_interest("auf grundlage unseres berechtigten interesses") is True
|
||||
|
||||
def test_absent(self):
|
||||
assert uses_legitimate_interest("nur auf grundlage ihrer einwilligung") is False
|
||||
|
||||
|
||||
class TestDirectMarketing:
|
||||
def test_newsletter(self):
|
||||
assert direct_marketing("anmeldung zum newsletter möglich") is True
|
||||
|
||||
def test_direktwerbung(self):
|
||||
assert direct_marketing("daten für direktwerbung genutzt") is True
|
||||
|
||||
def test_absent(self):
|
||||
assert direct_marketing("wir versenden keine werblichen inhalte ohne basis") is True # 'werbliche' trifft
|
||||
|
||||
def test_truly_absent(self):
|
||||
assert direct_marketing("reine vertragsabwicklung") is False
|
||||
|
||||
|
||||
class TestApplicableHook:
|
||||
def test_known_predicate_true(self):
|
||||
assert applicable("has_third_country_transfer", "Transfer in die USA") is True
|
||||
|
||||
def test_known_predicate_false_triggers_na(self):
|
||||
assert applicable("has_third_country_transfer", "nur in der EU") is False
|
||||
|
||||
def test_public_task_alias(self):
|
||||
assert applicable("legitimate_interest_or_public_task",
|
||||
"zur ausübung öffentlicher gewalt") is True
|
||||
|
||||
def test_unknown_predicate_returns_none(self):
|
||||
# profiling noch nicht modelliert → None → Aufrufer behält anwendbar
|
||||
assert applicable("profiling", "irgendein text") is None
|
||||
|
||||
def test_case_insensitive(self):
|
||||
assert applicable("uses_legitimate_interest", "BERECHTIGTES INTERESSE") is True
|
||||
@@ -0,0 +1,92 @@
|
||||
"""Unit-Tests für die reinen Helfer der Obligation Discovery Pipeline (scripts/obligation_discovery/_core.py)."""
|
||||
import pathlib
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[2] / "scripts" / "obligation_discovery"))
|
||||
|
||||
from _core import ( # noqa: E402
|
||||
centroid, cosine, greedy_cluster, merge_edges, parse_req, validate_registry,
|
||||
)
|
||||
|
||||
|
||||
class TestParseReq:
|
||||
def test_list_passthrough(self):
|
||||
assert parse_req(["a", "b"]) == ["a", "b"]
|
||||
|
||||
def test_python_repr_string(self):
|
||||
assert parse_req("['x', 'y']") == ["x", "y"]
|
||||
|
||||
def test_json_string(self):
|
||||
assert parse_req('["x", "y"]') == ["x", "y"]
|
||||
|
||||
def test_plain_string(self):
|
||||
assert parse_req("just text") == ["just text"]
|
||||
|
||||
|
||||
class TestCosine:
|
||||
def test_identical(self):
|
||||
assert cosine([1.0, 2.0, 3.0], [1.0, 2.0, 3.0]) > 0.999
|
||||
|
||||
def test_orthogonal(self):
|
||||
assert abs(cosine([1.0, 0.0], [0.0, 1.0])) < 1e-9
|
||||
|
||||
def test_empty(self):
|
||||
assert cosine([], [1.0]) == 0.0
|
||||
|
||||
|
||||
class TestGreedyCluster:
|
||||
def test_near_vectors_cluster_far_separate(self):
|
||||
vecs = [[1.0, 0.0], [0.99, 0.01], [0.0, 1.0]]
|
||||
clusters = greedy_cluster(vecs, 0.9)
|
||||
assert len(clusters) == 2
|
||||
assert clusters[0]["members"] == [0, 1]
|
||||
assert clusters[1]["members"] == [2]
|
||||
|
||||
def test_deterministic(self):
|
||||
vecs = [[1.0, 0.0], [0.5, 0.5], [0.99, 0.0]]
|
||||
assert greedy_cluster(vecs, 0.8) == greedy_cluster(vecs, 0.8)
|
||||
|
||||
def test_none_vector_isolated(self):
|
||||
clusters = greedy_cluster([[1.0, 0.0], None], 0.5)
|
||||
assert clusters[1]["members"] == [1] and clusters[1]["seed"] is None
|
||||
|
||||
|
||||
class TestCentroid:
|
||||
def test_mean(self):
|
||||
assert centroid([0, 1], [[0.0, 2.0], [2.0, 4.0]]) == [1.0, 3.0]
|
||||
|
||||
|
||||
class TestValidateRegistry:
|
||||
def _reg(self, obls, rels=None):
|
||||
return {"obligations": obls, "relationships": rels or []}
|
||||
|
||||
def test_lm_without_legal_basis_fails(self):
|
||||
r = self._reg([{"id": "x", "tier": "LEGAL_MINIMUM", "legal_basis": [], "member_controls": ["C1"]}])
|
||||
v = validate_registry(r)
|
||||
assert v["lm_without_legal_basis"] == ["x"] and v["passed"] is False
|
||||
|
||||
def test_clean_passes(self):
|
||||
r = self._reg([{"id": "x", "tier": "LEGAL_MINIMUM", "legal_basis": [{"source": "CRA"}],
|
||||
"member_controls": ["C1"], "provenance": {"source_meta_cluster": "M0"}}])
|
||||
assert validate_registry(r)["passed"] is True
|
||||
|
||||
def test_over8_per_review_unit_flagged(self):
|
||||
obls = [{"id": f"o{i}", "tier": "BEST_PRACTICE", "member_controls": ["C"],
|
||||
"provenance": {"source_meta_cluster": "M0"}} for i in range(9)]
|
||||
v = validate_registry(self._reg(obls))
|
||||
assert v["over8_per_review_unit"] == {"M0": 9} and v["passed"] is False
|
||||
|
||||
def test_empty_member_controls_flagged(self):
|
||||
v = validate_registry(self._reg([{"id": "x", "tier": "BEST_PRACTICE", "member_controls": []}]))
|
||||
assert v["empty_member_controls"] == ["x"] and v["passed"] is False
|
||||
|
||||
|
||||
class TestMergeEdges:
|
||||
def test_dedup_and_semantic_only(self):
|
||||
existing = [{"type": "supports", "from": "a", "to": "b"}]
|
||||
proposed = [{"type": "supports", "from": "a", "to": "b"}, # dup
|
||||
{"type": "depends_on", "from": "c", "to": "d"}, # new
|
||||
{"type": "out_of_scope", "clusters": [1]}] # not semantic
|
||||
merged, added = merge_edges(existing, proposed)
|
||||
assert added == 1
|
||||
assert {"type": "depends_on", "from": "c", "to": "d"} in merged
|
||||
@@ -0,0 +1,74 @@
|
||||
"""Unit-Tests für die DSE Shadow-Verdrahtung (compute_obligation_shadow, pure)."""
|
||||
from compliance.services.specialist_agents.dse._obligation_shadow import (
|
||||
compute_obligation_shadow,
|
||||
)
|
||||
|
||||
NON_LLM = "art20_right_exists_core" # nicht in der LLM_REQUIRED-Registry
|
||||
LLM_REQ = "third_country_transfer_disclosed" # in der LLM_REQUIRED-Registry
|
||||
|
||||
|
||||
def _markers(n, ob, cond=None):
|
||||
return {f"C{i}": {"obl": [ob], "cond": cond} for i in range(n)}
|
||||
|
||||
|
||||
class TestComputeShadow:
|
||||
def test_collapse_and_delta(self):
|
||||
results = [{"control_id": f"C{i}", "passed": False} for i in range(5)]
|
||||
s = compute_obligation_shadow(results, "x", _markers(5, NON_LLM))
|
||||
assert s["legacy_control_findings"] == 5
|
||||
assert s["obligation_findings"] == 1 # 5 → 1
|
||||
assert s["failed_by_current_checker"] == 1
|
||||
assert s["recall_limited"] == 0
|
||||
assert s["collapse_factor"] == 5.0
|
||||
assert s["met_failed_delta"] == 4
|
||||
assert s["met_count"] == 0
|
||||
top = s["top_collapsed_obligations"][0]
|
||||
assert top["obligation"] == NON_LLM and top["fehlt"] == 5
|
||||
assert top["recall_limited"] is False
|
||||
|
||||
def test_fp_correction_one_passed_collapses_to_met(self):
|
||||
results = [{"control_id": f"C{i}", "passed": i == 0} for i in range(5)]
|
||||
s = compute_obligation_shadow(results, "x", _markers(5, NON_LLM))
|
||||
assert s["legacy_control_findings"] == 4
|
||||
assert s["obligation_findings"] == 0 # MET (anderswo erfüllt)
|
||||
assert s["met_failed_delta"] == 4
|
||||
|
||||
def test_na_when_predicate_false(self):
|
||||
results = [{"control_id": "C0", "passed": False}]
|
||||
m = {"C0": {"obl": [LLM_REQ], "cond": "has_third_country_transfer"}}
|
||||
s = compute_obligation_shadow(results, "nur innerhalb der eu", m)
|
||||
assert s["na_count"] == 1
|
||||
assert s["obligation_findings"] == 0 # NA statt FEHLT
|
||||
|
||||
def test_no_markers_returns_status(self):
|
||||
s = compute_obligation_shadow([{"control_id": "C0", "passed": False}], "x", {})
|
||||
assert "no obligation" in s["status"]
|
||||
|
||||
def test_does_not_mutate_results(self):
|
||||
results = [{"control_id": "C0", "passed": False}]
|
||||
compute_obligation_shadow(results, "x", _markers(1, NON_LLM))
|
||||
assert set(results[0].keys()) == {"control_id", "passed"}
|
||||
|
||||
|
||||
class TestRecallSegregation:
|
||||
def test_llm_required_failed_is_recall_limited_not_real_gap(self):
|
||||
# 5 verfehlte third_country-Controls, Transfer-Text vorhanden → FAILED,
|
||||
# aber LLM_REQUIRED → RECALL_LIMITED, NICHT failed_by_current_checker.
|
||||
results = [{"control_id": f"C{i}", "passed": False} for i in range(5)]
|
||||
m = {f"C{i}": {"obl": [LLM_REQ], "cond": "has_third_country_transfer"}
|
||||
for i in range(5)}
|
||||
s = compute_obligation_shadow(results, "übermittlung in ein drittland", m)
|
||||
assert s["obligation_findings"] == 1
|
||||
assert s["recall_limited"] == 1
|
||||
assert s["failed_by_current_checker"] == 0
|
||||
assert s["recall_limited_obligations"] == [LLM_REQ]
|
||||
assert s["top_collapsed_obligations"][0]["recall_limited"] is True
|
||||
|
||||
def test_mixed_real_gap_and_recall_limited(self):
|
||||
results = [{"control_id": "A", "passed": False}, {"control_id": "B", "passed": False}]
|
||||
m = {"A": {"obl": [NON_LLM], "cond": None},
|
||||
"B": {"obl": [LLM_REQ], "cond": "has_third_country_transfer"}}
|
||||
s = compute_obligation_shadow(results, "übermittlung in ein drittland", m)
|
||||
assert s["obligation_findings"] == 2
|
||||
assert s["failed_by_current_checker"] == 1
|
||||
assert s["recall_limited"] == 1
|
||||
@@ -0,0 +1,20 @@
|
||||
"""Unit-Tests für die Obligation-Taxonomie-Registry (decision_method_required)."""
|
||||
from compliance.services.obligation_taxonomy import OBLIGATION_META, requires_llm
|
||||
|
||||
|
||||
class TestRequiresLlm:
|
||||
def test_marked_obligations_require_llm(self):
|
||||
for ob in ("recipients_disclosed", "third_country_transfer_disclosed",
|
||||
"safeguards_disclosed", "safeguards_accessible"):
|
||||
assert requires_llm(ob) is True
|
||||
|
||||
def test_unmarked_obligation_does_not(self):
|
||||
assert requires_llm("art20_right_exists_core") is False
|
||||
assert requires_llm("objection_general_art21_1") is False
|
||||
|
||||
def test_unknown_obligation_is_false(self):
|
||||
assert requires_llm("does_not_exist") is False
|
||||
|
||||
def test_registry_values_are_llm(self):
|
||||
assert all(v.get("decision_method_required") == "LLM"
|
||||
for v in OBLIGATION_META.values())
|
||||
@@ -0,0 +1,89 @@
|
||||
# Obligation Aggregation — Validated Shadow Results (2026-06-24)
|
||||
|
||||
Status: **bewiesen im Shadow auf macmini**, NICHT deployt, NICHT live geschaltet.
|
||||
Code auf Branch `feat/obligation-aggregation`; das LLM-Tiering der recipients/transfer-
|
||||
Controls liegt als DB-Marker nur auf macmini.
|
||||
|
||||
Dieser Stand validiert die Ausführung des [Legal Obligation Layer v1](legal_obligation_layer_v1.md)
|
||||
über vier ineinandergreifende Schichten.
|
||||
|
||||
## Die vier Schichten
|
||||
|
||||
1. **Obligation Aggregation** — `compliance/services/obligation_aggregation.py`.
|
||||
Aggregiert Kriterium-/Control-Bewertungen zu Findings auf OBLIGATION-Ebene
|
||||
(Regulation → Obligation → Control → Criterion). Redundanz kollabiert per OR pro
|
||||
`legal_basis`-Anforderung; fail-safe Status (MET/PARTIAL/FAILED/NA/UNDETERMINED/OPEN).
|
||||
2. **Applicability** — `compliance/services/obligation_applicability.py`.
|
||||
Prädikate (`has_third_country_transfer`, `uses_legitimate_interest`, `direct_marketing`,
|
||||
`legitimate_interest_or_public_task`) entscheiden bedingte Obligations → True/False/None
|
||||
(unbekannt → anwendbar, nie stille NA).
|
||||
3. **Recall-limited Segregation** — `compliance/services/obligation_taxonomy.py` +
|
||||
`specialist_agents/dse/_obligation_shadow.py`.
|
||||
`decision_method_required=LLM` trennt FAILED ehrlich in `failed_by_current_checker`
|
||||
(echte Lücke) vs `recall_limited` (Prüfer kann mit aktueller Methode nicht verifizieren).
|
||||
4. **Targeted LLM Fix** — recipients/transfer-Controls mit `tiered_criteria`
|
||||
(decision_method=LLM) → Layer 3 nutzt den **Haiku-Sufficiency-Judge** statt Keyword/Embedding.
|
||||
|
||||
## Shadow-Zahlen (7 Firmen, Live-Engine, Keyword/Embedding)
|
||||
|
||||
| | Wert |
|
||||
|---|---|
|
||||
| legacy control-findings | 136 |
|
||||
| obligation findings | 29 |
|
||||
| **Kollaps** | **4,7×** |
|
||||
| davon echte Lücken | 23 |
|
||||
| davon recall_limited | 6 (nur 2/7 Firmen, nur Drittland/Garantien) |
|
||||
| MET (FP-Korrektur) | 46 |
|
||||
| N/A (Applicability) | 2 |
|
||||
|
||||
`recall_limited` ist klein + konzentriert: ausschließlich `third_country_transfer_disclosed` /
|
||||
`safeguards_disclosed` / `safeguards_accessible`, je 2/7 Firmen. `recipients_disclosed`
|
||||
manifestierte nie als recall_limited (Keyword/Embedding trägt dort).
|
||||
|
||||
## Targeted LLM Fix — Validierung (teamviewer + safetykon)
|
||||
|
||||
Recall-Defekt-Diagnose (teamviewer): die Drittland-/Garantien-Offenlegung steht dicht in
|
||||
einem Absatz („…außerhalb EU/EWR … Standardvertragsklauseln/Schutzmaßnahmen"), aber
|
||||
**0/22 Controls** trafen — Keyword (Vokabular-Mismatch) und Embedding (cos 0.49–0.57, teils
|
||||
falscher Chunk) versagen. Kein Schwellen-Fix → CONTENT/LLM-Klasse.
|
||||
|
||||
Nach LLM-Tiering (Haiku-Judge):
|
||||
|
||||
| | vorher (kw+emb) | nachher (LLM) |
|
||||
|---|---|---|
|
||||
| teamviewer findings | 5 | **0** |
|
||||
| teamviewer recall_limited | 3 | **0** |
|
||||
| safetykon findings | 7 | **4** |
|
||||
| safetykon recall_limited | 3 | **0** |
|
||||
|
||||
- **teamviewer → 0 Findings:** DSE auf diesen Pflichten real konform; die 5 alten Findings
|
||||
waren Falsch-Positive des Keyword/Embedding-Prüfers.
|
||||
- **safetykon → 4 (keine Über-Korrektur):** Drittland/Garantien → MET, aber
|
||||
`art20_right_exists_core` + `art20_machine_readable_format` bleiben **FAILED** (echte
|
||||
Portability-Lücke), `legitimate_interest_disclosed` → **NA** (Applicability).
|
||||
|
||||
## Eingesetztes Modell
|
||||
|
||||
Der Tiered-/Sufficiency-Pfad ist **fest auf Claude Haiku 4.5 verdrahtet**
|
||||
(`checkers/router.py:build_spec` setzt für CONTENT/LLM `extra.judge="haiku"` →
|
||||
`llm_checker._haiku` → `_call_anthropic`; validierter Judge P0.89/R0.91, Entscheidung
|
||||
2026-06-22). **Nicht** die OVH-Kaskade (35b/120b), **nicht** Opus. Konsequenz: der Fix
|
||||
reproduziert sich überall identisch, braucht aber einen gültigen Anthropic-Key für den
|
||||
Haiku-Judge — auch auf dev.
|
||||
|
||||
## Nächster operativer Block (gegated, NICHT ausgeführt)
|
||||
|
||||
```
|
||||
Deploy-Fenster frei (andere Session fertig)
|
||||
↓
|
||||
dev-DB-Tiering replizieren (die 22 recipients/transfer-Controls)
|
||||
↓
|
||||
Haiku-Judge auf dev bestätigen (gültiger Anthropic-Key — NICHT der OVH-Pfad)
|
||||
↓
|
||||
Shadow aktiv lassen (Telemetrie), Produktverhalten unverändert
|
||||
↓
|
||||
erst dann Umschalten planen
|
||||
```
|
||||
|
||||
Folge-Cleanup: sobald LLM-Tiering Standard ist, wird die `recall_limited`-Segregation für
|
||||
diese 4 Obligations obsolet (dann ist FAILED = echte Lücke, nicht Reichweitenproblem).
|
||||
@@ -0,0 +1,77 @@
|
||||
# Obligation Discovery Pipeline v1
|
||||
|
||||
Ein **generisches Verfahren zur Ableitung einer regulatorischen Ontologie** (Legal Obligation
|
||||
Registry) aus großen Compliance-Korpora. Validiert über drei Domänen (SBOM, Vulnerability
|
||||
Handling, Authentication). Erzeugt die zitierfähige Mitte aus
|
||||
[obligation_registry_v1.md](obligation_registry_v1.md).
|
||||
|
||||
## Architekturregel (nicht verhandelbar)
|
||||
|
||||
```
|
||||
RUNTIME bleibt deterministisch (Document → Embedding → LLM-Judge → Finding)
|
||||
DISCOVERY darf LLM-gestützt sein (Controls → … → LLM-Synthese → Obligation Registry)
|
||||
```
|
||||
Discovery läuft **einmalig/offline** mit dem stärksten Modell; die Runtime-Prüf-Engine wird
|
||||
davon nicht berührt. Zwei getrennte Probleme, eine gemeinsame Sprache (die Obligation).
|
||||
|
||||
## Stufen (`scripts/obligation_discovery/`)
|
||||
|
||||
| Stufe | Skript | Aufgabe | Key |
|
||||
|---|---|---|---|
|
||||
| 1 | `precluster.py` | Controls (scope) → Embedding (gecacht) → **Mikro-Cluster** | – |
|
||||
| 2 | `meta_cluster.py` | Mikro → **Review Units** (Skalierungs-Fix für große Domänen) | – |
|
||||
| 3 | `synthesize_obligations.py` | Review Units → Opus → **Obligation Candidates** | ENV |
|
||||
| 4 | `validate_registry.py` | Belastbarkeits-Checks | – |
|
||||
| 5 | `merge_review_diff.py` | vorgeschlagene Beziehungskanten dedupliziert mergen | – |
|
||||
|
||||
Reine, unit-getestete Helfer in `_core.py`. Ausführung im `bp-compliance-backend`-Container
|
||||
(`PYTHONPATH=/app`); der Key kommt aus `ANTHROPIC_API_KEY` (nie hartcodiert).
|
||||
|
||||
## Zwei-Stufen-Clustering = der Skalierungs-Fix
|
||||
|
||||
Ein flacher Single-Threshold-Pre-Cluster + EIN LLM-Synthese-Call skaliert NICHT auf große
|
||||
Domänen. Lösung: eine Hierarchiestufe. **Review Unit ≠ Meta-Cluster** — die Review Unit ist
|
||||
das, was der LLM sieht (entkoppelt vom Clustering, später merge/split-bar).
|
||||
|
||||
## Belegte Meilensteine
|
||||
|
||||
| Domäne | Controls | → Cluster/Review Units | → Obligations | vs Ground Truth |
|
||||
|---|---|---|---|---|
|
||||
| **SBOM** | 258 | 86 Mikro | 12 (→ 11 final) | manuell ~10 — **reproduziert + verfeinert** |
|
||||
| **Vulnerability** | 531 | 200 Mikro | 8 | manuell ~7 — **reproduziert** |
|
||||
| **Authentication** | 4408 | 2134 Mikro → **170 Review Units** | 54 → Kuration **29** | Skalierung — **generalisiert** |
|
||||
|
||||
## Harte Tier-Regel generalisiert
|
||||
|
||||
`LEGAL_MINIMUM` nur mit Primärrechts-Anker (`legal_basis`), sonst `BEST_PRACTICE` /
|
||||
`IMPLEMENTATION_GUIDANCE` / `EVIDENCE`. Authentication zeigt den Wert: nur **6** harte
|
||||
Pflichten (CRA fordert „angemessene Authentisierung"), MFA/Passwort/Session/Krypto sind
|
||||
`guidance_basis`. So kann der Advisor sagen: *„Gesetzlich gefordert ist Schutz vor unbefugtem
|
||||
Zugriff; MFA ist anerkannte Umsetzung, aber keine CRA-Wortlautpflicht."*
|
||||
|
||||
## Kuration (große Domänen)
|
||||
|
||||
Die Synthese darf über-splitten; ein **key-freier, regelbasierter Kurations-Pass** verdichtet:
|
||||
Krypto-Mikro-Mechanismen → `guidance_basis`; Prüf-/Nachweis-Themen → `evidence`-Facette;
|
||||
Mechanismus-Familien bleiben; domänenfremdes (eID/PSD2) → `out_of_scope`; LEGAL_MINIMUM
|
||||
unangetastet.
|
||||
|
||||
## Lessons
|
||||
|
||||
- Große Opus-Calls brauchen **Streaming** (`messages.stream`); der SDK blockt non-streaming
|
||||
bei `max_tokens` > ~8k mit „Streaming is required for operations that may take longer than 10 minutes".
|
||||
- Provenance pro Obligation (`source_meta_cluster`, `discovery_confidence`, `llm_model`,
|
||||
`synthesis_version`) — für spätere Evolution (CRA-Update, Modellwechsel).
|
||||
- `>8 Obligations / Review Unit` → automatische Review-Warnung (Over-Split-Indikator).
|
||||
- Embedding-Cache (pickle) → THR2-Sweeps ohne Re-Embed.
|
||||
|
||||
## End-to-End-Beispiel
|
||||
|
||||
```bash
|
||||
# im bp-compliance-backend-Container, PYTHONPATH=/app, cwd = scripts/obligation_discovery
|
||||
python3 precluster.py --scope auth
|
||||
python3 meta_cluster.py --scope auth --meta-thr 0.62 # → /tmp/auth_review_units.json (inspizieren!)
|
||||
ANTHROPIC_API_KEY=… python3 synthesize_obligations.py \
|
||||
--units /tmp/auth_review_units.json --regulation CRA --theme "Authentisierung" --out /tmp/auth_registry.json
|
||||
python3 validate_registry.py /tmp/auth_registry.json
|
||||
```
|
||||
@@ -0,0 +1,130 @@
|
||||
# Obligation Registry v1 — Schema, Zitierfähigkeit, Zwei-Graphen-Architektur
|
||||
|
||||
Status: **Spec festgeschrieben (2026-06-24)**. Baut auf
|
||||
[legal_obligation_layer_v1.md](legal_obligation_layer_v1.md) +
|
||||
[obligation_aggregation_validation.md](obligation_aggregation_validation.md).
|
||||
Die Obligation Discovery Pipeline v1 ist gegen Ground Truth validiert
|
||||
(SBOM 12 vs 10, Vuln 8 vs 7, out_of_scope + conditional Applicability korrekt).
|
||||
|
||||
## Leitsatz
|
||||
|
||||
**Die Legal Obligation ist das fachliche Wissensobjekt der Plattform** — nicht der Master
|
||||
Control. Controls sind Prüfstrategien / Erkennungsmuster / Evidenzsammler FÜR eine Obligation.
|
||||
Ohne Zitierfähigkeit ist die Registry fachlich nicht belastbar: die erste Kundenfrage ist
|
||||
immer „**Wo steht das?**".
|
||||
|
||||
## Zwei Assets, zwei Graphen, EIN Join (nicht verschmelzen, verbinden)
|
||||
|
||||
- **Asset 1 — Compliance Knowledge** (bereits gebaut): 313k atomare Controls, 33k Master
|
||||
Controls, ~14k use-case-gemappt, Dedup, Obligation Layer, Applicability, Tiering, G/C/E.
|
||||
- **Asset 2 — Zitierfähige Wissensbasis** (entsteht in anderer Session): Dokument → Chunk →
|
||||
Paragraph → Span → Zitat.
|
||||
|
||||
Die beiden werden **NICHT verschmolzen** (das wäre wie eine normalisierte DB nach CSV zu
|
||||
exportieren und neu zu importieren). Sie werden über die **Obligation gekoppelt**:
|
||||
|
||||
```
|
||||
GRAPH 1 — Legal Knowledge Graph (Chat/Advisor) GRAPH 2 — Compliance Execution Graph (Engine)
|
||||
Regulation → Annex/Artikel → Paragraph → Span Obligation → Control → Criterion → Evidence → Finding
|
||||
\ /
|
||||
\____ LEGAL OBLIGATION ______/ ← gemeinsame Sprache (der Join)
|
||||
```
|
||||
Chat: „diese Aussage stammt aus Absatz X." · Engine: „diese Obligation ist nicht erfüllt." →
|
||||
beide meinen DIESELBE `obligation_id`.
|
||||
|
||||
## Registry-Schema v1
|
||||
|
||||
```yaml
|
||||
id: # snake_case, regulierungs-agnostisch (z.B. sbom_complete)
|
||||
name: # kurz
|
||||
description: # 1 Satz
|
||||
tier: # LEGAL_MINIMUM | BEST_PRACTICE | IMPLEMENTATION_GUIDANCE | EVIDENCE
|
||||
family: # Organisationshilfe (z.B. sbom, vulnerability_handling)
|
||||
applicability: # universal | conditional:<pred> | domain:<x>
|
||||
facets: # welche Evidenz-Facetten die Pflicht belegt
|
||||
governance: bool
|
||||
capability: bool
|
||||
evidence: bool
|
||||
legal_basis: # PRIMÄRRECHT — Pflicht zwingend (mind. 1 Anker für LEGAL_MINIMUM)
|
||||
- source: CRA
|
||||
regulation_code: eu_2024_2847
|
||||
article: "" # falls zutreffend
|
||||
annex: "Annex I, Part II"
|
||||
section: ""
|
||||
paragraph: ""
|
||||
span_id: "" # harter Anker in die zitierfähige Wissensbasis (Asset 2)
|
||||
document_version: ""
|
||||
citation: "" # menschenlesbar
|
||||
guidance_basis: # SEKUNDÄR — Umsetzung/Best Practice, NICHT Pflicht
|
||||
- source: NIST SSDF
|
||||
anchor: ""
|
||||
role: best_practice # implementation_guidance | best_practice
|
||||
member_controls: # control_uuids (Prüflogik aus Asset 1)
|
||||
citation_anchor_ids: # span/paragraph-Anker (Asset 2) — auf der OBLIGATION, NICHT auf Controls
|
||||
relationships: # siehe Beziehungsgraph
|
||||
decision_method: # CONTENT/LLM | CONTENT/EMBEDDING | FIELD/REGEX | BEHAVIOR/PLAYWRIGHT ...
|
||||
out_of_scope: [] # ausgeschlossene Cluster + Begründung
|
||||
```
|
||||
|
||||
## Zitierfähigkeit hängt an der OBLIGATION (nicht an Controls)
|
||||
|
||||
258 SBOM-Controls → 11 Obligations: nur die **Obligation** speichert
|
||||
`CRA / Annex I / Paragraph X / chunk_id / span_id / document_version`. Die 258 Controls zeigen
|
||||
nur auf die `obligation_id`. Folge: **Regulierungsänderung (CRA v1→v2) = `citation_anchor`
|
||||
tauschen, Controls bleiben identisch.** Massive Pflegeersparnis + Versionsstabilität.
|
||||
|
||||
## `legal_basis` vs `guidance_basis` + `source_role`
|
||||
|
||||
Damit beim Verschmelzen von CRA + NIST + OWASP zu einer Obligation NICHT verloren geht, was
|
||||
Pflicht / Best Practice / Evidenz / Umsetzung ist, klassifiziert die Discovery-Pipeline jeden
|
||||
Member/Cluster mit einer **`source_role`**:
|
||||
|
||||
```
|
||||
LEGAL_BASIS → Primärrecht (begründet die Pflicht)
|
||||
GUIDANCE → NIST/OWASP/ENISA/BSI/ISO (Umsetzung/Best Practice)
|
||||
EVIDENCE → Nachweis/Bericht/Audit
|
||||
IMPLEMENTATION → technische Umsetzungsanweisung
|
||||
OUT_OF_SCOPE → gehört nicht zur Obligation (andere Regulierung/Domäne)
|
||||
```
|
||||
|
||||
## HARTE Tier-Regel
|
||||
|
||||
Eine Obligation wird **`LEGAL_MINIMUM` nur mit mindestens einem Primärrechts-Anker**
|
||||
(`legal_basis` nicht leer). Ohne Primärrechts-Anker:
|
||||
`BEST_PRACTICE | IMPLEMENTATION_GUIDANCE | EVIDENCE` — **aber niemals Pflicht.**
|
||||
|
||||
## Beziehungsgraph (Ontologie)
|
||||
|
||||
**Strukturell** (bereits in der Pipeline): `same_obligation`, `sub_obligation`,
|
||||
`applicability_variant`, `evidence_for`, `governance_for`, `out_of_scope`.
|
||||
|
||||
**Semantisch (NEU, P2-Ergänzung):** `requires`, `implements`, `supports`,
|
||||
`produces_evidence_for`, `depends_on`, `derived_from`. Beispiele:
|
||||
```
|
||||
sbom_established --supports--> vulnerability_handling --supports--> incident_reporting
|
||||
authentication --requires--> credential_management
|
||||
```
|
||||
→ für den Compliance Advisor extrem wertvoll (er kann Pflicht-Ketten erklären).
|
||||
|
||||
## Citation-Anchor-Pipeline (Document → Obligation, NICHT Document → Control)
|
||||
|
||||
Der neue Ingest erzeugt zusätzlich zu Chunk/Embedding: `paragraph_uuid`, `span_uuid`,
|
||||
`document_version`, `legal_citation`, `referenced_articles`, `referenced_regulations`.
|
||||
**Erst danach** läuft Obligation Discovery, sodass jede neu entdeckte Obligation sofort ihre
|
||||
Primärquelle bekommt:
|
||||
```
|
||||
Neue Dokumente → Chunking → Span IDs → LLM („welche Obligation(en)?") → Confidence
|
||||
→ Review → obligation.citation_anchor_ids[]
|
||||
```
|
||||
Die alten Controls werden wiederverwendet; die Pipeline erzeugt zusätzlich Obligation→Evidence
|
||||
und Obligation→Citation-Anchors. **Kein Re-Ingest zum Neubau von Controls.**
|
||||
|
||||
## Sequenz (geändert — Registry vor weiteren Cuts)
|
||||
|
||||
```
|
||||
SBOM ✓ → Vuln ✓ → Registry v1 (DIESE Spec) → Ontologie/Beziehungsgraph ergänzen
|
||||
→ Authentication → Remote Access → Logging → Updates
|
||||
```
|
||||
Begründung: Schema jetzt billig änderbar; bei 300–1000 Obligations wird jede Schemaänderung
|
||||
teuer. Fortschritt wird daran gemessen, ob jede neue Obligation die Registry besser macht —
|
||||
nicht an neuen Controls.
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,114 @@
|
||||
"""Reine Helfer der Obligation Discovery Pipeline (keine schweren Imports → unit-testbar).
|
||||
|
||||
Die Pipeline leitet aus großen Compliance-Korpora eine regulatorische Ontologie ab:
|
||||
Controls → Mikro-Cluster → Meta-Cluster/Review-Units → LLM-Synthese → Obligation Registry.
|
||||
Architekturregel: RUNTIME bleibt deterministisch; DISCOVERY (dieses Tooling) darf LLM-gestützt
|
||||
sein und läuft EINMALIG/offline. Siehe docs-src/development/obligation_discovery_pipeline_v1.md.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import json
|
||||
import math
|
||||
from typing import Optional
|
||||
|
||||
SEMANTIC_EDGE_TYPES = ("depends_on", "supports", "produces_evidence_for",
|
||||
"implements", "derived_from")
|
||||
|
||||
|
||||
def parse_req(req) -> list:
|
||||
"""requirements-Spalte (JSON ODER Python-Repr ODER String) robust zu Liste."""
|
||||
if isinstance(req, list):
|
||||
return req
|
||||
if isinstance(req, str):
|
||||
for fn in (json.loads, ast.literal_eval):
|
||||
try:
|
||||
v = fn(req)
|
||||
return v if isinstance(v, list) else [str(v)]
|
||||
except Exception:
|
||||
pass
|
||||
return [req]
|
||||
return []
|
||||
|
||||
|
||||
def cosine(a, b) -> float:
|
||||
if not a or not b:
|
||||
return 0.0
|
||||
dot = sum(x * y for x, y in zip(a, b))
|
||||
na = math.sqrt(sum(x * x for x in a))
|
||||
nb = math.sqrt(sum(y * y for y in b))
|
||||
return dot / (na * nb) if na and nb else 0.0
|
||||
|
||||
|
||||
def greedy_cluster(vecs: list, thr: float) -> list[dict]:
|
||||
"""Single-Pass-Greedy-Clustering: jeder Vektor joint den ersten Cluster, dessen Seed
|
||||
cosine ≥ thr ist, sonst neuer Cluster. Deterministisch (stabile Reihenfolge)."""
|
||||
clusters: list[dict] = []
|
||||
for i, v in enumerate(vecs):
|
||||
if not v:
|
||||
clusters.append({"seed": None, "members": [i]})
|
||||
continue
|
||||
best, best_sim = None, thr
|
||||
for c in clusters:
|
||||
if c["seed"] is None:
|
||||
continue
|
||||
s = cosine(v, c["seed"])
|
||||
if s >= best_sim:
|
||||
best_sim, best = s, c
|
||||
if best:
|
||||
best["members"].append(i)
|
||||
else:
|
||||
clusters.append({"seed": v, "members": [i]})
|
||||
return clusters
|
||||
|
||||
|
||||
def centroid(idxs: list[int], vecs: list) -> Optional[list]:
|
||||
vs = [vecs[i] for i in idxs if vecs[i]]
|
||||
if not vs:
|
||||
return None
|
||||
n = len(vs)
|
||||
return [sum(col) / n for col in zip(*vs)]
|
||||
|
||||
|
||||
def validate_registry(reg: dict) -> dict:
|
||||
"""Belastbarkeits-Checks (User-Regeln): LEGAL_MINIMUM braucht legal_basis,
|
||||
member_controls vollständig, out_of_scope separat, >8-Obligations/Review-Unit-Warnung."""
|
||||
obls = reg.get("obligations", [])
|
||||
lm = [o for o in obls if o.get("tier") == "LEGAL_MINIMUM"]
|
||||
lm_without_basis = [o["id"] for o in lm if not o.get("legal_basis")]
|
||||
empty_members = [o["id"] for o in obls if not o.get("member_controls")]
|
||||
per_unit: dict[str, int] = {}
|
||||
for o in obls:
|
||||
ru = (o.get("provenance") or {}).get("source_meta_cluster")
|
||||
if ru:
|
||||
per_unit[ru] = per_unit.get(ru, 0) + 1
|
||||
over8 = {ru: n for ru, n in per_unit.items() if n > 8}
|
||||
rels = reg.get("relationships", [])
|
||||
return {
|
||||
"obligations": len(obls),
|
||||
"legal_minimum": len(lm),
|
||||
"lm_without_legal_basis": lm_without_basis,
|
||||
"empty_member_controls": empty_members,
|
||||
"over8_per_review_unit": over8,
|
||||
"out_of_scope": sum(1 for r in rels if r.get("type") == "out_of_scope"),
|
||||
"semantic_edges": sum(1 for r in rels if r.get("type") in SEMANTIC_EDGE_TYPES),
|
||||
"passed": not lm_without_basis and not empty_members and not over8,
|
||||
}
|
||||
|
||||
|
||||
def merge_edges(relationships: list[dict], proposed: list[dict]) -> tuple[list[dict], int]:
|
||||
"""Proposed semantische Kanten dedupliziert in relationships mergen. Gibt (merged, added)."""
|
||||
existing = {(r.get("type"), r.get("from"), r.get("to"))
|
||||
for r in relationships if r.get("from")}
|
||||
added = 0
|
||||
out = list(relationships)
|
||||
for e in proposed:
|
||||
if e.get("type") not in SEMANTIC_EDGE_TYPES:
|
||||
continue
|
||||
key = (e["type"], e.get("from"), e.get("to"))
|
||||
if key in existing or not e.get("from") or not e.get("to"):
|
||||
continue
|
||||
out.append(e)
|
||||
existing.add(key)
|
||||
added += 1
|
||||
return out, added
|
||||
@@ -0,0 +1,36 @@
|
||||
"""Stufe 5 — Review-Diff mergen: vorgeschlagene Beziehungskanten (review_status=proposed)
|
||||
dedupliziert in die Registry mergen (kein LLM/Key). Kleine Beziehungs-Sprache:
|
||||
depends_on/supports/produces_evidence_for/implements/derived_from.
|
||||
|
||||
python3 scripts/obligation_discovery/merge_review_diff.py obligations/cra.json /tmp/cra_edges_review.json
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
|
||||
from _core import SEMANTIC_EDGE_TYPES, merge_edges
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("registry")
|
||||
ap.add_argument("review_diff")
|
||||
ap.add_argument("--write", action="store_true", help="in die Registry schreiben (sonst dry-run)")
|
||||
a = ap.parse_args()
|
||||
reg = json.load(open(a.registry, encoding="utf-8"))
|
||||
diff = json.load(open(a.review_diff, encoding="utf-8"))
|
||||
proposed = diff.get("proposed_edges", diff if isinstance(diff, list) else [])
|
||||
merged, added = merge_edges(reg.get("relationships", []), proposed)
|
||||
print(f"proposed: {len(proposed)} | added (dedupliziert): {added}")
|
||||
if a.write:
|
||||
reg["relationships"] = merged
|
||||
reg["relationship_types"] = list(SEMANTIC_EDGE_TYPES)
|
||||
json.dump(reg, open(a.registry, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
|
||||
print(f"written: {a.registry}")
|
||||
else:
|
||||
print("dry-run (use --write to apply)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,55 @@
|
||||
"""Stufe 2 — Meta-Cluster (der Skalierungs-Fix für große Domänen): Mikro-Cluster →
|
||||
REVIEW UNITS. Review Unit = das, was der LLM-Synthese-Pass sieht (entkoppelt vom Clustering,
|
||||
später merge/split-bar). Nutzt den Embedding-Cache aus precluster (kein Re-Embed).
|
||||
|
||||
python3 scripts/obligation_discovery/meta_cluster.py --scope auth --meta-thr 0.62
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
|
||||
from _core import centroid, greedy_cluster
|
||||
|
||||
|
||||
def run(scope: str, meta_thr: float, outdir: str) -> None:
|
||||
micro = json.load(open(os.path.join(outdir, f"{scope}_micro_clusters.json"), encoding="utf-8"))
|
||||
vecs = pickle.load(open(os.path.join(outdir, f"{scope}_vecs.pkl"), "rb"))
|
||||
centroids = [centroid(m["member_indices"], vecs) for m in micro]
|
||||
meta = greedy_cluster(centroids, meta_thr)
|
||||
print(f"scope={scope} pass-2 (meta-thr={meta_thr}): {len(micro)} micro → {len(meta)} review-units")
|
||||
|
||||
out = []
|
||||
for mi, m in enumerate(meta):
|
||||
ctrl_ids, titles = [], []
|
||||
for micro_idx in m["members"]:
|
||||
mc = micro[micro_idx]
|
||||
ctrl_ids += mc["control_ids"]
|
||||
titles.append(mc["titles"][0] if mc["titles"] else "")
|
||||
out.append({"review_unit_id": f"M{mi}", "n_micro": len(m["members"]),
|
||||
"n_controls": len(ctrl_ids), "control_ids": ctrl_ids,
|
||||
"sample_titles": titles[:8]})
|
||||
out.sort(key=lambda x: -x["n_controls"])
|
||||
path = os.path.join(outdir, f"{scope}_review_units.json")
|
||||
json.dump(out, open(path, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
|
||||
|
||||
print("=== top review units (inspect for cross-domain mixing BEFORE synthesis) ===")
|
||||
for m in out[:12]:
|
||||
print(f" {m['review_unit_id']:5} ctrl={m['n_controls']:4} micro={m['n_micro']:3} "
|
||||
f"| {' || '.join(t[:30] for t in m['sample_titles'][:3])}")
|
||||
print(f"written: {path} ({len(out)} review units)")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--scope", default="auth")
|
||||
ap.add_argument("--meta-thr", type=float, default=0.62)
|
||||
ap.add_argument("--outdir", default="/tmp")
|
||||
a = ap.parse_args()
|
||||
run(a.scope, a.meta_thr, a.outdir)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,73 @@
|
||||
"""Stufe 1 — Pre-Cluster: Controls (scope) → BGE-M3-Embedding (gecacht) → Mikro-Cluster.
|
||||
Deterministisch. Im bp-compliance-backend-Container ausführen (PYTHONPATH=/app).
|
||||
|
||||
python3 scripts/obligation_discovery/precluster.py --scope sbom
|
||||
python3 scripts/obligation_discovery/precluster.py --patterns '%sbom%,%software bill%' --micro-thr 0.78
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
|
||||
from _core import greedy_cluster, parse_req
|
||||
|
||||
SCOPES = {
|
||||
"sbom": ["%SBOM%", "%software bill%", "%stückliste%", "%komponentenliste%"],
|
||||
"vuln": ["%schwachstellenbehandl%", "%schwachstellenmanagement%", "%vulnerability handling%",
|
||||
"%coordinated vulnerab%", "%vulnerability disclosure%", "%cvd-konzept%"],
|
||||
"auth": ["%authentisierung%", "%authentifizierung%", "%authentication%"],
|
||||
}
|
||||
|
||||
|
||||
async def run(scope: str, patterns: list[str], micro_thr: float, outdir: str) -> None:
|
||||
import asyncpg
|
||||
from compliance.services.mc_embedding_matcher import _embed_texts
|
||||
|
||||
dsn = os.getenv("DATABASE_URL") or os.getenv("COMPLIANCE_DATABASE_URL")
|
||||
conn = await asyncpg.connect(dsn)
|
||||
where = " or ".join(f"title ilike ${i+1}" for i in range(len(patterns)))
|
||||
rows = await conn.fetch(
|
||||
f"select control_id, title, requirements from compliance.canonical_controls "
|
||||
f"where {where} order by control_id", *patterns)
|
||||
await conn.close()
|
||||
items = [{"control_id": r["control_id"], "title": r["title"] or "",
|
||||
"embed_text": (r["title"] or "") + ". " + " ".join(parse_req(r["requirements"])[:2])}
|
||||
for r in rows]
|
||||
print(f"scope={scope}: {len(items)} controls")
|
||||
|
||||
cache = os.path.join(outdir, f"{scope}_vecs.pkl")
|
||||
if os.path.exists(cache):
|
||||
vecs = pickle.load(open(cache, "rb"))
|
||||
print(f"embeddings from cache ({len(vecs)})")
|
||||
else:
|
||||
vecs = await _embed_texts([it["embed_text"] for it in items])
|
||||
pickle.dump(vecs, open(cache, "wb"))
|
||||
print(f"embeddings fresh+cached ({len(vecs)})")
|
||||
|
||||
micro = greedy_cluster(vecs, micro_thr)
|
||||
print(f"pass-1 (micro-thr={micro_thr}): {len(items)} → {len(micro)} micro-clusters")
|
||||
out = [{"micro_id": i, "size": len(c["members"]), "member_indices": c["members"],
|
||||
"control_ids": [items[j]["control_id"] for j in c["members"]],
|
||||
"titles": [items[j]["title"] for j in c["members"][:6]]}
|
||||
for i, c in enumerate(micro)]
|
||||
path = os.path.join(outdir, f"{scope}_micro_clusters.json")
|
||||
json.dump(out, open(path, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
|
||||
print(f"written: {path}")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--scope", default="sbom")
|
||||
ap.add_argument("--patterns", default="", help="comma-separated SQL ILIKE patterns (overrides --scope)")
|
||||
ap.add_argument("--micro-thr", type=float, default=0.78)
|
||||
ap.add_argument("--outdir", default="/tmp")
|
||||
a = ap.parse_args()
|
||||
patterns = [p for p in a.patterns.split(",") if p] or SCOPES[a.scope]
|
||||
asyncio.run(run(a.scope, patterns, a.micro_thr, a.outdir))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,113 @@
|
||||
"""Stufe 3 — LLM-Synthese: REVIEW UNITS → Obligation Registry (Schema obligation_registry_v1).
|
||||
Geschärfter Prompt: kleinste Menge regulatorisch UNTERSCHIEDLICHER Obligations. Harte Tier-
|
||||
Regel in Code erzwungen. Provenance pro Obligation. ANTHROPIC_API_KEY aus ENV (nie hartcodiert).
|
||||
Große Calls → STREAMING (SDK blockt non-streaming >10min).
|
||||
|
||||
ANTHROPIC_API_KEY=… python3 scripts/obligation_discovery/synthesize_obligations.py \
|
||||
--units /tmp/auth_review_units.json --regulation CRA --theme "Authentisierung" --out /tmp/auth_registry.json
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from collections import Counter
|
||||
|
||||
from _core import SEMANTIC_EDGE_TYPES
|
||||
|
||||
SYS = """Du bist Knowledge Engineer und baust eine LEGAL OBLIGATION REGISTRY fuer __REGULATION__
|
||||
(Thema: __THEME__). Input: REVIEW UNITS (algorithmisch vor-gebuendelte Control-Gruppen), jede
|
||||
kann MEHRERE unterschiedliche Pflichten enthalten.
|
||||
|
||||
AUFGABE: Zerlege die Review Units in die KLEINSTE MENGE regulatorisch UNTERSCHIEDLICHER Legal
|
||||
Obligations. Regeln:
|
||||
- Nichts zusammenfuehren nur wegen aehnlicher Woerter.
|
||||
- Unterschiedliche Rechtsgrundlage => unterschiedliche Obligation.
|
||||
- Unterschiedliche Applicability => unterschiedliche Obligation.
|
||||
- Unterschiedliche Evidence-Facette (governance/capability/evidence) => GLEICHE Obligation, andere Facette.
|
||||
- Unterschiedliche Umsetzung (NIST/OWASP/ISO/BSI) => guidance_basis, KEINE neue Obligation.
|
||||
- Gleiche Pflicht ueber mehrere Review Units => EINE Obligation (mehrere member_review_units).
|
||||
|
||||
Gib AUSSCHLIESSLICH JSON aus:
|
||||
{"obligations":[{"id":"snake_case","name":"","description":"","tier":"LEGAL_MINIMUM|BEST_PRACTICE|IMPLEMENTATION_GUIDANCE|EVIDENCE","applicability":"universal|conditional:<pred>|domain:<x>","evidence_facets":{"governance":true,"capability":true,"evidence":false},"source_role":"LEGAL_BASIS|GUIDANCE|EVIDENCE|IMPLEMENTATION","legal_basis":[{"source":"__REGULATION__","anchor":"","citation":""}],"guidance_basis":[{"source":"NIST|OWASP|ISO|BSI","anchor":"","role":"best_practice"}],"subdomain":"","member_review_units":["M0"],"source_meta_cluster":"M0","discovery_confidence":0.9}],
|
||||
"relationships":[{"type":"depends_on|supports|produces_evidence_for|implements|derived_from","from":"id","to":"id","note":""},{"type":"out_of_scope","review_units":["M0"],"note":""}]}
|
||||
|
||||
HARTE REGELN:
|
||||
- tier=LEGAL_MINIMUM NUR mit legal_basis (Primaerrecht). Sonst tier=BEST_PRACTICE, legal_basis=[].
|
||||
- legal_basis NUR Primaerrecht der Regulierung; NIST/OWASP/ISO/BSI => guidance_basis.
|
||||
- relationships SPARSAM, gerichtet, nur klar belegbar.
|
||||
- out_of_scope: Review Units, die NICHT zum Thema gehoeren (andere Regulierung/Domaene)."""
|
||||
|
||||
|
||||
def build_user(units: list[dict]) -> str:
|
||||
lines = []
|
||||
for u in units:
|
||||
t = " | ".join(str(x)[:46] for x in u.get("sample_titles", [])[:6])
|
||||
lines.append(f"{u['review_unit_id']} (controls={u['n_controls']}): {t}")
|
||||
return "Review Units:\n" + "\n".join(lines)
|
||||
|
||||
|
||||
def synthesize(units, regulation, theme, model):
|
||||
import anthropic
|
||||
key = os.environ["ANTHROPIC_API_KEY"]
|
||||
sys = SYS.replace("__REGULATION__", regulation).replace("__THEME__", theme)
|
||||
client = anthropic.Anthropic(api_key=key)
|
||||
with client.messages.stream(model=model, max_tokens=24000, system=sys,
|
||||
messages=[{"role": "user", "content": build_user(units)}]) as st:
|
||||
msg = st.get_final_message()
|
||||
txt = msg.content[0].text
|
||||
m = re.search(r"\{.*\}", txt, re.DOTALL)
|
||||
return json.loads(m.group(0) if m else txt)
|
||||
|
||||
|
||||
def post_process(data, units, regulation, model):
|
||||
cmap = {u["review_unit_id"]: u["control_ids"] for u in units}
|
||||
size = {u["review_unit_id"]: u["n_controls"] for u in units}
|
||||
obls = []
|
||||
for o in data.get("obligations", []):
|
||||
rus = [r for r in (o.get("member_review_units") or []) if r in cmap]
|
||||
members = sorted({c for ru in rus for c in cmap[ru]})
|
||||
lb = o.get("legal_basis") or []
|
||||
tier, review = o.get("tier", "BEST_PRACTICE"), "draft"
|
||||
if tier == "LEGAL_MINIMUM" and not lb:
|
||||
tier, review = "BEST_PRACTICE", "needs_legal_basis"
|
||||
smc = o.get("source_meta_cluster") or (rus[0] if rus else "")
|
||||
obls.append({
|
||||
"id": o["id"], "name": o.get("name", ""), "description": o.get("description", ""),
|
||||
"tier": tier, "subdomain": o.get("subdomain", ""),
|
||||
"applicability": o.get("applicability", "universal"),
|
||||
"evidence_facets": o.get("evidence_facets", {}), "source_role": o.get("source_role", ""),
|
||||
"legal_basis": lb, "guidance_basis": o.get("guidance_basis") or [],
|
||||
"member_review_units": rus, "member_controls": members, "member_count": len(members),
|
||||
"relationships": [], "citation_anchor_ids": [], "citation_status": "pending_span_anchor",
|
||||
"review_status": review,
|
||||
"provenance": {"discovery_confidence": o.get("discovery_confidence"),
|
||||
"source_meta_cluster": smc, "cluster_size": size.get(smc),
|
||||
"llm_model": model, "synthesis_version": "v1"}})
|
||||
rels = [r for r in data.get("relationships", [])
|
||||
if r.get("type") in SEMANTIC_EDGE_TYPES or r.get("type") == "out_of_scope"]
|
||||
return {"schema_version": "obligation_registry_v1", "regulation": regulation,
|
||||
"generated_by": f"obligation_discovery/{model}", "synthesis_version": "v1",
|
||||
"citation_status": "pending_span_anchor", "obligations": obls, "relationships": rels}
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--units", required=True)
|
||||
ap.add_argument("--regulation", default="CRA")
|
||||
ap.add_argument("--theme", default="")
|
||||
ap.add_argument("--model", default="claude-opus-4-8")
|
||||
ap.add_argument("--out", required=True)
|
||||
a = ap.parse_args()
|
||||
units = json.load(open(a.units, encoding="utf-8"))
|
||||
data = synthesize(units, a.regulation, a.theme, a.model)
|
||||
reg = post_process(data, units, a.regulation, a.model)
|
||||
json.dump(reg, open(a.out, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
|
||||
o = reg["obligations"]
|
||||
print(f"obligations: {len(o)} | tier: {dict(Counter(x['tier'] for x in o))}")
|
||||
print(f"written: {a.out}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,35 @@
|
||||
"""Stufe 4 — Validierung: belastbare Registry-Checks (kein LLM/Key).
|
||||
Prüft die User-Regeln: LEGAL_MINIMUM braucht legal_basis · member_controls vollständig ·
|
||||
out_of_scope separat · >8-Obligations/Review-Unit-Warnung. Exit-Code 1 bei hartem Fehler.
|
||||
|
||||
python3 scripts/obligation_discovery/validate_registry.py obligations/cra_authentication.json
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
|
||||
from _core import validate_registry
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("registry")
|
||||
a = ap.parse_args()
|
||||
reg = json.load(open(a.registry, encoding="utf-8"))
|
||||
v = validate_registry(reg)
|
||||
print(f"=== validate {a.registry} ===")
|
||||
print(f" obligations: {v['obligations']}")
|
||||
print(f" LEGAL_MINIMUM: {v['legal_minimum']}")
|
||||
print(f" LM ohne legal_basis: {v['lm_without_legal_basis'] or 'keine'}")
|
||||
print(f" member_controls leer: {v['empty_member_controls'] or 'keine'}")
|
||||
print(f" >8 Obligations/Review-Unit: {v['over8_per_review_unit'] or 'keine'}")
|
||||
print(f" out_of_scope: {v['out_of_scope']}")
|
||||
print(f" semantische Kanten: {v['semantic_edges']}")
|
||||
print(f" PASSED: {v['passed']}")
|
||||
sys.exit(0 if v["passed"] else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user