Add obligation discovery pipeline tooling
Sichert die validierte Obligation Discovery Pipeline aus /tmp als dauerhaftes, committetes Tooling (scripts/obligation_discovery/) — der eigentliche Vermögenswert. Stufen: precluster (Embedding-Cache + Mikro-Cluster) → meta_cluster (Review Units, Skalierungs-Fix) → synthesize_obligations (Opus, Key aus ENV, Streaming, harte Tier-Regel, Provenance) → validate_registry → merge_review_diff. Reine Helfer in _core.py, 16 Unit-Tests. Doku docs-src/development/obligation_discovery_pipeline_v1.md mit Meilensteinen (SBOM/Vuln reproduziert, Auth 4408→170 Review Units→54→kuriert 29) und der Architekturregel: Runtime deterministisch, Discovery LLM-gestützt. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,114 @@
|
||||
"""Reine Helfer der Obligation Discovery Pipeline (keine schweren Imports → unit-testbar).
|
||||
|
||||
Die Pipeline leitet aus großen Compliance-Korpora eine regulatorische Ontologie ab:
|
||||
Controls → Mikro-Cluster → Meta-Cluster/Review-Units → LLM-Synthese → Obligation Registry.
|
||||
Architekturregel: RUNTIME bleibt deterministisch; DISCOVERY (dieses Tooling) darf LLM-gestützt
|
||||
sein und läuft EINMALIG/offline. Siehe docs-src/development/obligation_discovery_pipeline_v1.md.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import json
|
||||
import math
|
||||
from typing import Optional
|
||||
|
||||
SEMANTIC_EDGE_TYPES = ("depends_on", "supports", "produces_evidence_for",
|
||||
"implements", "derived_from")
|
||||
|
||||
|
||||
def parse_req(req) -> list:
|
||||
"""requirements-Spalte (JSON ODER Python-Repr ODER String) robust zu Liste."""
|
||||
if isinstance(req, list):
|
||||
return req
|
||||
if isinstance(req, str):
|
||||
for fn in (json.loads, ast.literal_eval):
|
||||
try:
|
||||
v = fn(req)
|
||||
return v if isinstance(v, list) else [str(v)]
|
||||
except Exception:
|
||||
pass
|
||||
return [req]
|
||||
return []
|
||||
|
||||
|
||||
def cosine(a, b) -> float:
|
||||
if not a or not b:
|
||||
return 0.0
|
||||
dot = sum(x * y for x, y in zip(a, b))
|
||||
na = math.sqrt(sum(x * x for x in a))
|
||||
nb = math.sqrt(sum(y * y for y in b))
|
||||
return dot / (na * nb) if na and nb else 0.0
|
||||
|
||||
|
||||
def greedy_cluster(vecs: list, thr: float) -> list[dict]:
|
||||
"""Single-Pass-Greedy-Clustering: jeder Vektor joint den ersten Cluster, dessen Seed
|
||||
cosine ≥ thr ist, sonst neuer Cluster. Deterministisch (stabile Reihenfolge)."""
|
||||
clusters: list[dict] = []
|
||||
for i, v in enumerate(vecs):
|
||||
if not v:
|
||||
clusters.append({"seed": None, "members": [i]})
|
||||
continue
|
||||
best, best_sim = None, thr
|
||||
for c in clusters:
|
||||
if c["seed"] is None:
|
||||
continue
|
||||
s = cosine(v, c["seed"])
|
||||
if s >= best_sim:
|
||||
best_sim, best = s, c
|
||||
if best:
|
||||
best["members"].append(i)
|
||||
else:
|
||||
clusters.append({"seed": v, "members": [i]})
|
||||
return clusters
|
||||
|
||||
|
||||
def centroid(idxs: list[int], vecs: list) -> Optional[list]:
|
||||
vs = [vecs[i] for i in idxs if vecs[i]]
|
||||
if not vs:
|
||||
return None
|
||||
n = len(vs)
|
||||
return [sum(col) / n for col in zip(*vs)]
|
||||
|
||||
|
||||
def validate_registry(reg: dict) -> dict:
|
||||
"""Belastbarkeits-Checks (User-Regeln): LEGAL_MINIMUM braucht legal_basis,
|
||||
member_controls vollständig, out_of_scope separat, >8-Obligations/Review-Unit-Warnung."""
|
||||
obls = reg.get("obligations", [])
|
||||
lm = [o for o in obls if o.get("tier") == "LEGAL_MINIMUM"]
|
||||
lm_without_basis = [o["id"] for o in lm if not o.get("legal_basis")]
|
||||
empty_members = [o["id"] for o in obls if not o.get("member_controls")]
|
||||
per_unit: dict[str, int] = {}
|
||||
for o in obls:
|
||||
ru = (o.get("provenance") or {}).get("source_meta_cluster")
|
||||
if ru:
|
||||
per_unit[ru] = per_unit.get(ru, 0) + 1
|
||||
over8 = {ru: n for ru, n in per_unit.items() if n > 8}
|
||||
rels = reg.get("relationships", [])
|
||||
return {
|
||||
"obligations": len(obls),
|
||||
"legal_minimum": len(lm),
|
||||
"lm_without_legal_basis": lm_without_basis,
|
||||
"empty_member_controls": empty_members,
|
||||
"over8_per_review_unit": over8,
|
||||
"out_of_scope": sum(1 for r in rels if r.get("type") == "out_of_scope"),
|
||||
"semantic_edges": sum(1 for r in rels if r.get("type") in SEMANTIC_EDGE_TYPES),
|
||||
"passed": not lm_without_basis and not empty_members and not over8,
|
||||
}
|
||||
|
||||
|
||||
def merge_edges(relationships: list[dict], proposed: list[dict]) -> tuple[list[dict], int]:
|
||||
"""Proposed semantische Kanten dedupliziert in relationships mergen. Gibt (merged, added)."""
|
||||
existing = {(r.get("type"), r.get("from"), r.get("to"))
|
||||
for r in relationships if r.get("from")}
|
||||
added = 0
|
||||
out = list(relationships)
|
||||
for e in proposed:
|
||||
if e.get("type") not in SEMANTIC_EDGE_TYPES:
|
||||
continue
|
||||
key = (e["type"], e.get("from"), e.get("to"))
|
||||
if key in existing or not e.get("from") or not e.get("to"):
|
||||
continue
|
||||
out.append(e)
|
||||
existing.add(key)
|
||||
added += 1
|
||||
return out, added
|
||||
@@ -0,0 +1,36 @@
|
||||
"""Stufe 5 — Review-Diff mergen: vorgeschlagene Beziehungskanten (review_status=proposed)
|
||||
dedupliziert in die Registry mergen (kein LLM/Key). Kleine Beziehungs-Sprache:
|
||||
depends_on/supports/produces_evidence_for/implements/derived_from.
|
||||
|
||||
python3 scripts/obligation_discovery/merge_review_diff.py obligations/cra.json /tmp/cra_edges_review.json
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
|
||||
from _core import SEMANTIC_EDGE_TYPES, merge_edges
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("registry")
|
||||
ap.add_argument("review_diff")
|
||||
ap.add_argument("--write", action="store_true", help="in die Registry schreiben (sonst dry-run)")
|
||||
a = ap.parse_args()
|
||||
reg = json.load(open(a.registry, encoding="utf-8"))
|
||||
diff = json.load(open(a.review_diff, encoding="utf-8"))
|
||||
proposed = diff.get("proposed_edges", diff if isinstance(diff, list) else [])
|
||||
merged, added = merge_edges(reg.get("relationships", []), proposed)
|
||||
print(f"proposed: {len(proposed)} | added (dedupliziert): {added}")
|
||||
if a.write:
|
||||
reg["relationships"] = merged
|
||||
reg["relationship_types"] = list(SEMANTIC_EDGE_TYPES)
|
||||
json.dump(reg, open(a.registry, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
|
||||
print(f"written: {a.registry}")
|
||||
else:
|
||||
print("dry-run (use --write to apply)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,55 @@
|
||||
"""Stufe 2 — Meta-Cluster (der Skalierungs-Fix für große Domänen): Mikro-Cluster →
|
||||
REVIEW UNITS. Review Unit = das, was der LLM-Synthese-Pass sieht (entkoppelt vom Clustering,
|
||||
später merge/split-bar). Nutzt den Embedding-Cache aus precluster (kein Re-Embed).
|
||||
|
||||
python3 scripts/obligation_discovery/meta_cluster.py --scope auth --meta-thr 0.62
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
|
||||
from _core import centroid, greedy_cluster
|
||||
|
||||
|
||||
def run(scope: str, meta_thr: float, outdir: str) -> None:
|
||||
micro = json.load(open(os.path.join(outdir, f"{scope}_micro_clusters.json"), encoding="utf-8"))
|
||||
vecs = pickle.load(open(os.path.join(outdir, f"{scope}_vecs.pkl"), "rb"))
|
||||
centroids = [centroid(m["member_indices"], vecs) for m in micro]
|
||||
meta = greedy_cluster(centroids, meta_thr)
|
||||
print(f"scope={scope} pass-2 (meta-thr={meta_thr}): {len(micro)} micro → {len(meta)} review-units")
|
||||
|
||||
out = []
|
||||
for mi, m in enumerate(meta):
|
||||
ctrl_ids, titles = [], []
|
||||
for micro_idx in m["members"]:
|
||||
mc = micro[micro_idx]
|
||||
ctrl_ids += mc["control_ids"]
|
||||
titles.append(mc["titles"][0] if mc["titles"] else "")
|
||||
out.append({"review_unit_id": f"M{mi}", "n_micro": len(m["members"]),
|
||||
"n_controls": len(ctrl_ids), "control_ids": ctrl_ids,
|
||||
"sample_titles": titles[:8]})
|
||||
out.sort(key=lambda x: -x["n_controls"])
|
||||
path = os.path.join(outdir, f"{scope}_review_units.json")
|
||||
json.dump(out, open(path, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
|
||||
|
||||
print("=== top review units (inspect for cross-domain mixing BEFORE synthesis) ===")
|
||||
for m in out[:12]:
|
||||
print(f" {m['review_unit_id']:5} ctrl={m['n_controls']:4} micro={m['n_micro']:3} "
|
||||
f"| {' || '.join(t[:30] for t in m['sample_titles'][:3])}")
|
||||
print(f"written: {path} ({len(out)} review units)")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--scope", default="auth")
|
||||
ap.add_argument("--meta-thr", type=float, default=0.62)
|
||||
ap.add_argument("--outdir", default="/tmp")
|
||||
a = ap.parse_args()
|
||||
run(a.scope, a.meta_thr, a.outdir)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,73 @@
|
||||
"""Stufe 1 — Pre-Cluster: Controls (scope) → BGE-M3-Embedding (gecacht) → Mikro-Cluster.
|
||||
Deterministisch. Im bp-compliance-backend-Container ausführen (PYTHONPATH=/app).
|
||||
|
||||
python3 scripts/obligation_discovery/precluster.py --scope sbom
|
||||
python3 scripts/obligation_discovery/precluster.py --patterns '%sbom%,%software bill%' --micro-thr 0.78
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
|
||||
from _core import greedy_cluster, parse_req
|
||||
|
||||
SCOPES = {
|
||||
"sbom": ["%SBOM%", "%software bill%", "%stückliste%", "%komponentenliste%"],
|
||||
"vuln": ["%schwachstellenbehandl%", "%schwachstellenmanagement%", "%vulnerability handling%",
|
||||
"%coordinated vulnerab%", "%vulnerability disclosure%", "%cvd-konzept%"],
|
||||
"auth": ["%authentisierung%", "%authentifizierung%", "%authentication%"],
|
||||
}
|
||||
|
||||
|
||||
async def run(scope: str, patterns: list[str], micro_thr: float, outdir: str) -> None:
|
||||
import asyncpg
|
||||
from compliance.services.mc_embedding_matcher import _embed_texts
|
||||
|
||||
dsn = os.getenv("DATABASE_URL") or os.getenv("COMPLIANCE_DATABASE_URL")
|
||||
conn = await asyncpg.connect(dsn)
|
||||
where = " or ".join(f"title ilike ${i+1}" for i in range(len(patterns)))
|
||||
rows = await conn.fetch(
|
||||
f"select control_id, title, requirements from compliance.canonical_controls "
|
||||
f"where {where} order by control_id", *patterns)
|
||||
await conn.close()
|
||||
items = [{"control_id": r["control_id"], "title": r["title"] or "",
|
||||
"embed_text": (r["title"] or "") + ". " + " ".join(parse_req(r["requirements"])[:2])}
|
||||
for r in rows]
|
||||
print(f"scope={scope}: {len(items)} controls")
|
||||
|
||||
cache = os.path.join(outdir, f"{scope}_vecs.pkl")
|
||||
if os.path.exists(cache):
|
||||
vecs = pickle.load(open(cache, "rb"))
|
||||
print(f"embeddings from cache ({len(vecs)})")
|
||||
else:
|
||||
vecs = await _embed_texts([it["embed_text"] for it in items])
|
||||
pickle.dump(vecs, open(cache, "wb"))
|
||||
print(f"embeddings fresh+cached ({len(vecs)})")
|
||||
|
||||
micro = greedy_cluster(vecs, micro_thr)
|
||||
print(f"pass-1 (micro-thr={micro_thr}): {len(items)} → {len(micro)} micro-clusters")
|
||||
out = [{"micro_id": i, "size": len(c["members"]), "member_indices": c["members"],
|
||||
"control_ids": [items[j]["control_id"] for j in c["members"]],
|
||||
"titles": [items[j]["title"] for j in c["members"][:6]]}
|
||||
for i, c in enumerate(micro)]
|
||||
path = os.path.join(outdir, f"{scope}_micro_clusters.json")
|
||||
json.dump(out, open(path, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
|
||||
print(f"written: {path}")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--scope", default="sbom")
|
||||
ap.add_argument("--patterns", default="", help="comma-separated SQL ILIKE patterns (overrides --scope)")
|
||||
ap.add_argument("--micro-thr", type=float, default=0.78)
|
||||
ap.add_argument("--outdir", default="/tmp")
|
||||
a = ap.parse_args()
|
||||
patterns = [p for p in a.patterns.split(",") if p] or SCOPES[a.scope]
|
||||
asyncio.run(run(a.scope, patterns, a.micro_thr, a.outdir))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,113 @@
|
||||
"""Stufe 3 — LLM-Synthese: REVIEW UNITS → Obligation Registry (Schema obligation_registry_v1).
|
||||
Geschärfter Prompt: kleinste Menge regulatorisch UNTERSCHIEDLICHER Obligations. Harte Tier-
|
||||
Regel in Code erzwungen. Provenance pro Obligation. ANTHROPIC_API_KEY aus ENV (nie hartcodiert).
|
||||
Große Calls → STREAMING (SDK blockt non-streaming >10min).
|
||||
|
||||
ANTHROPIC_API_KEY=… python3 scripts/obligation_discovery/synthesize_obligations.py \
|
||||
--units /tmp/auth_review_units.json --regulation CRA --theme "Authentisierung" --out /tmp/auth_registry.json
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from collections import Counter
|
||||
|
||||
from _core import SEMANTIC_EDGE_TYPES
|
||||
|
||||
SYS = """Du bist Knowledge Engineer und baust eine LEGAL OBLIGATION REGISTRY fuer __REGULATION__
|
||||
(Thema: __THEME__). Input: REVIEW UNITS (algorithmisch vor-gebuendelte Control-Gruppen), jede
|
||||
kann MEHRERE unterschiedliche Pflichten enthalten.
|
||||
|
||||
AUFGABE: Zerlege die Review Units in die KLEINSTE MENGE regulatorisch UNTERSCHIEDLICHER Legal
|
||||
Obligations. Regeln:
|
||||
- Nichts zusammenfuehren nur wegen aehnlicher Woerter.
|
||||
- Unterschiedliche Rechtsgrundlage => unterschiedliche Obligation.
|
||||
- Unterschiedliche Applicability => unterschiedliche Obligation.
|
||||
- Unterschiedliche Evidence-Facette (governance/capability/evidence) => GLEICHE Obligation, andere Facette.
|
||||
- Unterschiedliche Umsetzung (NIST/OWASP/ISO/BSI) => guidance_basis, KEINE neue Obligation.
|
||||
- Gleiche Pflicht ueber mehrere Review Units => EINE Obligation (mehrere member_review_units).
|
||||
|
||||
Gib AUSSCHLIESSLICH JSON aus:
|
||||
{"obligations":[{"id":"snake_case","name":"","description":"","tier":"LEGAL_MINIMUM|BEST_PRACTICE|IMPLEMENTATION_GUIDANCE|EVIDENCE","applicability":"universal|conditional:<pred>|domain:<x>","evidence_facets":{"governance":true,"capability":true,"evidence":false},"source_role":"LEGAL_BASIS|GUIDANCE|EVIDENCE|IMPLEMENTATION","legal_basis":[{"source":"__REGULATION__","anchor":"","citation":""}],"guidance_basis":[{"source":"NIST|OWASP|ISO|BSI","anchor":"","role":"best_practice"}],"subdomain":"","member_review_units":["M0"],"source_meta_cluster":"M0","discovery_confidence":0.9}],
|
||||
"relationships":[{"type":"depends_on|supports|produces_evidence_for|implements|derived_from","from":"id","to":"id","note":""},{"type":"out_of_scope","review_units":["M0"],"note":""}]}
|
||||
|
||||
HARTE REGELN:
|
||||
- tier=LEGAL_MINIMUM NUR mit legal_basis (Primaerrecht). Sonst tier=BEST_PRACTICE, legal_basis=[].
|
||||
- legal_basis NUR Primaerrecht der Regulierung; NIST/OWASP/ISO/BSI => guidance_basis.
|
||||
- relationships SPARSAM, gerichtet, nur klar belegbar.
|
||||
- out_of_scope: Review Units, die NICHT zum Thema gehoeren (andere Regulierung/Domaene)."""
|
||||
|
||||
|
||||
def build_user(units: list[dict]) -> str:
|
||||
lines = []
|
||||
for u in units:
|
||||
t = " | ".join(str(x)[:46] for x in u.get("sample_titles", [])[:6])
|
||||
lines.append(f"{u['review_unit_id']} (controls={u['n_controls']}): {t}")
|
||||
return "Review Units:\n" + "\n".join(lines)
|
||||
|
||||
|
||||
def synthesize(units, regulation, theme, model):
|
||||
import anthropic
|
||||
key = os.environ["ANTHROPIC_API_KEY"]
|
||||
sys = SYS.replace("__REGULATION__", regulation).replace("__THEME__", theme)
|
||||
client = anthropic.Anthropic(api_key=key)
|
||||
with client.messages.stream(model=model, max_tokens=24000, system=sys,
|
||||
messages=[{"role": "user", "content": build_user(units)}]) as st:
|
||||
msg = st.get_final_message()
|
||||
txt = msg.content[0].text
|
||||
m = re.search(r"\{.*\}", txt, re.DOTALL)
|
||||
return json.loads(m.group(0) if m else txt)
|
||||
|
||||
|
||||
def post_process(data, units, regulation, model):
|
||||
cmap = {u["review_unit_id"]: u["control_ids"] for u in units}
|
||||
size = {u["review_unit_id"]: u["n_controls"] for u in units}
|
||||
obls = []
|
||||
for o in data.get("obligations", []):
|
||||
rus = [r for r in (o.get("member_review_units") or []) if r in cmap]
|
||||
members = sorted({c for ru in rus for c in cmap[ru]})
|
||||
lb = o.get("legal_basis") or []
|
||||
tier, review = o.get("tier", "BEST_PRACTICE"), "draft"
|
||||
if tier == "LEGAL_MINIMUM" and not lb:
|
||||
tier, review = "BEST_PRACTICE", "needs_legal_basis"
|
||||
smc = o.get("source_meta_cluster") or (rus[0] if rus else "")
|
||||
obls.append({
|
||||
"id": o["id"], "name": o.get("name", ""), "description": o.get("description", ""),
|
||||
"tier": tier, "subdomain": o.get("subdomain", ""),
|
||||
"applicability": o.get("applicability", "universal"),
|
||||
"evidence_facets": o.get("evidence_facets", {}), "source_role": o.get("source_role", ""),
|
||||
"legal_basis": lb, "guidance_basis": o.get("guidance_basis") or [],
|
||||
"member_review_units": rus, "member_controls": members, "member_count": len(members),
|
||||
"relationships": [], "citation_anchor_ids": [], "citation_status": "pending_span_anchor",
|
||||
"review_status": review,
|
||||
"provenance": {"discovery_confidence": o.get("discovery_confidence"),
|
||||
"source_meta_cluster": smc, "cluster_size": size.get(smc),
|
||||
"llm_model": model, "synthesis_version": "v1"}})
|
||||
rels = [r for r in data.get("relationships", [])
|
||||
if r.get("type") in SEMANTIC_EDGE_TYPES or r.get("type") == "out_of_scope"]
|
||||
return {"schema_version": "obligation_registry_v1", "regulation": regulation,
|
||||
"generated_by": f"obligation_discovery/{model}", "synthesis_version": "v1",
|
||||
"citation_status": "pending_span_anchor", "obligations": obls, "relationships": rels}
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--units", required=True)
|
||||
ap.add_argument("--regulation", default="CRA")
|
||||
ap.add_argument("--theme", default="")
|
||||
ap.add_argument("--model", default="claude-opus-4-8")
|
||||
ap.add_argument("--out", required=True)
|
||||
a = ap.parse_args()
|
||||
units = json.load(open(a.units, encoding="utf-8"))
|
||||
data = synthesize(units, a.regulation, a.theme, a.model)
|
||||
reg = post_process(data, units, a.regulation, a.model)
|
||||
json.dump(reg, open(a.out, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
|
||||
o = reg["obligations"]
|
||||
print(f"obligations: {len(o)} | tier: {dict(Counter(x['tier'] for x in o))}")
|
||||
print(f"written: {a.out}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,35 @@
|
||||
"""Stufe 4 — Validierung: belastbare Registry-Checks (kein LLM/Key).
|
||||
Prüft die User-Regeln: LEGAL_MINIMUM braucht legal_basis · member_controls vollständig ·
|
||||
out_of_scope separat · >8-Obligations/Review-Unit-Warnung. Exit-Code 1 bei hartem Fehler.
|
||||
|
||||
python3 scripts/obligation_discovery/validate_registry.py obligations/cra_authentication.json
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
|
||||
from _core import validate_registry
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("registry")
|
||||
a = ap.parse_args()
|
||||
reg = json.load(open(a.registry, encoding="utf-8"))
|
||||
v = validate_registry(reg)
|
||||
print(f"=== validate {a.registry} ===")
|
||||
print(f" obligations: {v['obligations']}")
|
||||
print(f" LEGAL_MINIMUM: {v['legal_minimum']}")
|
||||
print(f" LM ohne legal_basis: {v['lm_without_legal_basis'] or 'keine'}")
|
||||
print(f" member_controls leer: {v['empty_member_controls'] or 'keine'}")
|
||||
print(f" >8 Obligations/Review-Unit: {v['over8_per_review_unit'] or 'keine'}")
|
||||
print(f" out_of_scope: {v['out_of_scope']}")
|
||||
print(f" semantische Kanten: {v['semantic_edges']}")
|
||||
print(f" PASSED: {v['passed']}")
|
||||
sys.exit(0 if v["passed"] else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user