"""Signal Producer + Normalizer — one signal language, but TWO signal KINDS. Pins the abstraction: every source emits the same ProducedSignal, and the Normalizer reduces producer-specific ids to ONE canonical signal via a vocabulary. CRITICAL: an OBSERVATION ("I saw an SBOM") and a REQUIREMENT ("a tender DEMANDS an SBOM") must NEVER collapse to the same signal — a demanded SBOM is not a present one. kind is authoritative on the canonical vocabulary entry, and the Silent Pass consumes only observations. """ from __future__ import annotations import os import yaml from compliance.onboarding import ( ProducedSignal, SignalMapping, SignalVocabularyEntry, normalize_signals, silent_intake, ) _DIR = os.path.dirname(__file__) _VOCAB = [SignalVocabularyEntry(**v) for v in yaml.safe_load( open(os.path.join(_DIR, "..", "knowledge", "onboarding", "signal_vocabulary.yaml"), encoding="utf-8"))["signals"]] _MAP = [SignalMapping(**m) for m in yaml.safe_load( open(os.path.join(_DIR, "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]] def test_observation_producers_yield_one_canonical_signal(): # the SAME OBSERVATION, emitted by three different producers with different raw ids produced = [ ProducedSignal(signal_id="cyclonedx_found", source_type="repository", provenance="sbom.cdx.json"), ProducedSignal(signal_id="spdx_found", source_type="repository", provenance="sbom.spdx"), ProducedSignal(signal_id="sbom_uploaded", source_type="document", provenance="customer_upload.pdf"), ] normalized = normalize_signals(produced, _VOCAB) assert {s.signal for s in normalized} == {"sbom_present"} # all reduced to ONE canonical observation assert {s.kind for s in normalized} == {"observation"} # all observations assert {s.source for s in normalized} == {"repository", "document"} # provenance preserved def test_requirement_and_observation_never_collapse(): # a tender that DEMANDS an SBOM must NOT become the same signal as a repo that HAS one normalized = normalize_signals([ ProducedSignal(signal_id="cyclonedx_found", source_type="repository"), # observation ProducedSignal(signal_id="requires_sbom", source_type="tender", provenance="tender §4.2"), # requirement ], _VOCAB) by_kind = {s.kind: s.signal for s in normalized} assert by_kind["observation"] == "sbom_present" assert by_kind["requirement"] == "sbom_required" assert by_kind["observation"] != by_kind["requirement"] def test_requirement_signal_produces_no_capability(): # the regression the whole fix is about: a DEMANDED SBOM yields NO detected capability, # but is preserved as a requirement; a real SBOM in the repo still IS detected. from_tender = normalize_signals([ProducedSignal(signal_id="requires_sbom", source_type="tender")], _VOCAB) res_tender = silent_intake(from_tender, _MAP) assert res_tender.capability_ids() == [] # NOT read as present assert res_tender.requirements_seen == ["sbom_required"] # but preserved + visible from_repo = normalize_signals([ProducedSignal(signal_id="cyclonedx_found", source_type="repository", evidence="sbom")], _VOCAB) assert silent_intake(from_repo, _MAP).capability_ids() == ["sbom_creation"] def test_vocabulary_kind_overrides_a_mislabelled_producer(): # even if a producer wrongly tags a requirement as observation, the vocabulary is authoritative norm = normalize_signals([ProducedSignal(signal_id="requires_sbom", source_type="tender", kind="observation")], _VOCAB) assert norm[0].signal == "sbom_required" and norm[0].kind == "requirement" def test_unknown_signal_passes_through_not_dropped(): out = normalize_signals([ProducedSignal(signal_id="brand_new_scanner_signal", source_type="api")], _VOCAB) assert out[0].signal == "brand_new_scanner_signal" # visible, not silently lost def test_confidence_and_provenance_flow_to_detected_capability(): norm = normalize_signals([ProducedSignal(signal_id="security_txt", source_type="website", confidence=0.8, evidence="cvd_policy", provenance="/.well-known/security.txt")], _VOCAB) res = silent_intake(norm, _MAP) d = next(d for d in res.detected_capabilities if d.capability == "coordinated_vulnerability_disclosure") assert d.confidence == 0.8 and d.provenance == "/.well-known/security.txt"