Merge pull request 'feat(onboarding): Observation Log — append-only JSONL calibration store (59b/c)' (#51) from feat/observation-log into main
This commit is contained in:
@@ -21,6 +21,14 @@ from .observations import (
|
||||
empirical_distribution,
|
||||
reviewed,
|
||||
)
|
||||
from .observation_log import (
|
||||
HypothesisStats,
|
||||
ObservationRecord,
|
||||
aggregate_by_hypothesis,
|
||||
append_observation,
|
||||
load_observations,
|
||||
review_queue,
|
||||
)
|
||||
from .signals import (
|
||||
ProducedSignal,
|
||||
SignalVocabularyEntry,
|
||||
@@ -69,4 +77,10 @@ __all__ = [
|
||||
"ProducedSignal",
|
||||
"SignalVocabularyEntry",
|
||||
"normalize_signals",
|
||||
"ObservationRecord",
|
||||
"HypothesisStats",
|
||||
"append_observation",
|
||||
"load_observations",
|
||||
"aggregate_by_hypothesis",
|
||||
"review_queue",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,108 @@
|
||||
"""Observation Log — append-only JSONL store for empirical calibration events (Task 59b v1).
|
||||
|
||||
Observations are NOT business data and NOT product-DB data — they are CALIBRATION events for the
|
||||
knowledge base ("ISO27001 -> SDL confirmed", "TISAX -> supplier security refuted"). So they live with the
|
||||
other versioned knowledge artifacts (hypotheses, transition patterns, vocabulary), NOT in the product
|
||||
database: an append-only JSONL log under `knowledge/observations/`. NO migration, NO DB. The empirical
|
||||
DISTRIBUTION and CONFIDENCE are COMPUTED from this log on demand (computed-not-stored) — a hypothesis is
|
||||
NEVER auto-updated; only REVIEWED observations calibrate (the review gate, enforced in observations.py).
|
||||
|
||||
Append-only: each line is one ObservationRecord and lines are NEVER modified in place. A later review is
|
||||
a NEW line with the same observation_id and reviewed=true; load_observations() reconciles to the latest
|
||||
per id. You can `rm` the log and recompute, `git diff` it over months, or rebuild confidence under a new
|
||||
policy. Anonymisation is MANDATORY: customer_archetype is a sector/cert archetype, NEVER a real company
|
||||
name (this file is committed to git). Time is stamped by the CALLER (no hidden clock) for determinism.
|
||||
I/O only at the append/load boundary; statistics are pure. Python 3.9 compatible.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Dict, List, Optional, Sequence
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from .observations import Observation, empirical_confidence, empirical_distribution
|
||||
|
||||
_DEFAULT_LOG = os.path.join(
|
||||
os.path.dirname(__file__), "..", "..", "knowledge", "observations", "observations.jsonl")
|
||||
|
||||
|
||||
class ObservationRecord(Observation):
    """One persisted line of the observation log.

    Extends Observation with log metadata. Only `observation_id` is required;
    every other metadata field defaults to the empty string. The id is stable:
    a review is recorded by appending a new line that reuses the id with
    reviewed=true, never by editing the earlier line.
    """

    # Stable identifier; a review re-appends a record carrying the same id.
    observation_id: str
    # ISO 8601 timestamp, supplied by the CALLER (no hidden clock).
    timestamp: str = ""
    # Sector/cert archetype — NEVER a real company name.
    customer_archetype: str = ""
    # What backs the answer (a reference, not the artifact itself).
    evidence: str = ""
    # Where the answer came from (audit trail).
    provenance: str = ""
    # Hypotheses/vocabulary version the observation was made under.
    knowledge_version: str = ""
|
||||
|
||||
|
||||
class HypothesisStats(BaseModel):
    """Empirical rollup for a single hypothesis.

    Every value is COMPUTED from the log; nothing here is stored on the
    hypothesis itself.
    """

    hypothesis_id: str
    # Counts of reviewed observations, keyed by observation_type.
    distribution: Dict[str, int] = Field(default_factory=dict)
    # Stays None until a for/against observation has been reviewed.
    confidence: Optional[float] = None
    reviewed_count: int = 0
    total_count: int = 0
|
||||
|
||||
|
||||
def append_observation(record: ObservationRecord, path: str = _DEFAULT_LOG) -> None:
    """Append ONE record to the log as a single JSON line.

    Append-only: the file is opened in append mode, so existing lines are never
    rewritten. Keys are sorted and non-ASCII text is written verbatim as UTF-8.

    Args:
        record: The observation to persist; serialised via `model_dump(mode="json")`.
        path: Target `.jsonl` file. Missing parent directories are created.
            A bare filename (no directory component) is written relative to
            the current working directory.
    """
    parent = os.path.dirname(path)
    # A bare filename has an empty directory component, and os.makedirs("")
    # raises FileNotFoundError — only create directories when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    line = json.dumps(record.model_dump(mode="json"), ensure_ascii=False, sort_keys=True)
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(line + "\n")
|
||||
|
||||
|
||||
def load_observations(path: str = _DEFAULT_LOG, reconcile: bool = True) -> List[ObservationRecord]:
    """Load every record from a `.jsonl` file or from a directory of `.jsonl` files.

    Args:
        path: A single log file, or a directory whose `*.jsonl` entries are
            read in sorted path order. A path that does not exist yields an
            empty list.
        reconcile: When true, keep only the LAST record seen per
            observation_id (file order, then line order), so a later
            reviewed=true line supersedes the original.

    Returns:
        Records sorted by observation_id when reconciled, otherwise in
        file/append order. Blank lines are skipped.
    """
    if os.path.isdir(path):
        sources = [os.path.join(path, name) for name in os.listdir(path) if name.endswith(".jsonl")]
        sources.sort()
    elif os.path.exists(path):
        sources = [path]
    else:
        sources = []

    loaded: List[ObservationRecord] = []
    for source in sources:
        with open(source, encoding="utf-8") as handle:
            stripped = (text.strip() for text in handle)
            loaded.extend(ObservationRecord(**json.loads(text)) for text in stripped if text)

    if reconcile:
        # Later assignments overwrite earlier ones, so the last line per id wins.
        newest: Dict[str, ObservationRecord] = {rec.observation_id: rec for rec in loaded}
        return [newest[oid] for oid in sorted(newest)]
    return loaded
|
||||
|
||||
|
||||
def aggregate_by_hypothesis(records: Sequence[ObservationRecord]) -> List[HypothesisStats]:
    """Roll the records up into one HypothesisStats per hypothesis id.

    Distribution and confidence come from empirical_distribution and
    empirical_confidence, which apply the review gate (reviewed-only).
    Unreviewed observations therefore count towards `total_count` but never
    calibrate. The result is ordered by hypothesis id.
    """
    grouped: Dict[str, List[ObservationRecord]] = {}
    for rec in records:
        if rec.hypothesis_id not in grouped:
            grouped[rec.hypothesis_id] = []
        grouped[rec.hypothesis_id].append(rec)

    return [
        HypothesisStats(
            hypothesis_id=hyp_id,
            distribution=empirical_distribution(grouped[hyp_id]),  # reviewed-only (the gate)
            confidence=empirical_confidence(grouped[hyp_id]),  # None until reviewed for/against
            reviewed_count=len([rec for rec in grouped[hyp_id] if rec.reviewed]),
            total_count=len(grouped[hyp_id]),
        )
        for hyp_id in sorted(grouped)
    ]
|
||||
|
||||
|
||||
def review_queue(records: Sequence[ObservationRecord]) -> List[ObservationRecord]:
    """Return the reviewer's worklist: the records whose `reviewed` flag is falsy.

    Input order is preserved. Calibration ignores these observations until a
    reviewer accepts them (Observation -> Review -> Accepted -> Knowledge
    recomputed), never Observation -> conf++.
    """
    pending: List[ObservationRecord] = []
    for rec in records:
        if rec.reviewed:
            continue
        pending.append(rec)
    return pending
|
||||
@@ -0,0 +1,2 @@
|
||||
# Append-only observation log (Task 59b). Real lines (observations.jsonl / YYYY-MM.jsonl) are written at
|
||||
# runtime via compliance/onboarding/observation_log.py. Anonymised archetypes only — NEVER real company names.
|
||||
@@ -0,0 +1,73 @@
|
||||
"""Observation Log — append-only JSONL store + computed statistics (Task 59b/c v1).
|
||||
|
||||
Pins the user's decision (2026-06-28): observations are CALIBRATION data, not product data -> an
|
||||
append-only JSONL log under knowledge/observations/, NO DB, NO migration. Distribution and confidence are
|
||||
COMPUTED from the log; only REVIEWED observations calibrate (review gate); a later review is a new line
|
||||
that supersedes by observation_id. Nothing is ever written back to a hypothesis.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from compliance.onboarding import (
|
||||
ObservationRecord,
|
||||
ObservationType,
|
||||
aggregate_by_hypothesis,
|
||||
append_observation,
|
||||
load_observations,
|
||||
review_queue,
|
||||
)
|
||||
|
||||
|
||||
def _rec(oid, hyp, otype, reviewed=False, **kw):
    """Build an ObservationRecord with a fixed timestamp and an anonymised archetype."""
    base = {
        "observation_id": oid,
        "hypothesis_id": hyp,
        "observation_type": otype,
        "reviewed": reviewed,
        "timestamp": "2026-07-01T00:00:00Z",
        "customer_archetype": "machine_builder+ISO27001",
    }
    # Double-splat keeps the original contract: a key repeated in kw raises TypeError.
    return ObservationRecord(**base, **kw)
|
||||
|
||||
|
||||
def test_append_only_round_trip(tmp_path):
    """Two appended records come back from the log with their archetype intact."""
    log_path = str(tmp_path / "obs.jsonl")
    for oid, otype in (("o1", ObservationType.CONFIRMED), ("o2", ObservationType.REFUTED)):
        append_observation(_rec(oid, "HYP-secure_dev", otype, reviewed=True), log_path)

    loaded = load_observations(log_path)

    assert {rec.observation_id for rec in loaded} == {"o1", "o2"}
    for rec in loaded:
        # anonymised archetype, not a name
        assert rec.customer_archetype == "machine_builder+ISO27001"
|
||||
|
||||
|
||||
def test_review_supersedes_by_id_append_only(tmp_path):
    """A later review line supersedes the raw answer while both lines stay on disk."""
    log_path = str(tmp_path / "obs.jsonl")
    raw_answer = _rec("o1", "HYP-x", ObservationType.CONFIRMED, reviewed=False)
    review_event = _rec("o1", "HYP-x", ObservationType.CONFIRMED, reviewed=True, reviewed_by="anna")
    append_observation(raw_answer, log_path)
    append_observation(review_event, log_path)

    # Both lines kept (append-only).
    unreconciled = load_observations(log_path, reconcile=False)
    assert len(unreconciled) == 2

    # Reconciled view: only the latest line per id survives.
    reconciled = load_observations(log_path)
    assert len(reconciled) == 1
    assert reconciled[0].reviewed
    assert reconciled[0].reviewed_by == "anna"
|
||||
|
||||
|
||||
def test_statistics_apply_the_review_gate(tmp_path):
    """Unreviewed observations count in the total but not in distribution or confidence."""
    log_path = str(tmp_path / "obs.jsonl")
    rows = [
        ("a", ObservationType.CONFIRMED, True),
        ("b", ObservationType.CONFIRMED, True),
        ("c", ObservationType.REFUTED, True),
        ("d", ObservationType.CONFIRMED, False),  # unreviewed -> ignored
    ]
    for oid, otype, is_reviewed in rows:
        append_observation(_rec(oid, "HYP-sdl", otype, reviewed=is_reviewed), log_path)

    by_id = {}
    for entry in aggregate_by_hypothesis(load_observations(log_path)):
        by_id[entry.hypothesis_id] = entry
    sdl = by_id["HYP-sdl"]

    assert sdl.total_count == 4
    assert sdl.reviewed_count == 3
    # The unreviewed observation is excluded from the distribution.
    assert sdl.distribution["confirmed"] == 2
    assert sdl.distribution["refuted"] == 1
    assert sdl.confidence == round(2 / 3, 2)  # (2 + 0.5*0) / 3
|
||||
|
||||
|
||||
def test_review_queue_lists_unreviewed(tmp_path):
    """Only the unreviewed observation appears on the reviewer's worklist."""
    log_path = str(tmp_path / "obs.jsonl")
    append_observation(_rec("a", "HYP-y", ObservationType.CONFIRMED, reviewed=True), log_path)
    append_observation(_rec("b", "HYP-y", ObservationType.PARTIAL, reviewed=False), log_path)

    worklist = review_queue(load_observations(log_path))
    queued_ids = [rec.observation_id for rec in worklist]

    assert queued_ids == ["b"]
|
||||
|
||||
|
||||
def test_load_directory_of_monthly_files(tmp_path):
    """Loading a directory reads every monthly `.jsonl` file inside it."""
    log_dir = tmp_path / "observations"
    log_dir.mkdir()
    june = str(log_dir / "2026-06.jsonl")
    july = str(log_dir / "2026-07.jsonl")
    append_observation(_rec("a", "HYP-z", ObservationType.CONFIRMED, reviewed=True), june)
    append_observation(_rec("b", "HYP-z", ObservationType.REFUTED, reviewed=True), july)

    loaded = load_observations(str(log_dir))
    loaded_ids = {rec.observation_id for rec in loaded}

    assert loaded_ids == {"a", "b"}
|
||||
Reference in New Issue
Block a user