feat(iace): overnight NASA NTRS failure-knowledge harvester

Unattended, bounded (MAX_DOCS), resumable pipeline: NTRS lessons-learned →
public-reuse licence gate → download PDF → pdftotext / abstract / vision-OCR
fallback → Ollama tuple extraction → results.jsonl + bp_iace_failure_kb,
tagged verified=false + provenance for morning review. Never touches the
curated Go set.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-13 00:36:44 +02:00
parent d27c1b9e7d
commit 0148d55304
@@ -0,0 +1,224 @@
#!/usr/bin/env python3
"""Unattended overnight harvester: NASA NTRS lessons-learned -> FailureKnowledge.
Bounded, resumable, logged. Pure-public-reuse licence gate (mirrors Go NTRSUsable).
Extraction priority per doc: pdftotext -> abstract (always available) -> vision OCR
(scanned only). Each tuple is LLM-extracted (Ollama), tagged verified=false +
provenance, written to results.jsonl AND ingested into bp_iace_failure_kb for
morning review/promotion. It never touches the curated Go set.
Env: MAX_DOCS (default 400), OLLAMA_TEXT (llama3.2:latest), OLLAMA_VISION (qwen2.5vl:32b).
Run: nohup python3 fmea_nightly_harvest.py > /tmp/fmea_harvest/run.out 2>&1 &
"""
import base64, datetime, json, os, subprocess, urllib.parse, urllib.request
RAG = "https://127.0.0.1:8097"
OLLAMA = "http://127.0.0.1:11434"
NTRS = "https://ntrs.nasa.gov"
COLLECTION = "bp_iace_failure_kb"
WORK = "/tmp/fmea_harvest"
STATE = f"{WORK}/state.json"
RESULTS = f"{WORK}/results.jsonl"
LOG = f"{WORK}/harvest.log"
MAX_DOCS = int(os.environ.get("MAX_DOCS", "400"))
TEXT_MODEL = os.environ.get("OLLAMA_TEXT", "llama3.2:latest")
VISION_MODEL = os.environ.get("OLLAMA_VISION", "qwen2.5vl:32b")
QUERIES = [
"lessons learned failure", "lessons learned anomaly",
"reliability failure mechanism", "component failure investigation",
"fracture leak corrosion fatigue lessons learned",
]
os.makedirs(WORK, exist_ok=True)
def log(m):
line = f"{datetime.datetime.now().isoformat(timespec='seconds')} {m}"
print(line, flush=True)
with open(LOG, "a") as f:
f.write(line + "\n")
def get_json(url, data=None, timeout=90):
hdr = {"User-Agent": "breakpilot-harvest", "Content-Type": "application/json"}
req = urllib.request.Request(url, data=data, headers=hdr)
import ssl
ctx = ssl.create_default_context(); ctx.check_hostname = False; ctx.verify_mode = ssl.CERT_NONE
with urllib.request.urlopen(req, timeout=timeout, context=ctx) as r:
return json.load(r)
def ntrs_usable(l):
if str(l.get("distribution", "")).upper() != "PUBLIC":
return False
ec = l.get("exportControl") or {}
if any(str(ec.get(k, "")).upper() == "YES" for k in ("isExportControl", "ear", "itar")):
return False
if (l.get("cui") or {}).get("isCui"):
return False
cp = l.get("copyright") or {}
if cp.get("containsThirdPartyMaterial"):
return False
return str(cp.get("determinationType", "")).upper() in ("PUBLIC_USE_PERMITTED", "GOV_PUBLIC_USE_PERMITTED")
def pdf_url(l):
for d in l.get("downloads", []):
if "pdf" in str(d.get("mimetype", "")).lower():
orig = (d.get("links") or {}).get("original")
if orig:
return NTRS + orig
return None
def ollama_text(model, prompt, images=None, fmt=None, timeout=180):
body = {"model": model, "prompt": prompt, "stream": False}
if fmt:
body["format"] = fmt
if images:
body["images"] = images
try:
r = get_json(f"{OLLAMA}/api/generate", json.dumps(body).encode(), timeout)
return r.get("response", "")
except Exception as e:
log(f" ollama error: {e}")
return ""
def extract_pdf_text(path):
try:
out = subprocess.run(["pdftotext", "-l", "6", path, "-"], capture_output=True, text=True, timeout=120)
if len(out.stdout.strip()) > 300:
return out.stdout[:6000], "pdftotext"
except Exception as e:
log(f" pdftotext error: {e}")
return "", "none"
def ocr_pdf(path):
base = path + "_pg"
try:
subprocess.run(["pdftoppm", "-png", "-r", "120", "-l", "2", path, base], timeout=180, check=False)
except Exception:
return ""
texts = []
d = os.path.dirname(path)
pngs = sorted(f for f in os.listdir(d) if os.path.basename(base) in f and f.endswith(".png"))[:2]
for p in pngs:
try:
with open(os.path.join(d, p), "rb") as fh:
img = base64.b64encode(fh.read()).decode()
t = ollama_text(VISION_MODEL, "Transcribe all readable text from this page verbatim.", images=[img], timeout=240)
texts.append(t)
except Exception as e:
log(f" ocr page error: {e}")
finally:
try:
os.remove(os.path.join(d, p))
except Exception:
pass
return "\n".join(texts)[:6000]
def extract_tuple(text):
prompt = (
"From the engineering text below, extract ONE failure record as a JSON object with EXACTLY these keys: "
"component (the part that failed), failure_mode (how it failed: fracture/leak/short/wear/stuck/drift/ignition...), "
"mechanism (why: fatigue/contamination/corrosion/overload...), effect (system consequence), "
"hazard (one of: mechanical_hazard, electrical_hazard, thermal_hazard, fire_explosion, pneumatic_hydraulic, none), "
"control (recommended action). Use ONLY information in the text; use \"\" if unknown. Output ONLY the JSON.\n\nTEXT:\n"
+ text[:6000]
)
raw = ollama_text(TEXT_MODEL, prompt, fmt="json", timeout=180)
try:
obj = json.loads(raw)
return obj if isinstance(obj, dict) and obj.get("component") else None
except Exception:
return None
def ingest(doc_id, title, tuple_, license_, url):
md = (f"# NASA Lesson {doc_id}: {title}\n\n"
f"- Source: NASA NTRS {doc_id}\n- License: {license_}\n- URL: {url}\n- verified: false (auto-extracted)\n\n"
f"Component: {tuple_.get('component','')}\nFailure mode: {tuple_.get('failure_mode','')}\n"
f"Mechanism: {tuple_.get('mechanism','')}\nEffect: {tuple_.get('effect','')}\n"
f"Hazard: {tuple_.get('hazard','')}\nControl: {tuple_.get('control','')}\n")
path = f"{WORK}/fk_{doc_id}.md"
with open(path, "w") as f:
f.write(md)
meta = json.dumps({"title": f"NASA NTRS {doc_id}: {title}"[:120], "license": license_,
"source": f"NASA NTRS {doc_id}", "verified": "false"})
cmd = ["curl", "-sk", "--max-time", "90", "-X", "POST", f"{RAG}/api/v1/documents/upload",
"-F", f"file=@{path}", "-F", f"collection={COLLECTION}", "-F", "data_type=failure_kb",
"-F", "use_case=iace_fmea", "-F", "year=2024", "-F", f"metadata_json={meta}"]
r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
return "chunks_count" in r.stdout
def main():
state = {"done": []}
if os.path.exists(STATE):
try:
state = json.load(open(STATE))
except Exception:
pass
done = set(state["done"])
# ensure collection
subprocess.run(["curl", "-sk", "-X", "POST", f"{RAG}/api/v1/collections", "-H", "Content-Type: application/json",
"-d", json.dumps({"name": COLLECTION, "vector_size": 1024})], capture_output=True)
log(f"START nightly harvest (max {MAX_DOCS}, already done {len(done)})")
processed = 0
for q in QUERIES:
if processed >= MAX_DOCS:
break
try:
res = get_json(f"{NTRS}/api/citations/search?q={urllib.parse.quote(q)}&page.size=100&highlight=false", timeout=90)
except Exception as e:
log(f"NTRS search '{q}' error: {e}")
continue
for l in res.get("results", []):
if processed >= MAX_DOCS:
break
did = l.get("id")
if not did or did in done:
continue
if not ntrs_usable(l):
done.add(did)
continue
done.add(did)
title = (l.get("title") or "")[:120]
lic = f"Public Domain (NASA NTRS, {(l.get('copyright') or {}).get('determinationType','')})"
url = pdf_url(l) or ""
text, how = "", "abstract"
if url:
pdf = f"{WORK}/{did}.pdf"
subprocess.run(["curl", "-sL", "--max-time", "150", "-o", pdf, url], capture_output=True)
if os.path.exists(pdf) and os.path.getsize(pdf) > 1000:
text, how = extract_pdf_text(pdf)
if not text:
text, how = ocr_pdf(pdf), "ocr"
try:
os.remove(pdf)
except Exception:
pass
if not text or len(text.strip()) < 120:
text, how = l.get("abstract") or "", "abstract"
if not text.strip():
continue
tup = extract_tuple(text)
if not tup:
log(f" {did} no tuple ({how})")
continue
tup["_source"] = f"NASA NTRS {did}"; tup["_license"] = lic; tup["_url"] = url
tup["_how"] = how; tup["_verified"] = False; tup["_id"] = did
with open(RESULTS, "a") as f:
f.write(json.dumps(tup, ensure_ascii=False) + "\n")
ok = ingest(did, title, tup, lic, url)
processed += 1
log(f" [{processed}] {did} {tup.get('component','?')}{tup.get('failure_mode','?')} ({how}) ingest={ok}")
json.dump({"done": list(done)}, open(STATE, "w"))
json.dump({"done": list(done)}, open(STATE, "w"))
log(f"DONE nightly harvest: {processed} new tuples this run; {len(done)} ids seen")
if __name__ == "__main__":
main()