docs(licenses): freeze 3-rule license mapping + audit script
Defines the authoritative mapping from license_type to license_rule in docs/LICENSE_RULES.md, and adds scripts/audit_license_classification.py to surface classification gaps in registry/canonical_controls/Qdrant. Key finding from first audit run against bp-core-postgres + Qdrant: - regulation_registry: 232 rows, 224 rule=1, 8 rule=2, 0 rule=3; 36 rows without license_type (need backfill) - canonical_controls: 314,811 rows, 279,384 (89%) have NULL license_rule (target of Task #22 reclassification) - Qdrant atomic_controls_dedup: 100% of sampled points lack both license and license_rule payload fields - Qdrant bp_compliance_gesetze: 80.6% lack both fields - Qdrant bp_compliance_ce + bp_compliance: nearly clean Rule definitions clarified (was loosely remembered as "law / cite / rewrite"): - Rule 1 = verbatim, sovereign law (EU/DE/AT/CH/US, TRBS/TRGS/ASR, OSHA, NIST, EU guidelines, DGUV UVV) - Rule 2 = verbatim with attribution (CC-BY, Apache, OWASP, OECD AI Principles, ENISA) - Rule 3 = identifier citation only, no full text (DIN/EN/ISO, ANSI/UL/IEC, DGUV Regeln/Informationen/Grundsaetze, BSI, proprietary standards). Pipeline drops chunk_text when rule=3 in pipeline_adapter.py:147. The 4th category I had proposed ("R1-A") turned out to be already implemented as rule=2; the mapping doc reflects the actual code behaviour rather than the original 3-name verbal model. No schema change. No data migration in this commit — reclassification of the 279k controls is staged as Task #22 and will be cluster-based by source/regulation_id.
This commit is contained in:
@@ -0,0 +1,83 @@
|
||||
# Lizenzregeln der Control-Pipeline
|
||||
|
||||
> **Stand:** 2026-05-21 — Mapping festgezurrt nach DB-Inspektion und IACE-Audit.
|
||||
>
|
||||
> Die Pipeline klassifiziert jede Regulation (und damit jedes daraus extrahierte
|
||||
> Chunk und jeden atomic_control) in eine von **drei Lizenzregeln**. Die Regel
|
||||
> entscheidet, ob der Volltext aufbewahrt werden darf und welche Attribution im
|
||||
> Ausgabe-Renderer Pflicht ist.
|
||||
|
||||
## Die drei Regeln
|
||||
|
||||
| Regel | Bedeutung | Volltext speichern? | Attribution Pflicht? | Beispiele |
|
||||
|-------|-----------|---------------------|----------------------|-----------|
|
||||
| **1** | Wörtlich — Hoheitsrecht / Public Domain | ✓ | nein (empfohlen für Audit) | EU-Recht (EUR-Lex), Bundesrecht, Satzungsrecht (DGUV UVV), TRBS, TRGS, ASR, US Federal Code (OSHA), NIST SP, EU-Leitfäden |
|
||||
| **2** | Wörtlich mit Attribution — freie Lizenzen | ✓ | **ja** | OWASP (CC-BY-SA-4.0), OECD AI Principles (OECD_PUBLIC), ENISA-Dokumente (CC-BY-4.0), Apache-2.0 Werke |
|
||||
| **3** | Nur zitieren — proprietäre Standards | ✗ | nicht anwendbar (kein Volltext) | DIN, EN, ISO, ANSI, UL, IEC, IEEE, DGUV Regeln/Informationen/Grundsätze, Bitkom-Leitfäden, BSI-Bausteine (urheberrechtlich) |
|
||||
|
||||
**Wichtige Klarstellung:** Regel 3 = "nur Identifier/Abschnitt zitieren", **nicht** "umformulieren". Die ursprüngliche Bezeichnung "neu formulieren" war irreführend. Korrekt: Bei Regel-3-Quellen darf die Pipeline den Volltext nicht speichern; sie bewahrt nur die Quellenreferenz (regulation_id + article/paragraph), und der Output-Renderer zeigt diese Referenz im Frontend/PDF.
|
||||
|
||||
## Mapping `license_type` → `license_rule`
|
||||
|
||||
| license_type | license_rule | Erklärung |
|
||||
|---|---|---|
|
||||
| `EU_LAW`, `EU_PUBLIC` | 1 | EU-Verordnungen, Richtlinien, OJ-Veröffentlichungen, EU-Leitfäden |
|
||||
| `DE_LAW`, `DE_PUBLIC` | 1 | Bundesgesetze, TRBS, TRGS, ASR, DGUV-UVV (Satzungsrecht) |
|
||||
| `AT_LAW`, `CH_LAW`, `FR_LAW`, `IT_LAW`, `ES_LAW`, `NL_LAW`, `HU_LAW` | 1 | Andere EU-Mitgliedsstaaten-Recht |
|
||||
| `US_GOV_PUBLIC`, `NIST_PUBLIC_DOMAIN`, `OSHA_PUBLIC` | 1 | US Federal Code (17 U.S.C. §105 Public Domain) |
|
||||
| `CC-BY-4.0`, `CC-BY-SA-4.0`, `CC-BY-3.0`, `CC-BY-SA-3.0` | 2 | Creative-Commons mit Attribution-Pflicht |
|
||||
| `Apache-2.0`, `MIT` | 2 | Permissive OSS-Lizenzen, NOTICE-Pflicht |
|
||||
| `OECD_PUBLIC`, `ENISA_CC_BY_4.0` | 2 | Behörden-Publikationen mit Attribution-Auflage |
|
||||
| `DIN_COPYRIGHT`, `ISO_COPYRIGHT`, `ANSI_COPYRIGHT`, `UL_COPYRIGHT`, `IEC_COPYRIGHT` | 3 | Normungsorganisationen — nur Identifier-Zitat |
|
||||
| `DGUV_COPYRIGHT` | 3 | DGUV Regeln/Informationen/Grundsätze (nicht UVV) |
|
||||
| `BITKOM_COPYRIGHT`, `BSI_COPYRIGHT`, `VDMA_COPYRIGHT` | 3 | Verbands-/Behörden-Publikationen mit eigenständigem Urheberrecht |
|
||||
| `OWN_WORK` | 3 | BreakPilot-Eigentexte (Templates, eigene Patterns) — kein externes Lizenzrisiko, aber auch kein Public-Domain-Status |
|
||||
|
||||
**Sonderfall DGUV:** Die Klasse trennt sich nach Publikationstyp:
|
||||
- DGUV **Vorschriften / UVV** → `DE_LAW` → Regel 1
|
||||
- DGUV **Regeln, Informationen, Grundsätze** → `DGUV_COPYRIGHT` → Regel 3
|
||||
|
||||
## Auswirkung pro Pipeline-Stage
|
||||
|
||||
| Stage | Verhalten bei Regel 1 | Regel 2 | Regel 3 |
|
||||
|---|---|---|---|
|
||||
| Stage 6 ControlCompose (`pipeline_adapter.py:147`) | speichert `chunk_text` | speichert `chunk_text` | speichert `chunk_text = None` |
|
||||
| Atomic-Control-Bildung | Volltext als Quelle | Volltext + Attribution-Vermerk | nur regulation_id + article |
|
||||
| Output-Renderer (Frontend/PDF) | optionaler Quellen-Hinweis | **Pflicht-Attribution in Footer + Inline** | nur Identifier rendern |
|
||||
| Tech-File-Anhang | Quelle nennen | Quelle + Lizenz-URL | Identifier-Liste |
|
||||
|
||||
## Quellen ohne Klassifikation
|
||||
|
||||
Aktuell sind in `regulation_registry` **232 Regulationen** klassifiziert (Stand 2026-05-21). Die folgenden müssen noch ergänzt werden (Task #20 deckt den DGUV-Ingest):
|
||||
|
||||
| Quelle | Regel | Begründung |
|
||||
|---|---|---|
|
||||
| TRBS-Familie (24 PDFs im RAG) | 1 | Technische Regeln Betriebssicherheit — BAuA Bundesarbeitsblatt |
|
||||
| TRGS-Familie (alle Volltext-Chunks) | 1 | Technische Regeln Gefahrstoffe — BAuA |
|
||||
| ASR-Familie (17 PDFs) | 1 | Arbeitsstättenregeln — BAuA |
|
||||
| OSHA 29 CFR 1910 Subpart O + Technical Manual | 1 | US Federal Public Domain (17 U.S.C. §105) |
|
||||
| DGUV Vorschrift 1 + UVV-Familie (sobald ingest) | 1 | Satzungsrecht der BG |
|
||||
| DGUV Regel 100-500 + Information 209-072/074/073 | 3 | DGUV-Copyright, nur Identifier |
|
||||
| DIN-Identifier-Tabelle (ohne Volltext) | 3 | DIN-Beuth-Copyright |
|
||||
| ANSI B11.0 + RIA R15.06 + UL 508A Identifier | 3 | ANSI/UL-Copyright |
|
||||
| ISO 12100/13849/13857 Identifier | 3 | ISO-Copyright |
|
||||
|
||||
## Audit-Pflicht
|
||||
|
||||
Vor jedem Ingest neuer Quellen:
|
||||
1. Lizenz prüfen (publikationen.dguv.de, EUR-Lex, etc.)
|
||||
2. license_type aus obiger Tabelle wählen — wenn nicht vorhanden, hier ergänzen
|
||||
3. license_rule wird daraus deterministisch abgeleitet
|
||||
4. Attribution-Text bei Regel 2 ist Pflichtfeld
|
||||
|
||||
Vor jedem Output:
|
||||
- Wenn ein atomic_control aus einer Regel-3-Quelle stammt: prüfen dass NUR Identifier gezeigt wird, niemals Volltext
|
||||
- Wenn aus Regel-2-Quelle: Attribution muss im PDF-Footer und im Frontend-Tooltip vorhanden sein
|
||||
- Wenn aus Regel-1-Quelle: empfohlen Quelle nennen für Auditierbarkeit
|
||||
|
||||
## Verweise
|
||||
|
||||
- Schema: `migrations/002_regulation_registry.sql`
|
||||
- Code: `services/regulation_registry.py`, `services/pipeline_adapter.py`
|
||||
- Seed-Script: `scripts/f1_migrate_regulation_registry.py`
|
||||
- Tests: `tests/test_regulation_registry.py` (assert: rule IN (1,2,3))
|
||||
@@ -0,0 +1,256 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Audit script for license classification gaps in the control pipeline.
|
||||
|
||||
Reports:
|
||||
|
||||
1. **regulation_registry coverage** — how many regulations are classified, by
|
||||
rule and license_type.
|
||||
2. **atomic_controls without license_rule** — how many controls reference a
|
||||
regulation_id that has no entry (or no license_rule) in the registry.
|
||||
3. **Qdrant payload consistency** — for each indexed collection, how many
|
||||
chunks carry both ``license`` and ``license_rule`` payload fields.
|
||||
|
||||
The goal is to surface every record where the engine could in principle
|
||||
extract or emit content but the license rule is unknown — those records are
|
||||
the highest-risk material in a license audit.
|
||||
|
||||
Usage::
|
||||
|
||||
python3 scripts/audit_license_classification.py --db-host 100.80.114.48
|
||||
|
||||
Add ``--check-qdrant`` to also probe ``http://<host>:6333`` collections.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from collections import Counter
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from urllib import request as urllib_request
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
DEFAULT_HOST = "100.80.114.48"
|
||||
DEFAULT_PORT = 5432
|
||||
DEFAULT_USER = "breakpilot"
|
||||
DEFAULT_DB = "breakpilot_db"
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument("--db-host", default=DEFAULT_HOST)
|
||||
p.add_argument("--db-port", type=int, default=DEFAULT_PORT)
|
||||
p.add_argument("--db-user", default=DEFAULT_USER)
|
||||
p.add_argument("--db-name", default=DEFAULT_DB)
|
||||
p.add_argument("--db-password", default="")
|
||||
p.add_argument("--check-qdrant", action="store_true")
|
||||
p.add_argument("--qdrant-host", default="100.80.114.48")
|
||||
p.add_argument("--qdrant-port", type=int, default=6333)
|
||||
p.add_argument("--json", action="store_true", help="Emit JSON result on stdout")
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def audit_registry(conn) -> dict:
|
||||
"""Coverage of regulation_registry."""
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"SET search_path TO compliance, public; "
|
||||
"SELECT license_rule, license_type, COUNT(*) "
|
||||
"FROM regulation_registry GROUP BY license_rule, license_type "
|
||||
"ORDER BY license_rule, license_type;"
|
||||
)
|
||||
by_rule_and_type: list[tuple] = []
|
||||
by_rule: Counter = Counter()
|
||||
for rule, ltype, count in cur.fetchall():
|
||||
by_rule_and_type.append((rule, ltype or "(empty)", count))
|
||||
by_rule[rule] += count
|
||||
|
||||
cur.execute(
|
||||
"SELECT COUNT(*) FROM regulation_registry "
|
||||
"WHERE license_type IS NULL OR license_type = '';"
|
||||
)
|
||||
missing_type = cur.fetchone()[0]
|
||||
|
||||
cur.execute("SELECT COUNT(*) FROM regulation_registry;")
|
||||
total = cur.fetchone()[0]
|
||||
|
||||
return {
|
||||
"total": total,
|
||||
"by_rule": dict(by_rule),
|
||||
"by_rule_and_type": by_rule_and_type,
|
||||
"missing_license_type": missing_type,
|
||||
}
|
||||
|
||||
|
||||
def audit_atomic_controls(conn) -> dict:
|
||||
"""Controls whose source regulation has no license rule.
|
||||
|
||||
Important: the schema differs between core (bp-core) and customer
|
||||
deployments. We probe a handful of likely column names and skip if
|
||||
none are found.
|
||||
"""
|
||||
cur = conn.cursor()
|
||||
# Detect controls table
|
||||
cur.execute(
|
||||
"SELECT table_name FROM information_schema.tables "
|
||||
"WHERE table_schema='compliance' AND table_name IN "
|
||||
"('atomic_controls','atomic_controls_dedup','canonical_controls');"
|
||||
)
|
||||
tables = [r[0] for r in cur.fetchall()]
|
||||
if not tables:
|
||||
return {"skipped": True, "reason": "no controls table found"}
|
||||
|
||||
result: dict = {"tables": {}}
|
||||
for tbl in tables:
|
||||
cur.execute(
|
||||
f"SELECT column_name FROM information_schema.columns "
|
||||
f"WHERE table_schema='compliance' AND table_name='{tbl}';"
|
||||
)
|
||||
cols = {r[0] for r in cur.fetchall()}
|
||||
if "license_rule" not in cols:
|
||||
result["tables"][tbl] = {"skipped": True, "reason": "no license_rule column"}
|
||||
continue
|
||||
cur.execute(f"SELECT COUNT(*) FROM compliance.{tbl};")
|
||||
total = cur.fetchone()[0]
|
||||
cur.execute(
|
||||
f"SELECT license_rule, COUNT(*) FROM compliance.{tbl} "
|
||||
f"GROUP BY license_rule ORDER BY license_rule;"
|
||||
)
|
||||
by_rule = {str(r[0]): r[1] for r in cur.fetchall()}
|
||||
cur.execute(
|
||||
f"SELECT COUNT(*) FROM compliance.{tbl} WHERE license_rule IS NULL;"
|
||||
)
|
||||
missing = cur.fetchone()[0]
|
||||
result["tables"][tbl] = {
|
||||
"total": total,
|
||||
"by_rule": by_rule,
|
||||
"missing_license_rule": missing,
|
||||
}
|
||||
return result
|
||||
|
||||
|
||||
def audit_qdrant(host: str, port: int) -> dict:
|
||||
"""Probe Qdrant collections for license + license_rule payload coverage.
|
||||
|
||||
Samples 500 points per collection and reports how many have neither
|
||||
field populated.
|
||||
"""
|
||||
out: dict = {"collections": {}}
|
||||
base = f"http://{host}:{port}"
|
||||
try:
|
||||
with urllib_request.urlopen(f"{base}/collections", timeout=10) as r:
|
||||
colls = json.loads(r.read()).get("result", {}).get("collections", [])
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
for c in colls:
|
||||
name = c["name"]
|
||||
if "compliance" not in name and "atomic_controls" not in name:
|
||||
continue
|
||||
payload = {"limit": 500, "with_payload": True, "with_vector": False}
|
||||
req = urllib_request.Request(
|
||||
f"{base}/collections/{name}/points/scroll",
|
||||
data=json.dumps(payload).encode(),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
try:
|
||||
with urllib_request.urlopen(req, timeout=15) as r:
|
||||
points = json.loads(r.read()).get("result", {}).get("points", [])
|
||||
except Exception as e:
|
||||
out["collections"][name] = {"error": str(e)}
|
||||
continue
|
||||
sampled = len(points)
|
||||
both_set = 0
|
||||
only_license = 0
|
||||
only_rule = 0
|
||||
neither = 0
|
||||
for p in points:
|
||||
pl = p.get("payload", {}) or {}
|
||||
has_lic = bool(pl.get("license"))
|
||||
has_rule = pl.get("license_rule") is not None
|
||||
if has_lic and has_rule:
|
||||
both_set += 1
|
||||
elif has_lic:
|
||||
only_license += 1
|
||||
elif has_rule:
|
||||
only_rule += 1
|
||||
else:
|
||||
neither += 1
|
||||
out["collections"][name] = {
|
||||
"sampled": sampled,
|
||||
"both_set": both_set,
|
||||
"only_license_field": only_license,
|
||||
"only_license_rule_field": only_rule,
|
||||
"neither_set": neither,
|
||||
"neither_pct": round(neither / sampled * 100, 1) if sampled else 0,
|
||||
}
|
||||
return out
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
try:
|
||||
import psycopg2
|
||||
except ImportError:
|
||||
print("error: psycopg2 not installed (pip install psycopg2-binary)", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
conn = psycopg2.connect(
|
||||
host=args.db_host,
|
||||
port=args.db_port,
|
||||
user=args.db_user,
|
||||
dbname=args.db_name,
|
||||
password=args.db_password or None,
|
||||
)
|
||||
try:
|
||||
registry = audit_registry(conn)
|
||||
controls = audit_atomic_controls(conn)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
qdrant: Optional[dict] = None
|
||||
if args.check_qdrant:
|
||||
qdrant = audit_qdrant(args.qdrant_host, args.qdrant_port)
|
||||
|
||||
result = {"registry": registry, "atomic_controls": controls, "qdrant": qdrant}
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(result, indent=2, default=str))
|
||||
return 0
|
||||
|
||||
print("=" * 60)
|
||||
print(" Audit — License Classification")
|
||||
print("=" * 60)
|
||||
print()
|
||||
print(f"## regulation_registry ({registry['total']} rows)")
|
||||
print(f" By rule: {registry['by_rule']}")
|
||||
print(f" Missing license_type: {registry['missing_license_type']}")
|
||||
print()
|
||||
print("## atomic_controls")
|
||||
for tbl, info in controls.get("tables", {}).items():
|
||||
if info.get("skipped"):
|
||||
print(f" {tbl}: SKIPPED ({info['reason']})")
|
||||
continue
|
||||
print(f" {tbl}: {info['total']} rows")
|
||||
print(f" by_rule={info['by_rule']}")
|
||||
print(f" missing_license_rule={info['missing_license_rule']}")
|
||||
print()
|
||||
if qdrant:
|
||||
print("## qdrant")
|
||||
for name, info in qdrant.get("collections", {}).items():
|
||||
if "error" in info:
|
||||
print(f" {name}: ERROR {info['error']}")
|
||||
continue
|
||||
print(
|
||||
f" {name:30} sampled={info['sampled']:4} "
|
||||
f"both={info['both_set']:4} "
|
||||
f"neither={info['neither_set']:4} ({info['neither_pct']}%)"
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user