feat: Signal Producer interface + Normalizer — one signal language for all sources (before #58)

Not scanner stubs — the scanners exist. The Silent Pass needs only their UNIFIED output. This adds the
small common DATA FORMAT (not a new module/framework) the user asked for, exactly the Requirement-
Source / MCAP / regulation-alias pattern: many inputs, one language.

  Producer A / B / C  ->  normalize_signals (vocabulary: id + aliases)  ->  canonical IntakeSignal  ->  Silent Pass

- ProducedSignal {signal_id, source_type, confidence, evidence, provenance} = what ANY source emits
  (website scanner, repo scanner, PDF parser, tender parser, API, the user).
- knowledge/onboarding/signal_vocabulary.yaml reduces producer dialects to a canonical signal: "SBOM
  present" arrives as cyclonedx_found / spdx_found / sbom_uploaded / requires_sbom (tender) — all become
  `sbom_file_found`. The Silent Pass cannot tell where it came from -> no per-scanner special logic, ever.
- Unknown signals pass through (a new producer stays visible). confidence/evidence/provenance flow to
  the detected capability for the audit trail.

A tender that "requires SBOM" now produces the same effect as a repo that HAS one — fits Vision V2
(Requirement Source over Regulation). Endpoint (#58) then has its final shape: POST -> Producers ->
Normalizer -> Silent Pass -> Profile -> Delta -> Questions -> Roadmap. Non-runtime -> no deploy. mypy
--strict clean, 14 onboarding tests pass, check-loc 0.
This commit is contained in:
Benjamin Admin
2026-06-28 14:49:57 +02:00
parent 9c33582412
commit c2c8f7e424
7 changed files with 184 additions and 16 deletions
@@ -21,6 +21,11 @@ from .observations import (
empirical_distribution, empirical_distribution,
reviewed, reviewed,
) )
from .signals import (
ProducedSignal,
SignalVocabularyEntry,
normalize_signals,
)
from .silent_intake import ( from .silent_intake import (
DetectedCapability, DetectedCapability,
IntakeSignal, IntakeSignal,
@@ -61,4 +66,7 @@ __all__ = [
"DetectedCapability", "DetectedCapability",
"ProductFact", "ProductFact",
"SilentIntakeResult", "SilentIntakeResult",
"ProducedSignal",
"SignalVocabularyEntry",
"normalize_signals",
] ]
@@ -0,0 +1,61 @@
"""Signal Producer interface + Normalizer — one signal language for all sources (NOT new architecture).
The platform already HAS scanners (website, repo/code, SBOM, security headers, TLS, SPF/DKIM/DMARC,
document analysis, RAG over uploads, product classification). The Silent Pass does not want a
WebsiteScanner or a RepoScanner — it wants their UNIFIED output. So every source (a scanner, a PDF
parser, a tender parser, an API, or the user) emits the SAME `ProducedSignal`
{signal_id, source_type, confidence, evidence, provenance}, and `normalize_signals` reduces producer-
specific signal ids to ONE canonical signal id via a vocabulary (id + aliases) — exactly the
Requirement-Source / MCAP / regulation-alias pattern. The Silent Pass then never gets per-scanner logic.
A common DATA FORMAT, not a new module/framework. Later a tender (`requires_sbom`) or an OEM spec
(`supplier_requires_psirt`) produces the same stream as a website — the Silent Pass cannot tell the
difference. Pure, deterministic, no I/O. Python 3.9 compatible.
"""
from __future__ import annotations
from typing import Dict, List, Optional, Sequence
from pydantic import BaseModel, Field
from .silent_intake import IntakeSignal
class ProducedSignal(BaseModel):
"""What ANY signal producer emits — the common interface every source agrees on."""
signal_id: str # raw or canonical id the producer used
source_type: str = "" # website / repository / document / product / tender / oem / user / api
confidence: float = 1.0
evidence: Optional[str] = None # the artifact found (already in hand)
provenance: str = "" # url / filename / tender clause / "customer statement"
class SignalVocabularyEntry(BaseModel):
"""One canonical signal + the producer-specific aliases that mean the same thing."""
id: str
aliases: List[str] = Field(default_factory=list)
def normalize_signals(
produced: Sequence[ProducedSignal], vocabulary: Sequence[SignalVocabularyEntry]
) -> List[IntakeSignal]:
"""Reduce heterogeneous producer signals to the canonical IntakeSignal stream (alias resolution).
Unknown signal ids pass through unchanged (a new producer's signal stays visible, not silently
dropped). Deterministic; carries confidence/evidence/provenance for the audit trail.
"""
alias: Dict[str, str] = {}
for v in vocabulary:
alias[v.id] = v.id
for a in v.aliases:
alias[a] = v.id
out: List[IntakeSignal] = []
for p in produced:
canonical = alias.get(p.signal_id, p.signal_id)
out.append(IntakeSignal(
source=p.source_type, signal=canonical, confidence=p.confidence,
evidence=p.evidence, provenance=p.provenance))
return out
@@ -20,11 +20,15 @@ from pydantic import BaseModel, Field
class IntakeSignal(BaseModel): class IntakeSignal(BaseModel):
"""One finding a scanner/parser produced (no LLM here — the scanners are upstream).""" """A CANONICAL signal the Silent Pass consumes. Producer-agnostic: the same `signal` may have come
from a website, a repo, a PDF, a tender or the user — normalize_signals() unified them (see signals.py)."""
source: str # website / repository / document / product source: str # source_type: website / repository / document / product / tender / user
signal: str # signal id, e.g. "sbom_file_found" signal: str # CANONICAL signal id, e.g. "sbom_file_found"
detail: str = "" # optional (url, filename) for the audit trail confidence: float = 1.0 # carried from the producer
evidence: Optional[str] = None # the artifact already in hand
provenance: str = "" # where it came from (url / filename / tender clause) — audit trail
detail: str = "" # free-text (kept for back-compat)
class SignalMapping(BaseModel): class SignalMapping(BaseModel):
@@ -43,6 +47,8 @@ class DetectedCapability(BaseModel):
relationship: str = "detected" relationship: str = "detected"
source: str = "" # which signal/source detected it (audit trail) source: str = "" # which signal/source detected it (audit trail)
evidence: Optional[str] = None evidence: Optional[str] = None
confidence: float = 1.0 # carried from the producing signal
provenance: str = "" # where the signal came from
class ProductFact(BaseModel): class ProductFact(BaseModel):
@@ -82,7 +88,8 @@ def silent_intake(
if m.capability and m.capability not in caps: if m.capability and m.capability not in caps:
caps[m.capability] = DetectedCapability( caps[m.capability] = DetectedCapability(
capability=m.capability, relationship=m.relationship, capability=m.capability, relationship=m.relationship,
source="%s:%s" % (s.source, s.signal), evidence=m.evidence) source="%s:%s" % (s.source, s.signal), evidence=m.evidence,
confidence=s.confidence, provenance=s.provenance)
if m.evidence: if m.evidence:
evidence.add(m.evidence) evidence.add(m.evidence)
if m.product_fact: if m.product_fact:
@@ -0,0 +1,28 @@
# Signal Vocabulary — canonical signal id + the producer-specific aliases that mean the same thing.
#
# The same fact ("SBOM present") can arrive as CycloneDX, SPDX, a GitHub Action, a Maven plugin, a
# document upload, a customer statement, a tender clause or a repo file. For the Silent Pass they are
# ALL identical: `sbom_file_found`. This file reduces them to one canonical signal — same pattern as the
# regulation-alias vocabulary, MCAPs and Requirement Sources: many inputs, one language. No scanner-
# specific logic ever reaches the Silent Pass. Pure DATA, injected into normalize_signals(). No real names.
signals:
- {id: sbom_file_found, aliases: [cyclonedx_found, spdx_found, sbom_in_repo, sbom_present, sbom_uploaded, requires_sbom, sbom_in_tender]}
- {id: security_txt_or_cvd_policy, aliases: [security_txt, vdp_found, cvd_policy_pdf, psirt_page, coordinated_disclosure_policy, supplier_requires_psirt]}
- {id: signed_releases, aliases: [signed_artifacts, cosign_found, gpg_signed_releases, code_signing_cert, secure_boot]}
- {id: github_actions_ci, aliases: [ci_pipeline, gitlab_ci, jenkins_pipeline, build_automation]}
- {id: dependency_scanning, aliases: [dependabot, renovate, snyk_found, trivy_in_ci, sca_tool]}
- {id: ce_marking_on_site, aliases: [ce_logo_detected, ce_mark_image]}
- {id: ce_conformity_doc, aliases: [declaration_of_conformity_doc, ce_doc_uploaded, conformity_pdf]}
- {id: support_lifecycle_page, aliases: [eol_policy_page, lifecycle_doc, support_period_stated]}
- {id: security_policy_page, aliases: [isms_statement, iso27001_badge, security_overview_page]}
- {id: product_risk_assessment_doc, aliases: [risk_assessment_pdf, hazard_analysis_doc, tara_doc]}
- {id: patch_policy_doc, aliases: [patch_management_policy, update_policy_pdf]}
- {id: incident_response_plan_doc, aliases: [irp_doc, incident_playbook]}
# product facts
- {id: cloud_connectivity, aliases: [cloud_hosted, saas, internet_facing, connected_product]}
- {id: plc_sps, aliases: [plc_detected, sps_steuerung, industrial_controller]}
- {id: embedded_software, aliases: [firmware_present, embedded_device]}
- {id: wireless_radio, aliases: [bluetooth, wifi_module, radio_equipment, funkmodul]}
- {id: remote_access, aliases: [remote_maintenance, vpn_access, teleservice, fernwartung]}
- {id: generates_usage_data, aliases: [telemetry_collected, usage_analytics]}
@@ -6,6 +6,7 @@ _Eingabe: Unternehmen + Produkte + Zertifizierungen + Ziel. Den Rest macht die O
> Zertifizierungen: **ISO9001, ISO27001, ISO14001, TISAX** · Produkt: **Parkschein-/Schrankensystem** · Ziel: **CRA** > Zertifizierungen: **ISO9001, ISO27001, ISO14001, TISAX** · Produkt: **Parkschein-/Schrankensystem** · Ziel: **CRA**
## Phase 0 — Stille Vorbefüllung (BEVOR eine Frage erscheint) ## Phase 0 — Stille Vorbefüllung (BEVOR eine Frage erscheint)
- **Signal Producer (verschiedene Dialekte → ein kanonisches Signal):** `vdp_found`(website), `cyclonedx_found`(repository), `cosign_found`(repository), `risk_assessment_pdf`(document), `cloud_hosted`(product), `plc_detected`(product)
> Stille Vorbefüllung: 4 Fähigkeit(en) automatisch erkannt, 2 Produktfakt(en), 4 Nachweis(e) bereits vorhanden. > Stille Vorbefüllung: 4 Fähigkeit(en) automatisch erkannt, 2 Produktfakt(en), 4 Nachweis(e) bereits vorhanden.
- **Automatisch erkannte Fähigkeiten:** `coordinated_vulnerability_disclosure`, `product_cyber_risk_assessment`, `sbom_creation`, `secure_signed_update_distribution` - **Automatisch erkannte Fähigkeiten:** `coordinated_vulnerability_disclosure`, `product_cyber_risk_assessment`, `sbom_creation`, `secure_signed_update_distribution`
- **Produktfakten (steuern den Scope):** `connected_to_internet=true`, `is_machine=true` - **Produktfakten (steuern den Scope):** `connected_to_internet=true`, `is_machine=true`
@@ -13,8 +13,8 @@ import os
import yaml import yaml
from compliance.onboarding import ( from compliance.onboarding import (
CapabilityHypothesis, IntakeSignal, OnboardingInput, SignalMapping, CapabilityHypothesis, OnboardingInput, ProducedSignal, SignalMapping, SignalVocabularyEntry,
advisor_start, resolve_for_certifications, silent_intake, advisor_start, normalize_signals, resolve_for_certifications, silent_intake,
) )
from compliance.transition_reasoning import TargetRequirement from compliance.transition_reasoning import TargetRequirement
@@ -40,15 +40,17 @@ inp = OnboardingInput(company="synthetisch", industry="machine_builder",
certifications=["ISO9001", "ISO27001", "ISO14001", "TISAX"], certifications=["ISO9001", "ISO27001", "ISO14001", "TISAX"],
known_evidence=["CE process"], target=["CRA"]) known_evidence=["CE process"], target=["CRA"])
hyp = resolve_for_certifications(inp.certifications, _lib) hyp = resolve_for_certifications(inp.certifications, _lib)
# Phase 0 — Silent Knowledge Pass: recognise everything possible from scanner signals BEFORE asking. # Phase 0 — Signal Producers emit raw dialects -> Normalizer -> one canonical stream -> Silent Pass.
_smap = [SignalMapping(**m) for m in yaml.safe_load( _K = os.path.join(os.path.dirname(__file__), "..", "knowledge", "onboarding")
open(os.path.join(os.path.dirname(__file__), "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]] _vocab = [SignalVocabularyEntry(**v) for v in yaml.safe_load(open(os.path.join(_K, "signal_vocabulary.yaml"), encoding="utf-8"))["signals"]]
_signals = [IntakeSignal(source="website", signal="security_txt_or_cvd_policy", detail="/.well-known/security.txt"), _smap = [SignalMapping(**m) for m in yaml.safe_load(open(os.path.join(_K, "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]]
IntakeSignal(source="repository", signal="sbom_file_found", detail="sbom.cdx.json"), _produced = [ProducedSignal(signal_id="vdp_found", source_type="website", provenance="/.well-known/security.txt"),
IntakeSignal(source="repository", signal="signed_releases"), ProducedSignal(signal_id="cyclonedx_found", source_type="repository", evidence="sbom", provenance="sbom.cdx.json"),
IntakeSignal(source="document", signal="product_risk_assessment_doc"), ProducedSignal(signal_id="cosign_found", source_type="repository", provenance="cosign.pub"),
IntakeSignal(source="product", signal="cloud_connectivity"), ProducedSignal(signal_id="risk_assessment_pdf", source_type="document", provenance="risk_assessment.pdf"),
IntakeSignal(source="product", signal="plc_sps")] ProducedSignal(signal_id="cloud_hosted", source_type="product"),
ProducedSignal(signal_id="plc_detected", source_type="product")]
_signals = normalize_signals(_produced, _vocab) # raw producer dialects -> ONE canonical signal language
si = silent_intake(_signals, _smap) si = silent_intake(_signals, _smap)
res = advisor_start(inp, hyp, req, target_id="CRA", covers_targets=covers, corpus_status={"CRA": "validated"}, res = advisor_start(inp, hyp, req, target_id="CRA", covers_targets=covers, corpus_status={"CRA": "validated"},
detected_capabilities=si.capability_ids()) detected_capabilities=si.capability_ids())
@@ -61,6 +63,7 @@ w("## Eingabe")
w("> Zertifizierungen: **%s** · Produkt: **%s** · Ziel: **%s**" % (", ".join(inp.certifications), inp.products[0], ", ".join(inp.target))) w("> Zertifizierungen: **%s** · Produkt: **%s** · Ziel: **%s**" % (", ".join(inp.certifications), inp.products[0], ", ".join(inp.target)))
w("") w("")
w("## Phase 0 — Stille Vorbefüllung (BEVOR eine Frage erscheint)") w("## Phase 0 — Stille Vorbefüllung (BEVOR eine Frage erscheint)")
w("- **Signal Producer (verschiedene Dialekte → ein kanonisches Signal):** %s" % ", ".join("`%s`(%s)" % (p.signal_id, p.source_type) for p in _produced))
w("> %s" % si.summary) w("> %s" % si.summary)
w("- **Automatisch erkannte Fähigkeiten:** %s" % ", ".join("`%s`" % d.capability for d in si.detected_capabilities)) w("- **Automatisch erkannte Fähigkeiten:** %s" % ", ".join("`%s`" % d.capability for d in si.detected_capabilities))
w("- **Produktfakten (steuern den Scope):** %s" % ", ".join("`%s=%s`" % (f.key, f.value) for f in si.product_facts)) w("- **Produktfakten (steuern den Scope):** %s" % ", ".join("`%s=%s`" % (f.key, f.value) for f in si.product_facts))
@@ -0,0 +1,60 @@
"""Signal Producer + Normalizer — one signal language for all sources.
Pins the abstraction the user asked for: every source emits the same ProducedSignal, and the Normalizer
reduces producer-specific signal ids to ONE canonical signal via a vocabulary. The Silent Pass therefore
cannot tell whether "SBOM present" came from a website, a repo, a PDF, a tender or the user — and gets no
per-scanner logic.
"""
from __future__ import annotations
import os
import yaml
from compliance.onboarding import (
ProducedSignal,
SignalMapping,
SignalVocabularyEntry,
normalize_signals,
silent_intake,
)
_DIR = os.path.dirname(__file__)
_VOCAB = [SignalVocabularyEntry(**v) for v in yaml.safe_load(
open(os.path.join(_DIR, "..", "knowledge", "onboarding", "signal_vocabulary.yaml"), encoding="utf-8"))["signals"]]
_MAP = [SignalMapping(**m) for m in yaml.safe_load(
open(os.path.join(_DIR, "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]]
def test_different_producers_yield_the_same_canonical_signal():
# the SAME fact, emitted by four totally different producers with different raw ids
produced = [
ProducedSignal(signal_id="cyclonedx_found", source_type="repository", provenance="sbom.cdx.json"),
ProducedSignal(signal_id="spdx_found", source_type="repository", provenance="sbom.spdx"),
ProducedSignal(signal_id="sbom_uploaded", source_type="document", provenance="customer_upload.pdf"),
ProducedSignal(signal_id="requires_sbom", source_type="tender", provenance="tender §4.2"),
]
normalized = normalize_signals(produced, _VOCAB)
assert {s.signal for s in normalized} == {"sbom_file_found"} # all reduced to ONE canonical signal
assert {s.source for s in normalized} == {"repository", "document", "tender"} # provenance preserved
def test_silent_pass_consumes_normalized_signals_source_agnostic():
# a tender that "requires SBOM" produces the same effect as a repo that HAS one
from_repo = normalize_signals([ProducedSignal(signal_id="cyclonedx_found", source_type="repository", evidence="sbom")], _VOCAB)
from_tender = normalize_signals([ProducedSignal(signal_id="requires_sbom", source_type="tender")], _VOCAB)
assert silent_intake(from_repo, _MAP).capability_ids() == silent_intake(from_tender, _MAP).capability_ids() == ["sbom_creation"]
def test_unknown_signal_passes_through_not_dropped():
out = normalize_signals([ProducedSignal(signal_id="brand_new_scanner_signal", source_type="api")], _VOCAB)
assert out[0].signal == "brand_new_scanner_signal" # visible, not silently lost
def test_confidence_and_provenance_flow_to_detected_capability():
norm = normalize_signals([ProducedSignal(signal_id="security_txt", source_type="website",
confidence=0.8, evidence="cvd_policy", provenance="/.well-known/security.txt")], _VOCAB)
res = silent_intake(norm, _MAP)
d = next(d for d in res.detected_capabilities if d.capability == "coordinated_vulnerability_disclosure")
assert d.confidence == 0.8 and d.provenance == "/.well-known/security.txt"