From c2c8f7e424b15456fe1be1c4f7034e44389266f6 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 28 Jun 2026 14:49:57 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20Signal=20Producer=20interface=20+=20Nor?= =?UTF-8?q?malizer=20=E2=80=94=20one=20signal=20language=20for=20all=20sou?= =?UTF-8?q?rces=20(before=20#58)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Not scanner stubs — the scanners exist. The Silent Pass needs only their UNIFIED output. This adds the small common DATA FORMAT (not a new module/framework) the user asked for, exactly the Requirement- Source / MCAP / regulation-alias pattern: many inputs, one language. Producer A / B / C -> normalize_signals (vocabulary: id + aliases) -> canonical IntakeSignal -> Silent Pass - ProducedSignal {signal_id, source_type, confidence, evidence, provenance} = what ANY source emits (website scanner, repo scanner, PDF parser, tender parser, API, the user). - knowledge/onboarding/signal_vocabulary.yaml reduces producer dialects to a canonical signal: "SBOM present" arrives as cyclonedx_found / spdx_found / sbom_uploaded / requires_sbom (tender) — all become `sbom_file_found`. The Silent Pass cannot tell where it came from -> no per-scanner special logic, ever. - Unknown signals pass through (a new producer stays visible). confidence/provenance flow to the detected capability for the audit trail; producer evidence is carried on the IntakeSignal, while the capability's evidence still comes from the signal mapping. A tender that "requires SBOM" now produces the same effect as a repo that HAS one — fits Vision V2 (Requirement Source over Regulation). Endpoint (#58) then has its final shape: POST -> Producers -> Normalizer -> Silent Pass -> Profile -> Delta -> Questions -> Roadmap. Non-runtime -> no deploy. mypy --strict clean, 14 onboarding tests pass, check-loc 0. 
--- .../compliance/onboarding/__init__.py | 8 +++ .../compliance/onboarding/signals.py | 61 +++++++++++++++++++ .../compliance/onboarding/silent_intake.py | 17 ++++-- .../onboarding/signal_vocabulary.yaml | 28 +++++++++ .../onboarding_advisor_demo.md | 1 + .../onboarding_advisor_demo.py | 25 ++++---- .../tests/test_signal_producer.py | 60 ++++++++++++++++++ 7 files changed, 184 insertions(+), 16 deletions(-) create mode 100644 backend-compliance/compliance/onboarding/signals.py create mode 100644 backend-compliance/knowledge/onboarding/signal_vocabulary.yaml create mode 100644 backend-compliance/tests/test_signal_producer.py diff --git a/backend-compliance/compliance/onboarding/__init__.py b/backend-compliance/compliance/onboarding/__init__.py index 77b9bd3f..878dfdff 100644 --- a/backend-compliance/compliance/onboarding/__init__.py +++ b/backend-compliance/compliance/onboarding/__init__.py @@ -21,6 +21,11 @@ from .observations import ( empirical_distribution, reviewed, ) +from .signals import ( + ProducedSignal, + SignalVocabularyEntry, + normalize_signals, +) from .silent_intake import ( DetectedCapability, IntakeSignal, @@ -61,4 +66,7 @@ __all__ = [ "DetectedCapability", "ProductFact", "SilentIntakeResult", + "ProducedSignal", + "SignalVocabularyEntry", + "normalize_signals", ] diff --git a/backend-compliance/compliance/onboarding/signals.py b/backend-compliance/compliance/onboarding/signals.py new file mode 100644 index 00000000..b0a94d64 --- /dev/null +++ b/backend-compliance/compliance/onboarding/signals.py @@ -0,0 +1,61 @@ +"""Signal Producer interface + Normalizer — one signal language for all sources (NOT new architecture). + +The platform already HAS scanners (website, repo/code, SBOM, security headers, TLS, SPF/DKIM/DMARC, +document analysis, RAG over uploads, product classification). The Silent Pass does not want a +WebsiteScanner or a RepoScanner — it wants their UNIFIED output. 
So every source (a scanner, a PDF +parser, a tender parser, an API, or the user) emits the SAME `ProducedSignal` +{signal_id, source_type, confidence, evidence, provenance}, and `normalize_signals` reduces producer- +specific signal ids to ONE canonical signal id via a vocabulary (id + aliases) — exactly the +Requirement-Source / MCAP / regulation-alias pattern. The Silent Pass then never gets per-scanner logic. + +A common DATA FORMAT, not a new module/framework. Later a tender (`requires_sbom`) or an OEM spec +(`supplier_requires_psirt`) produces the same stream as a website — the Silent Pass cannot tell the +difference. Pure, deterministic, no I/O. Python 3.9 compatible. +""" + +from __future__ import annotations + +from typing import Dict, List, Optional, Sequence + +from pydantic import BaseModel, Field + +from .silent_intake import IntakeSignal + + +class ProducedSignal(BaseModel): + """What ANY signal producer emits — the common interface every source agrees on.""" + + signal_id: str # raw or canonical id the producer used + source_type: str = "" # website / repository / document / product / tender / oem / user / api + confidence: float = 1.0 + evidence: Optional[str] = None # the artifact found (already in hand) + provenance: str = "" # url / filename / tender clause / "customer statement" + + +class SignalVocabularyEntry(BaseModel): + """One canonical signal + the producer-specific aliases that mean the same thing.""" + + id: str + aliases: List[str] = Field(default_factory=list) + + +def normalize_signals( + produced: Sequence[ProducedSignal], vocabulary: Sequence[SignalVocabularyEntry] +) -> List[IntakeSignal]: + """Reduce heterogeneous producer signals to the canonical IntakeSignal stream (alias resolution). + + Unknown signal ids pass through unchanged (a new producer's signal stays visible, not silently + dropped). Deterministic; carries confidence/evidence/provenance for the audit trail. 
+ """ + alias: Dict[str, str] = {} + for v in vocabulary: + alias[v.id] = v.id + for a in v.aliases: + alias[a] = v.id + out: List[IntakeSignal] = [] + for p in produced: + canonical = alias.get(p.signal_id, p.signal_id) + out.append(IntakeSignal( + source=p.source_type, signal=canonical, confidence=p.confidence, + evidence=p.evidence, provenance=p.provenance)) + return out diff --git a/backend-compliance/compliance/onboarding/silent_intake.py b/backend-compliance/compliance/onboarding/silent_intake.py index 97aba5a0..94fe20d3 100644 --- a/backend-compliance/compliance/onboarding/silent_intake.py +++ b/backend-compliance/compliance/onboarding/silent_intake.py @@ -20,11 +20,15 @@ from pydantic import BaseModel, Field class IntakeSignal(BaseModel): - """One finding a scanner/parser produced (no LLM here — the scanners are upstream).""" + """A CANONICAL signal the Silent Pass consumes. Producer-agnostic: the same `signal` may have come + from a website, a repo, a PDF, a tender or the user — normalize_signals() unified them (see signals.py).""" - source: str # website / repository / document / product - signal: str # signal id, e.g. "sbom_file_found" - detail: str = "" # optional (url, filename) for the audit trail + source: str # source_type: website / repository / document / product / tender / user + signal: str # CANONICAL signal id, e.g. 
"sbom_file_found" + confidence: float = 1.0 # carried from the producer + evidence: Optional[str] = None # the artifact already in hand + provenance: str = "" # where it came from (url / filename / tender clause) — audit trail + detail: str = "" # free-text (kept for back-compat) class SignalMapping(BaseModel): @@ -43,6 +47,8 @@ class DetectedCapability(BaseModel): relationship: str = "detected" source: str = "" # which signal/source detected it (audit trail) evidence: Optional[str] = None + confidence: float = 1.0 # carried from the producing signal + provenance: str = "" # where the signal came from class ProductFact(BaseModel): @@ -82,7 +88,8 @@ def silent_intake( if m.capability and m.capability not in caps: caps[m.capability] = DetectedCapability( capability=m.capability, relationship=m.relationship, - source="%s:%s" % (s.source, s.signal), evidence=m.evidence) + source="%s:%s" % (s.source, s.signal), evidence=m.evidence, + confidence=s.confidence, provenance=s.provenance) if m.evidence: evidence.add(m.evidence) if m.product_fact: diff --git a/backend-compliance/knowledge/onboarding/signal_vocabulary.yaml b/backend-compliance/knowledge/onboarding/signal_vocabulary.yaml new file mode 100644 index 00000000..2fb958f0 --- /dev/null +++ b/backend-compliance/knowledge/onboarding/signal_vocabulary.yaml @@ -0,0 +1,28 @@ +# Signal Vocabulary — canonical signal id + the producer-specific aliases that mean the same thing. +# +# The same fact ("SBOM present") can arrive as CycloneDX, SPDX, a GitHub Action, a Maven plugin, a +# document upload, a customer statement, a tender clause or a repo file. For the Silent Pass they are +# ALL identical: `sbom_file_found`. This file reduces them to one canonical signal — same pattern as the +# regulation-alias vocabulary, MCAPs and Requirement Sources: many inputs, one language. No scanner- +# specific logic ever reaches the Silent Pass. Pure DATA, injected into normalize_signals(). No real names. 
+ +signals: + - {id: sbom_file_found, aliases: [cyclonedx_found, spdx_found, sbom_in_repo, sbom_present, sbom_uploaded, requires_sbom, sbom_in_tender]} + - {id: security_txt_or_cvd_policy, aliases: [security_txt, vdp_found, cvd_policy_pdf, psirt_page, coordinated_disclosure_policy, supplier_requires_psirt]} + - {id: signed_releases, aliases: [signed_artifacts, cosign_found, gpg_signed_releases, code_signing_cert, secure_boot]} + - {id: github_actions_ci, aliases: [ci_pipeline, gitlab_ci, jenkins_pipeline, build_automation]} + - {id: dependency_scanning, aliases: [dependabot, renovate, snyk_found, trivy_in_ci, sca_tool]} + - {id: ce_marking_on_site, aliases: [ce_logo_detected, ce_mark_image]} + - {id: ce_conformity_doc, aliases: [declaration_of_conformity_doc, ce_doc_uploaded, conformity_pdf]} + - {id: support_lifecycle_page, aliases: [eol_policy_page, lifecycle_doc, support_period_stated]} + - {id: security_policy_page, aliases: [isms_statement, iso27001_badge, security_overview_page]} + - {id: product_risk_assessment_doc, aliases: [risk_assessment_pdf, hazard_analysis_doc, tara_doc]} + - {id: patch_policy_doc, aliases: [patch_management_policy, update_policy_pdf]} + - {id: incident_response_plan_doc, aliases: [irp_doc, incident_playbook]} + # product facts + - {id: cloud_connectivity, aliases: [cloud_hosted, saas, internet_facing, connected_product]} + - {id: plc_sps, aliases: [plc_detected, sps_steuerung, industrial_controller]} + - {id: embedded_software, aliases: [firmware_present, embedded_device]} + - {id: wireless_radio, aliases: [bluetooth, wifi_module, radio_equipment, funkmodul]} + - {id: remote_access, aliases: [remote_maintenance, vpn_access, teleservice, fernwartung]} + - {id: generates_usage_data, aliases: [telemetry_collected, usage_analytics]} diff --git a/backend-compliance/reference_scenarios/onboarding_advisor_demo.md b/backend-compliance/reference_scenarios/onboarding_advisor_demo.md index d62f7c31..db964dd5 100644 --- 
a/backend-compliance/reference_scenarios/onboarding_advisor_demo.md +++ b/backend-compliance/reference_scenarios/onboarding_advisor_demo.md @@ -6,6 +6,7 @@ _Eingabe: Unternehmen + Produkte + Zertifizierungen + Ziel. Den Rest macht die O > Zertifizierungen: **ISO9001, ISO27001, ISO14001, TISAX** · Produkt: **Parkschein-/Schrankensystem** · Ziel: **CRA** ## Phase 0 — Stille Vorbefüllung (BEVOR eine Frage erscheint) +- **Signal Producer (verschiedene Dialekte → ein kanonisches Signal):** `vdp_found`(website), `cyclonedx_found`(repository), `cosign_found`(repository), `risk_assessment_pdf`(document), `cloud_hosted`(product), `plc_detected`(product) > Stille Vorbefüllung: 4 Fähigkeit(en) automatisch erkannt, 2 Produktfakt(en), 4 Nachweis(e) bereits vorhanden. - **Automatisch erkannte Fähigkeiten:** `coordinated_vulnerability_disclosure`, `product_cyber_risk_assessment`, `sbom_creation`, `secure_signed_update_distribution` - **Produktfakten (steuern den Scope):** `connected_to_internet=true`, `is_machine=true` diff --git a/backend-compliance/reference_scenarios/onboarding_advisor_demo.py b/backend-compliance/reference_scenarios/onboarding_advisor_demo.py index 3ef3e288..0393433c 100644 --- a/backend-compliance/reference_scenarios/onboarding_advisor_demo.py +++ b/backend-compliance/reference_scenarios/onboarding_advisor_demo.py @@ -13,8 +13,8 @@ import os import yaml from compliance.onboarding import ( - CapabilityHypothesis, IntakeSignal, OnboardingInput, SignalMapping, - advisor_start, resolve_for_certifications, silent_intake, + CapabilityHypothesis, OnboardingInput, ProducedSignal, SignalMapping, SignalVocabularyEntry, + advisor_start, normalize_signals, resolve_for_certifications, silent_intake, ) from compliance.transition_reasoning import TargetRequirement @@ -40,15 +40,17 @@ inp = OnboardingInput(company="synthetisch", industry="machine_builder", certifications=["ISO9001", "ISO27001", "ISO14001", "TISAX"], known_evidence=["CE process"], target=["CRA"]) hyp = 
resolve_for_certifications(inp.certifications, _lib) -# Phase 0 — Silent Knowledge Pass: recognise everything possible from scanner signals BEFORE asking. -_smap = [SignalMapping(**m) for m in yaml.safe_load( - open(os.path.join(os.path.dirname(__file__), "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]] -_signals = [IntakeSignal(source="website", signal="security_txt_or_cvd_policy", detail="/.well-known/security.txt"), - IntakeSignal(source="repository", signal="sbom_file_found", detail="sbom.cdx.json"), - IntakeSignal(source="repository", signal="signed_releases"), - IntakeSignal(source="document", signal="product_risk_assessment_doc"), - IntakeSignal(source="product", signal="cloud_connectivity"), - IntakeSignal(source="product", signal="plc_sps")] +# Phase 0 — Signal Producers emit raw dialects -> Normalizer -> one canonical stream -> Silent Pass. +_K = os.path.join(os.path.dirname(__file__), "..", "knowledge", "onboarding") +_vocab = [SignalVocabularyEntry(**v) for v in yaml.safe_load(open(os.path.join(_K, "signal_vocabulary.yaml"), encoding="utf-8"))["signals"]] +_smap = [SignalMapping(**m) for m in yaml.safe_load(open(os.path.join(_K, "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]] +_produced = [ProducedSignal(signal_id="vdp_found", source_type="website", provenance="/.well-known/security.txt"), + ProducedSignal(signal_id="cyclonedx_found", source_type="repository", evidence="sbom", provenance="sbom.cdx.json"), + ProducedSignal(signal_id="cosign_found", source_type="repository", provenance="cosign.pub"), + ProducedSignal(signal_id="risk_assessment_pdf", source_type="document", provenance="risk_assessment.pdf"), + ProducedSignal(signal_id="cloud_hosted", source_type="product"), + ProducedSignal(signal_id="plc_detected", source_type="product")] +_signals = normalize_signals(_produced, _vocab) # raw producer dialects -> ONE canonical signal language si = silent_intake(_signals, _smap) res = advisor_start(inp, 
hyp, req, target_id="CRA", covers_targets=covers, corpus_status={"CRA": "validated"}, detected_capabilities=si.capability_ids()) @@ -61,6 +63,7 @@ w("## Eingabe") w("> Zertifizierungen: **%s** · Produkt: **%s** · Ziel: **%s**" % (", ".join(inp.certifications), inp.products[0], ", ".join(inp.target))) w("") w("## Phase 0 — Stille Vorbefüllung (BEVOR eine Frage erscheint)") +w("- **Signal Producer (verschiedene Dialekte → ein kanonisches Signal):** %s" % ", ".join("`%s`(%s)" % (p.signal_id, p.source_type) for p in _produced)) w("> %s" % si.summary) w("- **Automatisch erkannte Fähigkeiten:** %s" % ", ".join("`%s`" % d.capability for d in si.detected_capabilities)) w("- **Produktfakten (steuern den Scope):** %s" % ", ".join("`%s=%s`" % (f.key, f.value) for f in si.product_facts)) diff --git a/backend-compliance/tests/test_signal_producer.py b/backend-compliance/tests/test_signal_producer.py new file mode 100644 index 00000000..0ded5e1d --- /dev/null +++ b/backend-compliance/tests/test_signal_producer.py @@ -0,0 +1,60 @@ +"""Signal Producer + Normalizer — one signal language for all sources. + +Pins the abstraction the user asked for: every source emits the same ProducedSignal, and the Normalizer +reduces producer-specific signal ids to ONE canonical signal via a vocabulary. The Silent Pass therefore +cannot tell whether "SBOM present" came from a website, a repo, a PDF, a tender or the user — and gets no +per-scanner logic. 
+""" + +from __future__ import annotations + +import os + +import yaml + +from compliance.onboarding import ( + ProducedSignal, + SignalMapping, + SignalVocabularyEntry, + normalize_signals, + silent_intake, +) + +_DIR = os.path.dirname(__file__) +_VOCAB = [SignalVocabularyEntry(**v) for v in yaml.safe_load( + open(os.path.join(_DIR, "..", "knowledge", "onboarding", "signal_vocabulary.yaml"), encoding="utf-8"))["signals"]] +_MAP = [SignalMapping(**m) for m in yaml.safe_load( + open(os.path.join(_DIR, "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]] + + +def test_different_producers_yield_the_same_canonical_signal(): + # the SAME fact, emitted by four totally different producers with different raw ids + produced = [ + ProducedSignal(signal_id="cyclonedx_found", source_type="repository", provenance="sbom.cdx.json"), + ProducedSignal(signal_id="spdx_found", source_type="repository", provenance="sbom.spdx"), + ProducedSignal(signal_id="sbom_uploaded", source_type="document", provenance="customer_upload.pdf"), + ProducedSignal(signal_id="requires_sbom", source_type="tender", provenance="tender §4.2"), + ] + normalized = normalize_signals(produced, _VOCAB) + assert {s.signal for s in normalized} == {"sbom_file_found"} # all reduced to ONE canonical signal + assert {s.source for s in normalized} == {"repository", "document", "tender"} # provenance preserved + + +def test_silent_pass_consumes_normalized_signals_source_agnostic(): + # a tender that "requires SBOM" produces the same effect as a repo that HAS one + from_repo = normalize_signals([ProducedSignal(signal_id="cyclonedx_found", source_type="repository", evidence="sbom")], _VOCAB) + from_tender = normalize_signals([ProducedSignal(signal_id="requires_sbom", source_type="tender")], _VOCAB) + assert silent_intake(from_repo, _MAP).capability_ids() == silent_intake(from_tender, _MAP).capability_ids() == ["sbom_creation"] + + +def test_unknown_signal_passes_through_not_dropped(): 
+ out = normalize_signals([ProducedSignal(signal_id="brand_new_scanner_signal", source_type="api")], _VOCAB) + assert out[0].signal == "brand_new_scanner_signal" # visible, not silently lost + + +def test_confidence_and_provenance_flow_to_detected_capability(): + norm = normalize_signals([ProducedSignal(signal_id="security_txt", source_type="website", + confidence=0.8, evidence="cvd_policy", provenance="/.well-known/security.txt")], _VOCAB) + res = silent_intake(norm, _MAP) + d = next(d for d in res.detected_capabilities if d.capability == "coordinated_vulnerability_disclosure") + assert d.confidence == 0.8 and d.provenance == "/.well-known/security.txt"