#!/usr/bin/env python3 """Quote-verifiable FailureKnowledge extraction via the Anthropic API. Sends each NASA NTRS PDF DIRECTLY to Claude (native PDF + vision, incl. scanned) and forces a structured tool call whose schema REQUIRES a verbatim source quote for the key fields — every value is auditable against the document. Only public-reuse-licensed NTRS docs are processed. applicable=true tuples are ingested into bp_iace_failure_kb; EVERY processed doc is recorded in the source manifest (used vs checked-not-used) for the FMEA frontend register. Env: ANTHROPIC_API_KEY (req), ANTHROPIC_MODEL (default claude-haiku-4-5-20251001), MAX_DOCS (default 100), RAG (default https://127.0.0.1:8097). Out: /tmp/fmea_harvest/anthropic.jsonl, /tmp/fmea_harvest/nasa_failure_sources.json """ import base64, json, os, ssl, subprocess, sys, time, urllib.parse, urllib.request, urllib.error API = "https://api.anthropic.com/v1/messages" NTRS = "https://ntrs.nasa.gov" RAG = os.environ.get("RAG", "https://127.0.0.1:8097") COLLECTION = "bp_iace_failure_kb" MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-haiku-4-5-20251001") KEY = os.environ.get("ANTHROPIC_API_KEY", "") MAX_DOCS = int(os.environ.get("MAX_DOCS", "100")) WORK = "/tmp/fmea_harvest" OUT = f"{WORK}/anthropic.jsonl" MANIFEST = f"{WORK}/nasa_failure_sources.json" QUERIES = ["lessons learned failure", "lessons learned anomaly valve seal bearing motor", "failure investigation fracture leak short corrosion", "reliability failure mechanism component"] os.makedirs(WORK, exist_ok=True) _ctx = ssl.create_default_context() _ctx.check_hostname = False _ctx.verify_mode = ssl.CERT_NONE TOOL = { "name": "record_failure", "description": "Record the hardware failure described in the document, with verbatim source quotes.", "input_schema": {"type": "object", "properties": { "applicable": {"type": "boolean", "description": "true ONLY if the document describes a concrete hardware/component failure"}, "component": {"type": "string"}, "component_quote": {"type": "string", "description": "verbatim sentence naming the component"}, "failure_mode": {"type": "string"}, "failure_mode_quote": {"type": "string", "description": "verbatim sentence describing how it failed"}, "mechanism": {"type": "string"}, "effect": {"type": "string"}, "hazard": {"type": "string", "enum": ["mechanical_hazard", "electrical_hazard", "thermal_hazard", "fire_explosion", "pneumatic_hydraulic", "none"]}, "control": {"type": "string"}, "confidence": {"type": "string", "enum": ["high", "medium", "low"]}}, "required": ["applicable", "component", "failure_mode", "component_quote", "failure_mode_quote", "confidence"]}, } PROMPT = ("Read this NASA engineering document and extract the single primary hardware failure. Use the " "record_failure tool. Quote the SOURCE SENTENCE verbatim for component and failure_mode. If the " "document does not describe a concrete component failure, call the tool with applicable=false. " "Never invent values — leave a field empty rather than guess.") def get_json(url, data=None, headers=None, timeout=120): req = urllib.request.Request(url, data=data, headers=headers or {}) with urllib.request.urlopen(req, timeout=timeout, context=_ctx) as r: return json.load(r) def ntrs_usable(l): if str(l.get("distribution", "")).upper() != "PUBLIC": return False ec = l.get("exportControl") or {} if any(str(ec.get(k, "")).upper() == "YES" for k in ("isExportControl", "ear", "itar")): return False if (l.get("cui") or {}).get("isCui"): return False cp = l.get("copyright") or {} if cp.get("containsThirdPartyMaterial"): return False return str(cp.get("determinationType", "")).upper() in ("PUBLIC_USE_PERMITTED", "GOV_PUBLIC_USE_PERMITTED") def pdf_url(l): for d in l.get("downloads", []): if "pdf" in str(d.get("mimetype", "")).lower(): o = (d.get("links") or {}).get("original") if o: return NTRS + o return None def extract(pdf_b64): body = json.dumps({"model": MODEL, "max_tokens": 1024, "tools": [TOOL], "tool_choice": {"type": "tool", "name": "record_failure"}, "messages": [{"role": "user", "content": [ {"type": "document", "source": {"type": "base64", "media_type": "application/pdf", "data": pdf_b64}}, {"type": "text", "text": PROMPT}]}]}).encode() hdr = {"x-api-key": KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"} for attempt in (1, 2): try: resp = get_json(API, body, hdr, timeout=180) for b in resp.get("content", []): if b.get("type") == "tool_use": return b.get("input"), resp.get("usage", {}) return None, resp.get("usage", {}) except (urllib.error.URLError, ConnectionError) as e: if attempt == 2: raise time.sleep(3) return None, {} def ingest(did, title, t, lic, url): md = (f"# NASA Lesson {did}: {title}\n\n- Source: NASA NTRS {did}\n- License: {lic}\n- URL: {url}\n" f"- verified: false (Claude-extracted, quote-checked)\n\n" f"Component: {t.get('component','')}\nFailure mode: {t.get('failure_mode','')}\n" f"Mechanism: {t.get('mechanism','')}\nEffect: {t.get('effect','')}\nHazard: {t.get('hazard','')}\n" f"Control: {t.get('control','')}\nConfidence: {t.get('confidence','')}\n\n" f"Component quote: {t.get('component_quote','')}\nFailure-mode quote: {t.get('failure_mode_quote','')}\n") p = f"{WORK}/fk_{did}.md" open(p, "w").write(md) meta = json.dumps({"title": f"NASA NTRS {did}: {title}"[:120], "license": lic, "source": f"NASA NTRS {did}", "verified": "false"}) r = subprocess.run(["curl", "-sk", "--max-time", "90", "-X", "POST", f"{RAG}/api/v1/documents/upload", "-F", f"file=@{p}", "-F", f"collection={COLLECTION}", "-F", "data_type=failure_kb", "-F", "use_case=iace_fmea", "-F", "year=2024", "-F", f"metadata_json={meta}"], capture_output=True, text=True, timeout=120) try: os.remove(p) except Exception: pass return "chunks_count" in r.stdout def main(): if not KEY: print("ERROR: ANTHROPIC_API_KEY not set", file=sys.stderr); sys.exit(1) subprocess.run(["curl", "-sk", "-X", "POST", f"{RAG}/api/v1/collections", "-H", "Content-Type: application/json", "-d", json.dumps({"name": COLLECTION, "vector_size": 1024})], capture_output=True) manifest, seen, processed = [], set(), 0 for q in QUERIES: if processed >= MAX_DOCS: break for frm in (0, 100, 200): if processed >= MAX_DOCS: break try: res = get_json(f"{NTRS}/api/citations/search?q={urllib.parse.quote(q)}&page.size=100&page.from={frm}&highlight=false", timeout=90) except Exception as e: print(f"search '{q}'@{frm} error: {e}", flush=True); continue for l in res.get("results", []): if processed >= MAX_DOCS: break did = l.get("id") if not did or did in seen: continue if not ntrs_usable(l): continue url = pdf_url(l) if not url: continue seen.add(did) pdf = f"{WORK}/{did}.pdf" subprocess.run(["curl", "-sL", "--max-time", "150", "-o", pdf, url], capture_output=True) sz = os.path.getsize(pdf) if os.path.exists(pdf) else 0 if sz < 1000 or sz > 30_000_000: try: os.remove(pdf) except Exception: pass continue b64 = base64.b64encode(open(pdf, "rb").read()).decode() os.remove(pdf) try: t, usage = extract(b64) except Exception as e: print(f" {did} extract error: {e}", flush=True); continue if not t: continue title = (l.get("title") or "")[:110] lic = f"Public Domain (NASA NTRS, {(l.get('copyright') or {}).get('determinationType','')})" used = bool(t.get("applicable")) t.update({"_id": did, "_title": title, "_license": lic, "_url": url, "_model": MODEL, "_used": used}) open(OUT, "a").write(json.dumps(t, ensure_ascii=False) + "\n") if used: ingest(did, title, t, lic, url) manifest.append({"id": did, "title": title, "source": f"NASA NTRS {did}", "license": lic, "url": url, "used": used, "component": t.get("component", ""), "failure_mode": t.get("failure_mode", ""), "confidence": t.get("confidence", "")}) processed += 1 json.dump({"generated": "nightly", "model": MODEL, "count": len(manifest), "documents": manifest}, open(MANIFEST, "w"), ensure_ascii=False, indent=1) print(f" [{processed}] {did} used={used} {t.get('component','?')}→{t.get('failure_mode','?')} ({usage.get('input_tokens')}in)", flush=True) used_n = sum(1 for m in manifest if m["used"]) print(f"DONE: {processed} processed, {used_n} used (applicable) -> {MANIFEST}", flush=True) if __name__ == "__main__": main()