#!/usr/bin/env python3
"""Unattended overnight harvester: NASA NTRS lessons-learned -> FailureKnowledge.

Bounded, resumable, logged. Pure-public-reuse licence gate (mirrors Go NTRSUsable).
Extraction priority per doc: pdftotext -> vision OCR (when pdftotext yields no
usable text) -> abstract (fallback when the PDF route yields too little text).
Each tuple is LLM-extracted (Ollama), tagged verified=false + provenance, written
to results.jsonl AND ingested into bp_iace_failure_kb for morning review/promotion.
It never touches the curated Go set.

Env: MAX_DOCS (default 400), OLLAMA_TEXT (llama3.2:latest),
     OLLAMA_VISION (qwen2.5vl:32b),
     HARVEST_INSECURE_TLS (set to "1" to skip TLS verification for every host;
     by default only loopback hosts are exempt).
Run: nohup python3 fmea_nightly_harvest.py > /tmp/fmea_harvest/run.out 2>&1 &
"""
import base64
import datetime
import json
import os
import ssl
import subprocess
import urllib.parse
import urllib.request

RAG = "https://127.0.0.1:8097"
OLLAMA = "http://127.0.0.1:11434"
NTRS = "https://ntrs.nasa.gov"
COLLECTION = "bp_iace_failure_kb"
WORK = "/tmp/fmea_harvest"
STATE = f"{WORK}/state.json"
RESULTS = f"{WORK}/results.jsonl"
LOG = f"{WORK}/harvest.log"
MAX_DOCS = int(os.environ.get("MAX_DOCS", "400"))
TEXT_MODEL = os.environ.get("OLLAMA_TEXT", "llama3.2:latest")
VISION_MODEL = os.environ.get("OLLAMA_VISION", "qwen2.5vl:32b")
# Escape hatch for hosts whose Python install has no usable CA bundle.
INSECURE_TLS = os.environ.get("HARVEST_INSECURE_TLS", "") == "1"
QUERIES = [
    "lessons learned failure",
    "lessons learned anomaly",
    "reliability failure mechanism",
    "component failure investigation",
    "fracture leak corrosion fatigue lessons learned",
]

# Hosts for which certificate verification is skipped (the local services are
# reached with `curl -k` elsewhere in this file).
_LOOPBACK_HOSTS = frozenset({"127.0.0.1", "localhost", "::1"})

os.makedirs(WORK, exist_ok=True)


def log(m):
    """Print a timestamped message and append the same line to LOG."""
    line = f"{datetime.datetime.now().isoformat(timespec='seconds')} {m}"
    print(line, flush=True)
    # Explicit encoding: log lines carry non-ASCII text (titles, the arrow).
    with open(LOG, "a", encoding="utf-8") as f:
        f.write(line + "\n")


def _ssl_context(url):
    """Build the TLS context for *url*: verifying, except for loopback hosts."""
    ctx = ssl.create_default_context()
    host = urllib.parse.urlsplit(url).hostname or ""
    if INSECURE_TLS or host in _LOOPBACK_HOSTS:
        # check_hostname must be cleared before verify_mode can be CERT_NONE.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    return ctx


def get_json(url, data=None, timeout=90):
    """Fetch *url* and return the decoded JSON body.

    Args:
        url: Absolute http(s) URL.
        data: Optional request body as bytes; when given the request is a POST.
        timeout: Socket timeout in seconds.

    Raises:
        Whatever urllib or the JSON decoder raises; callers handle failures.
    """
    hdr = {"User-Agent": "breakpilot-harvest", "Content-Type": "application/json"}
    req = urllib.request.Request(url, data=data, headers=hdr)
    with urllib.request.urlopen(req, timeout=timeout, context=_ssl_context(url)) as r:
        return json.load(r)


def ntrs_usable(l):
    """Return True only when an NTRS citation record passes the licence gate.

    The record must be publicly distributed, carry no export-control or CUI
    flag, contain no third-party material, and have a copyright determination
    that permits public use.
    """
    if str(l.get("distribution", "")).upper() != "PUBLIC":
        return False
    ec = l.get("exportControl") or {}
    if any(str(ec.get(k, "")).upper() == "YES" for k in ("isExportControl", "ear", "itar")):
        return False
    if (l.get("cui") or {}).get("isCui"):
        return False
    cp = l.get("copyright") or {}
    if cp.get("containsThirdPartyMaterial"):
        return False
    return str(cp.get("determinationType", "")).upper() in (
        "PUBLIC_USE_PERMITTED",
        "GOV_PUBLIC_USE_PERMITTED",
    )


def pdf_url(l):
    """Return the absolute URL of the record's first PDF download, or None."""
    # `or []` also covers a record whose "downloads" key is present but null.
    for d in l.get("downloads") or []:
        if "pdf" in str(d.get("mimetype", "")).lower():
            orig = (d.get("links") or {}).get("original")
            if orig:
                return NTRS + orig
    return None


