"""Signal Producer + Normalizer — one signal language for all sources. Pins the abstraction the user asked for: every source emits the same ProducedSignal, and the Normalizer reduces producer-specific signal ids to ONE canonical signal via a vocabulary. The Silent Pass therefore cannot tell whether "SBOM present" came from a website, a repo, a PDF, a tender or the user — and gets no per-scanner logic. """ from __future__ import annotations import os import yaml from compliance.onboarding import ( ProducedSignal, SignalMapping, SignalVocabularyEntry, normalize_signals, silent_intake, ) _DIR = os.path.dirname(__file__) _VOCAB = [SignalVocabularyEntry(**v) for v in yaml.safe_load( open(os.path.join(_DIR, "..", "knowledge", "onboarding", "signal_vocabulary.yaml"), encoding="utf-8"))["signals"]] _MAP = [SignalMapping(**m) for m in yaml.safe_load( open(os.path.join(_DIR, "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]] def test_different_producers_yield_the_same_canonical_signal(): # the SAME fact, emitted by four totally different producers with different raw ids produced = [ ProducedSignal(signal_id="cyclonedx_found", source_type="repository", provenance="sbom.cdx.json"), ProducedSignal(signal_id="spdx_found", source_type="repository", provenance="sbom.spdx"), ProducedSignal(signal_id="sbom_uploaded", source_type="document", provenance="customer_upload.pdf"), ProducedSignal(signal_id="requires_sbom", source_type="tender", provenance="tender §4.2"), ] normalized = normalize_signals(produced, _VOCAB) assert {s.signal for s in normalized} == {"sbom_file_found"} # all reduced to ONE canonical signal assert {s.source for s in normalized} == {"repository", "document", "tender"} # provenance preserved def test_silent_pass_consumes_normalized_signals_source_agnostic(): # a tender that "requires SBOM" produces the same effect as a repo that HAS one from_repo = normalize_signals([ProducedSignal(signal_id="cyclonedx_found", source_type="repository", evidence="sbom")], _VOCAB) from_tender = normalize_signals([ProducedSignal(signal_id="requires_sbom", source_type="tender")], _VOCAB) assert silent_intake(from_repo, _MAP).capability_ids() == silent_intake(from_tender, _MAP).capability_ids() == ["sbom_creation"] def test_unknown_signal_passes_through_not_dropped(): out = normalize_signals([ProducedSignal(signal_id="brand_new_scanner_signal", source_type="api")], _VOCAB) assert out[0].signal == "brand_new_scanner_signal" # visible, not silently lost def test_confidence_and_provenance_flow_to_detected_capability(): norm = normalize_signals([ProducedSignal(signal_id="security_txt", source_type="website", confidence=0.8, evidence="cvd_policy", provenance="/.well-known/security.txt")], _VOCAB) res = silent_intake(norm, _MAP) d = next(d for d in res.detected_capabilities if d.capability == "coordinated_vulnerability_disclosure") assert d.confidence == 0.8 and d.provenance == "/.well-known/security.txt"