diff --git a/backend-compliance/compliance/onboarding/__init__.py b/backend-compliance/compliance/onboarding/__init__.py index 878dfdff..be29e32c 100644 --- a/backend-compliance/compliance/onboarding/__init__.py +++ b/backend-compliance/compliance/onboarding/__init__.py @@ -21,6 +21,14 @@ from .observations import ( empirical_distribution, reviewed, ) +from .observation_log import ( + HypothesisStats, + ObservationRecord, + aggregate_by_hypothesis, + append_observation, + load_observations, + review_queue, +) from .signals import ( ProducedSignal, SignalVocabularyEntry, @@ -69,4 +77,10 @@ __all__ = [ "ProducedSignal", "SignalVocabularyEntry", "normalize_signals", + "ObservationRecord", + "HypothesisStats", + "append_observation", + "load_observations", + "aggregate_by_hypothesis", + "review_queue", ] diff --git a/backend-compliance/compliance/onboarding/observation_log.py b/backend-compliance/compliance/onboarding/observation_log.py new file mode 100644 index 00000000..06c7f45d --- /dev/null +++ b/backend-compliance/compliance/onboarding/observation_log.py @@ -0,0 +1,108 @@ +"""Observation Log — append-only JSONL store for empirical calibration events (Task 59b v1). + +Observations are NOT business data and NOT product-DB data — they are CALIBRATION events for the +knowledge base ("ISO27001 -> SDL confirmed", "TISAX -> supplier security refuted"). So they live with the +other versioned knowledge artifacts (hypotheses, transition patterns, vocabulary), NOT in the product +database: an append-only JSONL log under `knowledge/observations/`. NO migration, NO DB. The empirical +DISTRIBUTION and CONFIDENCE are COMPUTED from this log on demand (computed-not-stored) — a hypothesis is +NEVER auto-updated; only REVIEWED observations calibrate (the review gate, enforced in observations.py). + +Append-only: each line is one ObservationRecord and lines are NEVER modified in place. A later review is +a NEW line with the same observation_id and reviewed=true; load_observations() reconciles to the latest +per id. You can `rm` the log and recompute, `git diff` it over months, or rebuild confidence under a new +policy. Anonymisation is MANDATORY: customer_archetype is a sector/cert archetype, NEVER a real company +name (this file is committed to git). Time is stamped by the CALLER (no hidden clock) for determinism. +I/O only at the append/load boundary; statistics are pure. Python 3.9 compatible. +""" + +from __future__ import annotations + +import json +import os +from typing import Dict, List, Optional, Sequence + +from pydantic import BaseModel, Field + +from .observations import Observation, empirical_confidence, empirical_distribution + +_DEFAULT_LOG = os.path.join( + os.path.dirname(__file__), "..", "..", "knowledge", "observations", "observations.jsonl") + + +class ObservationRecord(Observation): + """A persisted observation line: an Observation (with its review gate + observation_type) plus log + metadata. `observation_id` is stable — a review re-appends the SAME id with reviewed=true.""" + + observation_id: str # stable id; a review re-appends the same id + timestamp: str = "" # ISO 8601, stamped by the CALLER (no hidden clock) + customer_archetype: str = "" # sector/cert archetype — NEVER a real company name + evidence: str = "" # what backs the answer (reference, not the artifact) + provenance: str = "" # where the answer came from (audit trail) + knowledge_version: str = "" # hypotheses/vocabulary version observed under + + +class HypothesisStats(BaseModel): + """Per-hypothesis empirical rollup — all COMPUTED from the log, nothing stored on the hypothesis.""" + + hypothesis_id: str + distribution: Dict[str, int] = Field(default_factory=dict) # reviewed counts per observation_type + confidence: Optional[float] = None # None until a for/against obs is reviewed + reviewed_count: int = 0 + total_count: int = 0 + + +def append_observation(record: ObservationRecord, path: str = _DEFAULT_LOG) -> None: + """Append ONE record as a JSON line. Append-only — existing lines are never rewritten.""" + os.makedirs(os.path.dirname(path), exist_ok=True) + line = json.dumps(record.model_dump(mode="json"), ensure_ascii=False, sort_keys=True) + with open(path, "a", encoding="utf-8") as fh: + fh.write(line + "\n") + + +def load_observations(path: str = _DEFAULT_LOG, reconcile: bool = True) -> List[ObservationRecord]: + """Read all records — a single `.jsonl` file or a directory of monthly `.jsonl` files. With + reconcile, the LATEST record per observation_id wins (a later reviewed=true supersedes the original). + Returns deterministic order (by observation_id when reconciled, else append order).""" + files: List[str] = [] + if os.path.isdir(path): + files = sorted(os.path.join(path, f) for f in os.listdir(path) if f.endswith(".jsonl")) + elif os.path.exists(path): + files = [path] + records: List[ObservationRecord] = [] + for fpath in files: + with open(fpath, encoding="utf-8") as fh: + for raw in fh: + raw = raw.strip() + if raw: + records.append(ObservationRecord(**json.loads(raw))) + if not reconcile: + return records + latest: Dict[str, ObservationRecord] = {} + for r in records: # file/append order -> later lines win + latest[r.observation_id] = r + return [latest[k] for k in sorted(latest)] + + +def aggregate_by_hypothesis(records: Sequence[ObservationRecord]) -> List[HypothesisStats]: + """Per-hypothesis distribution + confidence. The review gate applies inside empirical_distribution/ + empirical_confidence (reviewed-only), so unreviewed observations are counted in total but never + calibrate. Deterministic order (by hypothesis id).""" + by_hyp: Dict[str, List[ObservationRecord]] = {} + for r in records: + by_hyp.setdefault(r.hypothesis_id, []).append(r) + out: List[HypothesisStats] = [] + for hyp in sorted(by_hyp): + obs = by_hyp[hyp] + out.append(HypothesisStats( + hypothesis_id=hyp, + distribution=empirical_distribution(obs), # reviewed-only (the gate) + confidence=empirical_confidence(obs), # None until reviewed for/against + reviewed_count=sum(1 for o in obs if o.reviewed), + total_count=len(obs))) + return out + + +def review_queue(records: Sequence[ObservationRecord]) -> List[ObservationRecord]: + """The reviewer's worklist: observations not yet reviewed. Calibration ignores these until a reviewer + accepts them (Observation -> Review -> Accepted -> Knowledge recomputed), never Observation -> conf++.""" + return [r for r in records if not r.reviewed] diff --git a/backend-compliance/knowledge/observations/.gitkeep b/backend-compliance/knowledge/observations/.gitkeep new file mode 100644 index 00000000..66bbe252 --- /dev/null +++ b/backend-compliance/knowledge/observations/.gitkeep @@ -0,0 +1,2 @@ +# Append-only observation log (Task 59b). Real lines (observations.jsonl / YYYY-MM.jsonl) are written at +# runtime via compliance/onboarding/observation_log.py. Anonymised archetypes only — NEVER real company names. diff --git a/backend-compliance/tests/test_observation_log.py b/backend-compliance/tests/test_observation_log.py new file mode 100644 index 00000000..4dc3e71b --- /dev/null +++ b/backend-compliance/tests/test_observation_log.py @@ -0,0 +1,73 @@ +"""Observation Log — append-only JSONL store + computed statistics (Task 59b/c v1). + +Pins the user's decision (2026-06-28): observations are CALIBRATION data, not product data -> an +append-only JSONL log under knowledge/observations/, NO DB, NO migration. Distribution and confidence are +COMPUTED from the log; only REVIEWED observations calibrate (review gate); a later review is a new line +that supersedes by observation_id. Nothing is ever written back to a hypothesis. +""" + +from __future__ import annotations + +from compliance.onboarding import ( + ObservationRecord, + ObservationType, + aggregate_by_hypothesis, + append_observation, + load_observations, + review_queue, +) + + +def _rec(oid, hyp, otype, reviewed=False, **kw): + return ObservationRecord( + observation_id=oid, hypothesis_id=hyp, observation_type=otype, reviewed=reviewed, + timestamp="2026-07-01T00:00:00Z", customer_archetype="machine_builder+ISO27001", **kw) + + +def test_append_only_round_trip(tmp_path): + p = str(tmp_path / "obs.jsonl") + append_observation(_rec("o1", "HYP-secure_dev", ObservationType.CONFIRMED, reviewed=True), p) + append_observation(_rec("o2", "HYP-secure_dev", ObservationType.REFUTED, reviewed=True), p) + recs = load_observations(p) + assert {r.observation_id for r in recs} == {"o1", "o2"} + assert all(r.customer_archetype == "machine_builder+ISO27001" for r in recs) # anonymised archetype, not a name + + +def test_review_supersedes_by_id_append_only(tmp_path): + p = str(tmp_path / "obs.jsonl") + append_observation(_rec("o1", "HYP-x", ObservationType.CONFIRMED, reviewed=False), p) # raw answer + append_observation(_rec("o1", "HYP-x", ObservationType.CONFIRMED, reviewed=True, + reviewed_by="anna"), p) # later review event + assert len(load_observations(p, reconcile=False)) == 2 # both lines kept (append-only) + recs = load_observations(p) # reconciled + assert len(recs) == 1 and recs[0].reviewed and recs[0].reviewed_by == "anna" + + +def test_statistics_apply_the_review_gate(tmp_path): + p = str(tmp_path / "obs.jsonl") + append_observation(_rec("a", "HYP-sdl", ObservationType.CONFIRMED, reviewed=True), p) + append_observation(_rec("b", "HYP-sdl", ObservationType.CONFIRMED, reviewed=True), p) + append_observation(_rec("c", "HYP-sdl", ObservationType.REFUTED, reviewed=True), p) + append_observation(_rec("d", "HYP-sdl", ObservationType.CONFIRMED, reviewed=False), p) # unreviewed -> ignored + stats = {s.hypothesis_id: s for s in aggregate_by_hypothesis(load_observations(p))} + s = stats["HYP-sdl"] + assert s.total_count == 4 and s.reviewed_count == 3 + assert s.distribution["confirmed"] == 2 and s.distribution["refuted"] == 1 # unreviewed one excluded + assert s.confidence == round(2 / 3, 2) # (2 + 0.5*0) / 3 + + +def test_review_queue_lists_unreviewed(tmp_path): + p = str(tmp_path / "obs.jsonl") + append_observation(_rec("a", "HYP-y", ObservationType.CONFIRMED, reviewed=True), p) + append_observation(_rec("b", "HYP-y", ObservationType.PARTIAL, reviewed=False), p) + q = review_queue(load_observations(p)) + assert [r.observation_id for r in q] == ["b"] + + +def test_load_directory_of_monthly_files(tmp_path): + d = tmp_path / "observations" + d.mkdir() + append_observation(_rec("a", "HYP-z", ObservationType.CONFIRMED, reviewed=True), str(d / "2026-06.jsonl")) + append_observation(_rec("b", "HYP-z", ObservationType.REFUTED, reviewed=True), str(d / "2026-07.jsonl")) + recs = load_observations(str(d)) + assert {r.observation_id for r in recs} == {"a", "b"}