c39787ad96
Semantic correction of the knowledge base BEFORE the empirical loop (#59) is built — otherwise the Observation Store would learn from already-misclassified signals. The Silent Pass conflated two kinds of signal into one: an OBSERVATION ("I saw an SBOM in the repo") and a REQUIREMENT ("a tender DEMANDS an SBOM"). They were aliased to the same canonical id, so a tender clause read as "SBOM already present" and suppressed the very question that should have been asked. Fix — make the kind explicit and authoritative (no new architecture, data + thin wiring): - `kind` ∈ {observation, requirement} on ProducedSignal (producer may declare) and on the canonical SignalVocabularyEntry (AUTHORITATIVE — a mislabelled producer cannot collapse the two). - Vocabulary split: sbom_file_found → sbom_present (obs) + sbom_required (req); security_txt_or_cvd_policy → cvd_policy_present (obs) + psirt_required (req); add signed_updates_required. requirement signals are intentionally UNMAPPED in intake_signal_map (they describe a target, not state). - silent_intake() consumes ONLY kind==observation; requirement signals are preserved in `requirements_seen` (visible/auditable) but NEVER become a detected capability. - normalize_signals() stamps the vocabulary's kind onto every IntakeSignal; unknown ids still pass through. This is the same Observation-vs-Requirement split the Requirements Verification Platform rests on: observations are reality, requirements are targets, and their comparison is the delta. A tender / OEM spec / law now produces requirement signals; scanners / repos / documents produce observation signals. Tests: rewrote the two test_signal_producer cases that previously ASSERTED the bug (tender == repo) to pin the correct split; regression — `requires_sbom` yields no capability + stays in requirements_seen while `cyclonedx_found` still detects sbom_creation; endpoint-level regression that a tender requirement does not auto-detect and the gap stays asked; vocabulary-kind-overrides-mislabelled-producer. 25 onboarding tests pass, mypy --strict clean, demo runs, check-loc 0. Runtime effect → deploy + smoke. (Fix A; partial-vs- detected decoupling follows as Fix B before #59.)
85 lines
4.4 KiB
Python
85 lines
4.4 KiB
Python
"""Signal Producer + Normalizer — one signal language, but TWO signal KINDS.
|
|
|
|
Pins the abstraction: every source emits the same ProducedSignal, and the Normalizer reduces
|
|
producer-specific ids to ONE canonical signal via a vocabulary. CRITICAL: an OBSERVATION ("I saw an
|
|
SBOM") and a REQUIREMENT ("a tender DEMANDS an SBOM") must NEVER collapse to the same signal — a
|
|
demanded SBOM is not a present one. kind is authoritative on the canonical vocabulary entry, and the
|
|
Silent Pass consumes only observations.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
|
|
import yaml
|
|
|
|
from compliance.onboarding import (
|
|
ProducedSignal,
|
|
SignalMapping,
|
|
SignalVocabularyEntry,
|
|
normalize_signals,
|
|
silent_intake,
|
|
)
|
|
|
|
_DIR = os.path.dirname(__file__)
|
|
_VOCAB = [SignalVocabularyEntry(**v) for v in yaml.safe_load(
|
|
open(os.path.join(_DIR, "..", "knowledge", "onboarding", "signal_vocabulary.yaml"), encoding="utf-8"))["signals"]]
|
|
_MAP = [SignalMapping(**m) for m in yaml.safe_load(
|
|
open(os.path.join(_DIR, "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]]
|
|
|
|
|
|
def test_observation_producers_yield_one_canonical_signal():
|
|
# the SAME OBSERVATION, emitted by three different producers with different raw ids
|
|
produced = [
|
|
ProducedSignal(signal_id="cyclonedx_found", source_type="repository", provenance="sbom.cdx.json"),
|
|
ProducedSignal(signal_id="spdx_found", source_type="repository", provenance="sbom.spdx"),
|
|
ProducedSignal(signal_id="sbom_uploaded", source_type="document", provenance="customer_upload.pdf"),
|
|
]
|
|
normalized = normalize_signals(produced, _VOCAB)
|
|
assert {s.signal for s in normalized} == {"sbom_present"} # all reduced to ONE canonical observation
|
|
assert {s.kind for s in normalized} == {"observation"} # all observations
|
|
assert {s.source for s in normalized} == {"repository", "document"} # provenance preserved
|
|
|
|
|
|
def test_requirement_and_observation_never_collapse():
|
|
# a tender that DEMANDS an SBOM must NOT become the same signal as a repo that HAS one
|
|
normalized = normalize_signals([
|
|
ProducedSignal(signal_id="cyclonedx_found", source_type="repository"), # observation
|
|
ProducedSignal(signal_id="requires_sbom", source_type="tender", provenance="tender §4.2"), # requirement
|
|
], _VOCAB)
|
|
by_kind = {s.kind: s.signal for s in normalized}
|
|
assert by_kind["observation"] == "sbom_present"
|
|
assert by_kind["requirement"] == "sbom_required"
|
|
assert by_kind["observation"] != by_kind["requirement"]
|
|
|
|
|
|
def test_requirement_signal_produces_no_capability():
|
|
# the regression the whole fix is about: a DEMANDED SBOM yields NO detected capability,
|
|
# but is preserved as a requirement; a real SBOM in the repo still IS detected.
|
|
from_tender = normalize_signals([ProducedSignal(signal_id="requires_sbom", source_type="tender")], _VOCAB)
|
|
res_tender = silent_intake(from_tender, _MAP)
|
|
assert res_tender.capability_ids() == [] # NOT read as present
|
|
assert res_tender.requirements_seen == ["sbom_required"] # but preserved + visible
|
|
|
|
from_repo = normalize_signals([ProducedSignal(signal_id="cyclonedx_found", source_type="repository", evidence="sbom")], _VOCAB)
|
|
assert silent_intake(from_repo, _MAP).capability_ids() == ["sbom_creation"]
|
|
|
|
|
|
def test_vocabulary_kind_overrides_a_mislabelled_producer():
|
|
# even if a producer wrongly tags a requirement as observation, the vocabulary is authoritative
|
|
norm = normalize_signals([ProducedSignal(signal_id="requires_sbom", source_type="tender", kind="observation")], _VOCAB)
|
|
assert norm[0].signal == "sbom_required" and norm[0].kind == "requirement"
|
|
|
|
|
|
def test_unknown_signal_passes_through_not_dropped():
|
|
out = normalize_signals([ProducedSignal(signal_id="brand_new_scanner_signal", source_type="api")], _VOCAB)
|
|
assert out[0].signal == "brand_new_scanner_signal" # visible, not silently lost
|
|
|
|
|
|
def test_confidence_and_provenance_flow_to_detected_capability():
|
|
norm = normalize_signals([ProducedSignal(signal_id="security_txt", source_type="website",
|
|
confidence=0.8, evidence="cvd_policy", provenance="/.well-known/security.txt")], _VOCAB)
|
|
res = silent_intake(norm, _MAP)
|
|
d = next(d for d in res.detected_capabilities if d.capability == "coordinated_vulnerability_disclosure")
|
|
assert d.confidence == 0.8 and d.provenance == "/.well-known/security.txt"
|