"""Observation Log — append-only JSONL store for empirical calibration events (Task 59b v1). Observations are NOT business data and NOT product-DB data — they are CALIBRATION events for the knowledge base ("ISO27001 -> SDL confirmed", "TISAX -> supplier security refuted"). So they live with the other versioned knowledge artifacts (hypotheses, transition patterns, vocabulary), NOT in the product database: an append-only JSONL log under `knowledge/observations/`. NO migration, NO DB. The empirical DISTRIBUTION and CONFIDENCE are COMPUTED from this log on demand (computed-not-stored) — a hypothesis is NEVER auto-updated; only REVIEWED observations calibrate (the review gate, enforced in observations.py). Append-only: each line is one ObservationRecord and lines are NEVER modified in place. A later review is a NEW line with the same observation_id and reviewed=true; load_observations() reconciles to the latest per id. You can `rm` the log and recompute, `git diff` it over months, or rebuild confidence under a new policy. Anonymisation is MANDATORY: customer_archetype is a sector/cert archetype, NEVER a real company name (this file is committed to git). Time is stamped by the CALLER (no hidden clock) for determinism. I/O only at the append/load boundary; statistics are pure. Python 3.9 compatible. """ from __future__ import annotations import json import os from typing import Dict, List, Optional, Sequence from pydantic import BaseModel, Field from .observations import Observation, empirical_confidence, empirical_distribution _DEFAULT_LOG = os.path.join( os.path.dirname(__file__), "..", "..", "knowledge", "observations", "observations.jsonl") class ObservationRecord(Observation): """A persisted observation line: an Observation (with its review gate + observation_type) plus log metadata. `observation_id` is stable — a review re-appends the SAME id with reviewed=true.""" observation_id: str # stable id; a review re-appends the same id timestamp: str = "" # ISO 8601, stamped by the CALLER (no hidden clock) customer_archetype: str = "" # sector/cert archetype — NEVER a real company name evidence: str = "" # what backs the answer (reference, not the artifact) provenance: str = "" # where the answer came from (audit trail) knowledge_version: str = "" # hypotheses/vocabulary version observed under class HypothesisStats(BaseModel): """Per-hypothesis empirical rollup — all COMPUTED from the log, nothing stored on the hypothesis.""" hypothesis_id: str distribution: Dict[str, int] = Field(default_factory=dict) # reviewed counts per observation_type confidence: Optional[float] = None # None until a for/against obs is reviewed reviewed_count: int = 0 total_count: int = 0 def append_observation(record: ObservationRecord, path: str = _DEFAULT_LOG) -> None: """Append ONE record as a JSON line. Append-only — existing lines are never rewritten.""" os.makedirs(os.path.dirname(path), exist_ok=True) line = json.dumps(record.model_dump(mode="json"), ensure_ascii=False, sort_keys=True) with open(path, "a", encoding="utf-8") as fh: fh.write(line + "\n") def load_observations(path: str = _DEFAULT_LOG, reconcile: bool = True) -> List[ObservationRecord]: """Read all records — a single `.jsonl` file or a directory of monthly `.jsonl` files. With reconcile, the LATEST record per observation_id wins (a later reviewed=true supersedes the original). Returns deterministic order (by observation_id when reconciled, else append order).""" files: List[str] = [] if os.path.isdir(path): files = sorted(os.path.join(path, f) for f in os.listdir(path) if f.endswith(".jsonl")) elif os.path.exists(path): files = [path] records: List[ObservationRecord] = [] for fpath in files: with open(fpath, encoding="utf-8") as fh: for raw in fh: raw = raw.strip() if raw: records.append(ObservationRecord(**json.loads(raw))) if not reconcile: return records latest: Dict[str, ObservationRecord] = {} for r in records: # file/append order -> later lines win latest[r.observation_id] = r return [latest[k] for k in sorted(latest)] def aggregate_by_hypothesis(records: Sequence[ObservationRecord]) -> List[HypothesisStats]: """Per-hypothesis distribution + confidence. The review gate applies inside empirical_distribution/ empirical_confidence (reviewed-only), so unreviewed observations are counted in total but never calibrate. Deterministic order (by hypothesis id).""" by_hyp: Dict[str, List[ObservationRecord]] = {} for r in records: by_hyp.setdefault(r.hypothesis_id, []).append(r) out: List[HypothesisStats] = [] for hyp in sorted(by_hyp): obs = by_hyp[hyp] out.append(HypothesisStats( hypothesis_id=hyp, distribution=empirical_distribution(obs), # reviewed-only (the gate) confidence=empirical_confidence(obs), # None until reviewed for/against reviewed_count=sum(1 for o in obs if o.reviewed), total_count=len(obs))) return out def review_queue(records: Sequence[ObservationRecord]) -> List[ObservationRecord]: """The reviewer's worklist: observations not yet reviewed. Calibration ignores these until a reviewer accepts them (Observation -> Review -> Accepted -> Knowledge recomputed), never Observation -> conf++.""" return [r for r in records if not r.reviewed]