def ollama_text(model, prompt, images=None, fmt=None, timeout=180):
    """Run one non-streaming Ollama generate call and return its response text.

    Args:
        model: Ollama model name.
        prompt: Prompt text.
        images: Optional list of base64-encoded images.
        fmt: Optional value for the request's "format" field (e.g. "json").
        timeout: Request timeout in seconds.

    Returns:
        The "response" field, or "" when the call fails (the error is logged).
    """
    body = {"model": model, "prompt": prompt, "stream": False}
    if fmt:
        body["format"] = fmt
    if images:
        body["images"] = images
    try:
        r = get_json(f"{OLLAMA}/api/generate", json.dumps(body).encode(), timeout)
        return r.get("response", "")
    except Exception as e:
        log(f"  ollama error: {e}")
        return ""


def extract_pdf_text(path):
    """Extract text from the first pages of a PDF with pdftotext.

    Returns:
        (text, "pdftotext") with text capped at 6000 characters when more than
        300 non-blank characters were extracted, otherwise ("", "none").
    """
    try:
        out = subprocess.run(
            ["pdftotext", "-l", "6", path, "-"],
            capture_output=True, text=True, timeout=120,
        )
        if len(out.stdout.strip()) > 300:
            return out.stdout[:6000], "pdftotext"
    except Exception as e:
        log(f"  pdftotext error: {e}")
    return "", "none"


def ocr_pdf(path):
    """Render the first two PDF pages to PNG and transcribe them with the vision model.

    Returns:
        The transcriptions joined by newlines, capped at 6000 characters, or ""
        when rendering fails. Rendered page images are always deleted.
    """
    base = path + "_pg"
    rendered = True
    try:
        subprocess.run(
            ["pdftoppm", "-png", "-r", "120", "-l", "2", path, base],
            timeout=180, check=False,
        )
    except (subprocess.SubprocessError, OSError) as e:
        log(f"  pdftoppm error: {e}")
        rendered = False  # still fall through so partial output gets removed

    folder = os.path.dirname(path) or "."  # listdir("") would raise
    prefix = os.path.basename(base)
    pages = sorted(
        name for name in os.listdir(folder)
        if name.startswith(prefix) and name.endswith(".png")
    )
    texts = []
    for index, name in enumerate(pages):
        full = os.path.join(folder, name)
        try:
            if rendered and index < 2:
                with open(full, "rb") as fh:
                    img = base64.b64encode(fh.read()).decode()
                texts.append(ollama_text(
                    VISION_MODEL,
                    "Transcribe all readable text from this page verbatim.",
                    images=[img], timeout=240,
                ))
        except OSError as e:
            log(f"  ocr page error: {e}")
        finally:
            try:
                os.remove(full)
            except OSError:
                pass  # best-effort cleanup of a temporary image
    return "\n".join(texts)[:6000]


def extract_tuple(text):
    """Ask the text model for one failure record extracted from *text*.

    Returns:
        The parsed dict when the model returned a JSON object with a non-empty
        "component", otherwise None.
    """
    prompt = (
        "From the engineering text below, extract ONE failure record as a JSON object with EXACTLY these keys: "
        "component (the part that failed), failure_mode (how it failed: fracture/leak/short/wear/stuck/drift/ignition...), "
        "mechanism (why: fatigue/contamination/corrosion/overload...), effect (system consequence), "
        "hazard (one of: mechanical_hazard, electrical_hazard, thermal_hazard, fire_explosion, pneumatic_hydraulic, none), "
        "control (recommended action). Use ONLY information in the text; use \"\" if unknown. Output ONLY the JSON.\n\nTEXT:\n"
        + text[:6000]
    )
    raw = ollama_text(TEXT_MODEL, prompt, fmt="json", timeout=180)
    try:
        obj = json.loads(raw)
    except (ValueError, TypeError):
        # Empty or malformed model output (ollama_text returns "" on error).
        return None
    return obj if isinstance(obj, dict) and obj.get("component") else None


def ingest(doc_id, title, tuple_, license_, url):
    """Write the record as markdown and upload it to the RAG collection.

    Args:
        doc_id: NTRS document id.
        title: Document title.
        tuple_: Extracted failure record (dict).
        license_: Licence string recorded as provenance.
        url: Source PDF URL, may be "".

    Returns:
        True when the upload response mentions "chunks_count", else False.
    """
    md = (f"# NASA Lesson {doc_id}: {title}\n\n"
          f"- Source: NASA NTRS {doc_id}\n- License: {license_}\n- URL: {url}\n- verified: false (auto-extracted)\n\n"
          f"Component: {tuple_.get('component','')}\nFailure mode: {tuple_.get('failure_mode','')}\n"
          f"Mechanism: {tuple_.get('mechanism','')}\nEffect: {tuple_.get('effect','')}\n"
          f"Hazard: {tuple_.get('hazard','')}\nControl: {tuple_.get('control','')}\n")
    path = f"{WORK}/fk_{doc_id}.md"
    with open(path, "w", encoding="utf-8") as f:
        f.write(md)
    meta = json.dumps({"title": f"NASA NTRS {doc_id}: {title}"[:120], "license": license_,
                       "source": f"NASA NTRS {doc_id}", "verified": "false"})
    # NOTE(review): year is a fixed value here; confirm whether it should track
    # the document or the run date.
    cmd = ["curl", "-sk", "--max-time", "90", "-X", "POST", f"{RAG}/api/v1/documents/upload",
           "-F", f"file=@{path}", "-F", f"collection={COLLECTION}",
           "-F", "data_type=failure_kb", "-F", "use_case=iace_fmea",
           "-F", "year=2024",
           # --form-string sends the value literally; with -F a ';' inside the
           # title would be parsed as a form attribute separator.
           "--form-string", f"metadata_json={meta}"]
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
    except (subprocess.SubprocessError, OSError) as e:
        log(f"  ingest error: {e}")
        return False
    return "chunks_count" in r.stdout


def _load_done():
    """Return the set of already-seen document ids from STATE (empty if unusable)."""
    try:
        with open(STATE, encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        return set()
    except (OSError, ValueError) as e:
        log(f"state file unreadable, starting fresh: {e}")
        return set()
    ids = state.get("done") if isinstance(state, dict) else None
    if not isinstance(ids, list):
        return set()
    try:
        return set(ids)
    except TypeError:  # unhashable entries: treat the file as corrupt
        return set()


def _save_done(done):
    """Persist the seen-id set atomically so a crash cannot truncate STATE."""
    tmp = STATE + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump({"done": list(done)}, f)
    os.replace(tmp, STATE)


def _document_text(record, did, url):
    """Return (text, how) for one record: PDF text, then OCR, then the abstract."""
    text, how = "", "abstract"
    if url:
        pdf = f"{WORK}/{did}.pdf"
        # --fail keeps HTTP error pages from being stored as if they were PDFs.
        subprocess.run(["curl", "-sfL", "--max-time", "150", "-o", pdf, url],
                       capture_output=True)
        try:
            if os.path.exists(pdf) and os.path.getsize(pdf) > 1000:
                text, how = extract_pdf_text(pdf)
                if not text:
                    text, how = ocr_pdf(pdf), "ocr"
        finally:
            try:
                os.remove(pdf)
            except OSError:
                pass  # nothing was downloaded, or it is already gone
    if not text or len(text.strip()) < 120:
        text, how = record.get("abstract") or "", "abstract"
    return text, how


def _harvest_record(record, did):
    """Extract, record and ingest one document.

    Returns:
        (tuple, how, ingest_ok), or None when no text or no tuple was obtained.
    """
    title = (record.get("title") or "")[:120]
    determination = (record.get("copyright") or {}).get("determinationType", "")
    lic = f"Public Domain (NASA NTRS, {determination})"
    url = pdf_url(record) or ""
    text, how = _document_text(record, did, url)
    if not text.strip():
        return None
    tup = extract_tuple(text)
    if not tup:
        log(f"  {did} no tuple ({how})")
        return None
    tup["_source"] = f"NASA NTRS {did}"
    tup["_license"] = lic
    tup["_url"] = url
    tup["_how"] = how
    tup["_verified"] = False
    tup["_id"] = did
    with open(RESULTS, "a", encoding="utf-8") as f:
        f.write(json.dumps(tup, ensure_ascii=False) + "\n")
    ok = ingest(did, title, tup, lic, url)
    return tup, how, ok


def main():
    """Run one bounded harvest pass over QUERIES, resuming from STATE."""
    done = _load_done()
    # ensure collection
    subprocess.run(["curl", "-sk", "--max-time", "30", "-X", "POST", f"{RAG}/api/v1/collections",
                    "-H", "Content-Type: application/json",
                    "-d", json.dumps({"name": COLLECTION, "vector_size": 1024})],
                   capture_output=True)
    log(f"START nightly harvest (max {MAX_DOCS}, already done {len(done)})")
    processed = 0
    try:
        for q in QUERIES:
            if processed >= MAX_DOCS:
                break
            try:
                res = get_json(
                    f"{NTRS}/api/citations/search?q={urllib.parse.quote(q)}&page.size=100&highlight=false",
                    timeout=90,
                )
            except Exception as e:
                log(f"NTRS search '{q}' error: {e}")
                continue
            for l in res.get("results") or []:
                if processed >= MAX_DOCS:
                    break
                did = l.get("id")
                if not did or did in done:
                    continue
                # Marked as seen up front, so a document is attempted once only.
                # NOTE(review): this also applies when Ollama is down, because
                # an Ollama error is indistinguishable from "no tuple" here.
                done.add(did)
                if not ntrs_usable(l):
                    continue
                try:
                    outcome = _harvest_record(l, did)
                except Exception as e:
                    # Unattended run: one bad document must not end the night.
                    log(f"  {did} failed: {e}")
                    continue
                if outcome is None:
                    continue
                tup, how, ok = outcome
                processed += 1
                log(f"  [{processed}] {did} {tup.get('component','?')}→{tup.get('failure_mode','?')} ({how}) ingest={ok}")
                _save_done(done)
    finally:
        # Also runs on interruption, so skipped ids are not re-examined.
        _save_done(done)
    log(f"DONE nightly harvest: {processed} new tuples this run; {len(done)} ids seen")


if __name__ == "__main__":
    main()