diff --git a/backend-compliance/compliance/onboarding/signals.py b/backend-compliance/compliance/onboarding/signals.py index b0a94d64..e982d31a 100644 --- a/backend-compliance/compliance/onboarding/signals.py +++ b/backend-compliance/compliance/onboarding/signals.py @@ -1,16 +1,21 @@ -"""Signal Producer interface + Normalizer — one signal language for all sources (NOT new architecture). +"""Signal Producer interface + Normalizer — one signal language, but TWO signal KINDS. The platform already HAS scanners (website, repo/code, SBOM, security headers, TLS, SPF/DKIM/DMARC, document analysis, RAG over uploads, product classification). The Silent Pass does not want a WebsiteScanner or a RepoScanner — it wants their UNIFIED output. So every source (a scanner, a PDF -parser, a tender parser, an API, or the user) emits the SAME `ProducedSignal` -{signal_id, source_type, confidence, evidence, provenance}, and `normalize_signals` reduces producer- -specific signal ids to ONE canonical signal id via a vocabulary (id + aliases) — exactly the +parser, a tender parser, an OEM spec, an API, or the user) emits the SAME `ProducedSignal` +{signal_id, source_type, kind, confidence, evidence, provenance}, and `normalize_signals` reduces +producer-specific ids to ONE canonical signal via a vocabulary (id + aliases + kind) — exactly the Requirement-Source / MCAP / regulation-alias pattern. The Silent Pass then never gets per-scanner logic. -A common DATA FORMAT, not a new module/framework. Later a tender (`requires_sbom`) or an OEM spec -(`supplier_requires_psirt`) produces the same stream as a website — the Silent Pass cannot tell the -difference. Pure, deterministic, no I/O. Python 3.9 compatible. +CRITICAL — a signal is one of two KINDS, and they NEVER substitute for each other: + observation = "I SAW X" — a repo with an SBOM, a published security.txt, a risk-assessment PDF. + requirement = "someone DEMANDS X" — a tender clause `requires_sbom`, an OEM spec `supplier_requires_psirt`. +A demanded SBOM is NOT a present SBOM. `kind` is carried on the canonical VOCABULARY entry (authoritative), +so even a mislabelled producer signal cannot collapse the two. The Silent Pass consumes ONLY observations; +requirement signals are preserved and feed the required-set / prioritisation later. This Observation-vs- +Requirement split is the very one the Requirements Verification Platform rests on: Observations (reality) +vs Requirements (targets); their comparison IS the delta. Pure, deterministic, no I/O. Python 3.9 compatible. """ from __future__ import annotations @@ -27,15 +32,17 @@ class ProducedSignal(BaseModel): signal_id: str # raw or canonical id the producer used source_type: str = "" # website / repository / document / product / tender / oem / user / api + kind: str = "" # "observation" | "requirement"; empty -> resolved from the vocabulary confidence: float = 1.0 evidence: Optional[str] = None # the artifact found (already in hand) provenance: str = "" # url / filename / tender clause / "customer statement" class SignalVocabularyEntry(BaseModel): - """One canonical signal + the producer-specific aliases that mean the same thing.""" + """One canonical signal + its aliases + its KIND (the authoritative observation/requirement label).""" id: str + kind: str = "observation" # "observation" (I saw X) | "requirement" (someone DEMANDS X) aliases: List[str] = Field(default_factory=list) @@ -44,18 +51,23 @@ def normalize_signals( ) -> List[IntakeSignal]: """Reduce heterogeneous producer signals to the canonical IntakeSignal stream (alias resolution). - Unknown signal ids pass through unchanged (a new producer's signal stays visible, not silently - dropped). Deterministic; carries confidence/evidence/provenance for the audit trail. + The canonical vocabulary entry's `kind` is AUTHORITATIVE — a producer cannot relabel a requirement as + an observation (that is what stops a demanded SBOM from masquerading as a present one). Unknown signal + ids pass through unchanged (a new producer's signal stays visible, not silently dropped) and keep the + producer-declared kind (default observation). Deterministic; carries confidence/evidence/provenance. """ alias: Dict[str, str] = {} + kind_of: Dict[str, str] = {} for v in vocabulary: alias[v.id] = v.id + kind_of[v.id] = v.kind for a in v.aliases: alias[a] = v.id out: List[IntakeSignal] = [] for p in produced: canonical = alias.get(p.signal_id, p.signal_id) + kind = kind_of.get(canonical) or p.kind or "observation" out.append(IntakeSignal( - source=p.source_type, signal=canonical, confidence=p.confidence, + source=p.source_type, signal=canonical, kind=kind, confidence=p.confidence, evidence=p.evidence, provenance=p.provenance)) return out diff --git a/backend-compliance/compliance/onboarding/silent_intake.py b/backend-compliance/compliance/onboarding/silent_intake.py index 94fe20d3..d5cda1fa 100644 --- a/backend-compliance/compliance/onboarding/silent_intake.py +++ b/backend-compliance/compliance/onboarding/silent_intake.py @@ -24,7 +24,8 @@ class IntakeSignal(BaseModel): from a website, a repo, a PDF, a tender or the user — normalize_signals() unified them (see signals.py).""" source: str # source_type: website / repository / document / product / tender / user - signal: str # CANONICAL signal id, e.g. "sbom_file_found" + signal: str # CANONICAL signal id, e.g. "sbom_present" + kind: str = "observation" # "observation" (I saw X) | "requirement" (someone DEMANDS X) confidence: float = 1.0 # carried from the producer evidence: Optional[str] = None # the artifact already in hand provenance: str = "" # where it came from (url / filename / tender clause) — audit trail @@ -61,10 +62,13 @@ class SilentIntakeResult(BaseModel): detected_capabilities: List[DetectedCapability] = Field(default_factory=list) product_facts: List[ProductFact] = Field(default_factory=list) evidence_found: List[str] = Field(default_factory=list) + requirements_seen: List[str] = Field(default_factory=list) # requirement-kind signals — preserved, NOT present summary: str = "" def capability_ids(self) -> List[str]: - """The detected capability ids — fed into the Advisor as already-present (delta-reducing).""" + """The detected capability ids — fed into the Advisor as already-present (delta-reducing). + + ONLY observation-kind signals reach here (requirements never become a present capability).""" return sorted({d.capability for d in self.detected_capabilities}) @@ -83,7 +87,11 @@ def silent_intake( caps: Dict[str, DetectedCapability] = {} facts: Dict[str, ProductFact] = {} evidence: Set[str] = set() + requirements: Set[str] = set() for s in signals: + if s.kind != "observation": # a requirement describes a TARGET, never the present state + requirements.add(s.signal) # preserved + visible, but NEVER turned into a capability + continue for m in by_signal.get(s.signal, []): if m.capability and m.capability not in caps: caps[m.capability] = DetectedCapability( @@ -97,10 +105,12 @@ def silent_intake( detected = [caps[k] for k in sorted(caps)] product_facts = [facts[k] for k in sorted(facts)] + requirements_seen = sorted(requirements) summary = ( - "Stille Vorbefüllung: %d Fähigkeit(en) automatisch erkannt, %d Produktfakt(en), %d Nachweis(e) bereits vorhanden." - % (len(detected), len(product_facts), len(evidence)) + "Stille Vorbefüllung: %d Fähigkeit(en) automatisch erkannt, %d Produktfakt(en), %d Nachweis(e) " + "bereits vorhanden, %d Anforderung(en) erkannt (nicht als vorhanden gewertet)." + % (len(detected), len(product_facts), len(evidence), len(requirements_seen)) ) return SilentIntakeResult( detected_capabilities=detected, product_facts=product_facts, - evidence_found=sorted(evidence), summary=summary) + evidence_found=sorted(evidence), requirements_seen=requirements_seen, summary=summary) diff --git a/backend-compliance/knowledge/onboarding/intake_signal_map.yaml b/backend-compliance/knowledge/onboarding/intake_signal_map.yaml index 3968a7e7..770adc10 100644 --- a/backend-compliance/knowledge/onboarding/intake_signal_map.yaml +++ b/backend-compliance/knowledge/onboarding/intake_signal_map.yaml @@ -7,13 +7,16 @@ # are UPSTREAM and produce the signals; this file only interprets them. No norm text, no real names. mappings: + # Only OBSERVATION-kind signals appear here. requirement-kind signals (sbom_required, psirt_required, + # signed_updates_required) are intentionally ABSENT — they describe a target, never the present state, + # and the Silent Pass would never consume them anyway (it filters on kind == observation). # ── website ─────────────────────────────────────────────────────────────────────────────── - - {signal: security_txt_or_cvd_policy, capability: coordinated_vulnerability_disclosure, relationship: detected, evidence: cvd_policy} + - {signal: cvd_policy_present, capability: coordinated_vulnerability_disclosure, relationship: detected, evidence: cvd_policy} - {signal: ce_marking_on_site, capability: ce_conformity_assessment_and_technical_documentation, relationship: partial, evidence: ce_declaration} - {signal: support_lifecycle_page, capability: security_update_support_period, relationship: partial, evidence: support_policy} - {signal: security_policy_page, capability: information_security_management, relationship: partial} # ── repository ──────────────────────────────────────────────────────────────────────────── - - {signal: sbom_file_found, capability: sbom_creation, relationship: detected, evidence: sbom} + - {signal: sbom_present, capability: sbom_creation, relationship: detected, evidence: sbom} - {signal: signed_releases, capability: secure_signed_update_distribution, relationship: detected, evidence: signing_config} - {signal: github_actions_ci, capability: secure_development_lifecycle, relationship: partial, evidence: ci_pipeline} - {signal: dependency_scanning, capability: technical_vulnerability_management, relationship: partial, evidence: vuln_scanning_config} diff --git a/backend-compliance/knowledge/onboarding/signal_vocabulary.yaml b/backend-compliance/knowledge/onboarding/signal_vocabulary.yaml index 2fb958f0..c7360e5f 100644 --- a/backend-compliance/knowledge/onboarding/signal_vocabulary.yaml +++ b/backend-compliance/knowledge/onboarding/signal_vocabulary.yaml @@ -1,14 +1,23 @@ -# Signal Vocabulary — canonical signal id + the producer-specific aliases that mean the same thing. +# Signal Vocabulary — canonical signal id + aliases + KIND. One language, but TWO kinds of signal. # # The same fact ("SBOM present") can arrive as CycloneDX, SPDX, a GitHub Action, a Maven plugin, a -# document upload, a customer statement, a tender clause or a repo file. For the Silent Pass they are -# ALL identical: `sbom_file_found`. This file reduces them to one canonical signal — same pattern as the -# regulation-alias vocabulary, MCAPs and Requirement Sources: many inputs, one language. No scanner- -# specific logic ever reaches the Silent Pass. Pure DATA, injected into normalize_signals(). No real names. +# document upload or a customer statement — for the Silent Pass they are ALL `sbom_present`. This file +# reduces producer dialects to one canonical signal — same pattern as the regulation-alias vocabulary, +# MCAPs and Requirement Sources: many inputs, one language. No scanner-specific logic reaches the Silent +# Pass. Pure DATA, injected into normalize_signals(). No real names. +# +# KIND is the load-bearing distinction (default: observation): +# observation = "I SAW X" — a repo with an SBOM, a published security.txt, a risk-assessment PDF. +# requirement = "someone DEMANDS X" — a tender clause `requires_sbom`, an OEM spec `supplier_requires_psirt`. +# A DEMANDED SBOM is NOT a PRESENT SBOM. `kind` lives on the canonical entry (AUTHORITATIVE), so even a +# mislabelled producer signal cannot collapse the two. The Silent Pass consumes ONLY observations; +# requirement signals are preserved (requirements_seen) and drive the required-set / prioritisation later +# (Requirement Source). This is the Observation-vs-Requirement split the Verification Platform rests on. signals: - - {id: sbom_file_found, aliases: [cyclonedx_found, spdx_found, sbom_in_repo, sbom_present, sbom_uploaded, requires_sbom, sbom_in_tender]} - - {id: security_txt_or_cvd_policy, aliases: [security_txt, vdp_found, cvd_policy_pdf, psirt_page, coordinated_disclosure_policy, supplier_requires_psirt]} + # ── OBSERVATIONS — "I saw X" (kind: observation, the default) ──────────────────────────────── + - {id: sbom_present, aliases: [cyclonedx_found, spdx_found, sbom_in_repo, sbom_uploaded]} + - {id: cvd_policy_present, aliases: [security_txt, vdp_found, cvd_policy_pdf, psirt_page, coordinated_disclosure_policy]} - {id: signed_releases, aliases: [signed_artifacts, cosign_found, gpg_signed_releases, code_signing_cert, secure_boot]} - {id: github_actions_ci, aliases: [ci_pipeline, gitlab_ci, jenkins_pipeline, build_automation]} - {id: dependency_scanning, aliases: [dependabot, renovate, snyk_found, trivy_in_ci, sca_tool]} @@ -19,10 +28,17 @@ signals: - {id: product_risk_assessment_doc, aliases: [risk_assessment_pdf, hazard_analysis_doc, tara_doc]} - {id: patch_policy_doc, aliases: [patch_management_policy, update_policy_pdf]} - {id: incident_response_plan_doc, aliases: [irp_doc, incident_playbook]} - # product facts + # product facts (also observations: an observed product property that drives scope) - {id: cloud_connectivity, aliases: [cloud_hosted, saas, internet_facing, connected_product]} - {id: plc_sps, aliases: [plc_detected, sps_steuerung, industrial_controller]} - {id: embedded_software, aliases: [firmware_present, embedded_device]} - {id: wireless_radio, aliases: [bluetooth, wifi_module, radio_equipment, funkmodul]} - {id: remote_access, aliases: [remote_maintenance, vpn_access, teleservice, fernwartung]} - {id: generates_usage_data, aliases: [telemetry_collected, usage_analytics]} + # ── REQUIREMENTS — "someone DEMANDS X" (kind: requirement; NEVER read as present) ───────────── + # Preserved + visible, but the Silent Pass does NOT turn them into detected capabilities. A tender / + # OEM spec / law lands here; a scanner / repo / document lands above. Intentionally UNMAPPED in + # intake_signal_map.yaml — they describe the target, not the present state. + - {id: sbom_required, kind: requirement, aliases: [requires_sbom, sbom_in_tender, tender_requires_sbom]} + - {id: psirt_required, kind: requirement, aliases: [supplier_requires_psirt, requires_psirt, requires_cvd, oem_requires_psirt]} + - {id: signed_updates_required, kind: requirement, aliases: [requires_signed_updates, supplier_requires_signed_updates]} diff --git a/backend-compliance/tests/test_onboarding_endpoint.py b/backend-compliance/tests/test_onboarding_endpoint.py index 9029415a..c50d1f5f 100644 --- a/backend-compliance/tests/test_onboarding_endpoint.py +++ b/backend-compliance/tests/test_onboarding_endpoint.py @@ -47,6 +47,20 @@ def test_advisor_start_returns_full_payload(): assert "sbom_creation" not in {q["capability_id"] for q in d["top_5_questions"]} # detected -> not asked +def test_requirement_signal_does_not_auto_detect_capability(): + # a tender that DEMANDS an SBOM (requirement) must NOT be read as "SBOM present": sbom_creation stays + # open (asked / in the delta), unlike a real cyclonedx_found observation. + body = dict(_BODY, scanner_findings=[ + {"signal_id": "requires_sbom", "source_type": "tender", "provenance": "tender §4.2"}, + ]) + r = _client.post("/onboarding/advisor-start", json=body) + assert r.status_code == 200, r.text + d = r.json() + assert "sbom_creation" not in d["auto_detected"] # demanded != present + asked = {q["capability_id"] for q in d["top_5_questions"]} + assert "sbom_creation" in asked or "sbom_creation" in d["capability_delta"] # still an open gap + + def test_unknown_target_is_404(): body = dict(_BODY, target="NOPE") r = _client.post("/onboarding/advisor-start", json=body) diff --git a/backend-compliance/tests/test_signal_producer.py b/backend-compliance/tests/test_signal_producer.py index 0ded5e1d..0b0808b8 100644 --- a/backend-compliance/tests/test_signal_producer.py +++ b/backend-compliance/tests/test_signal_producer.py @@ -1,9 +1,10 @@ -"""Signal Producer + Normalizer — one signal language for all sources. +"""Signal Producer + Normalizer — one signal language, but TWO signal KINDS. -Pins the abstraction the user asked for: every source emits the same ProducedSignal, and the Normalizer -reduces producer-specific signal ids to ONE canonical signal via a vocabulary. The Silent Pass therefore -cannot tell whether "SBOM present" came from a website, a repo, a PDF, a tender or the user — and gets no -per-scanner logic. +Pins the abstraction: every source emits the same ProducedSignal, and the Normalizer reduces +producer-specific ids to ONE canonical signal via a vocabulary. CRITICAL: an OBSERVATION ("I saw an +SBOM") and a REQUIREMENT ("a tender DEMANDS an SBOM") must NEVER collapse to the same signal — a +demanded SBOM is not a present one. kind is authoritative on the canonical vocabulary entry, and the +Silent Pass consumes only observations. """ from __future__ import annotations @@ -27,24 +28,47 @@ _MAP = [SignalMapping(**m) for m in yaml.safe_load( open(os.path.join(_DIR, "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]] -def test_different_producers_yield_the_same_canonical_signal(): - # the SAME fact, emitted by four totally different producers with different raw ids +def test_observation_producers_yield_one_canonical_signal(): + # the SAME OBSERVATION, emitted by three different producers with different raw ids produced = [ ProducedSignal(signal_id="cyclonedx_found", source_type="repository", provenance="sbom.cdx.json"), ProducedSignal(signal_id="spdx_found", source_type="repository", provenance="sbom.spdx"), ProducedSignal(signal_id="sbom_uploaded", source_type="document", provenance="customer_upload.pdf"), - ProducedSignal(signal_id="requires_sbom", source_type="tender", provenance="tender §4.2"), ] normalized = normalize_signals(produced, _VOCAB) - assert {s.signal for s in normalized} == {"sbom_file_found"} # all reduced to ONE canonical signal - assert {s.source for s in normalized} == {"repository", "document", "tender"} # provenance preserved + assert {s.signal for s in normalized} == {"sbom_present"} # all reduced to ONE canonical observation + assert {s.kind for s in normalized} == {"observation"} # all observations + assert {s.source for s in normalized} == {"repository", "document"} # provenance preserved -def test_silent_pass_consumes_normalized_signals_source_agnostic(): - # a tender that "requires SBOM" produces the same effect as a repo that HAS one - from_repo = normalize_signals([ProducedSignal(signal_id="cyclonedx_found", source_type="repository", evidence="sbom")], _VOCAB) +def test_requirement_and_observation_never_collapse(): + # a tender that DEMANDS an SBOM must NOT become the same signal as a repo that HAS one + normalized = normalize_signals([ + ProducedSignal(signal_id="cyclonedx_found", source_type="repository"), # observation + ProducedSignal(signal_id="requires_sbom", source_type="tender", provenance="tender §4.2"), # requirement + ], _VOCAB) + by_kind = {s.kind: s.signal for s in normalized} + assert by_kind["observation"] == "sbom_present" + assert by_kind["requirement"] == "sbom_required" + assert by_kind["observation"] != by_kind["requirement"] + + +def test_requirement_signal_produces_no_capability(): + # the regression the whole fix is about: a DEMANDED SBOM yields NO detected capability, + # but is preserved as a requirement; a real SBOM in the repo still IS detected. from_tender = normalize_signals([ProducedSignal(signal_id="requires_sbom", source_type="tender")], _VOCAB) - assert silent_intake(from_repo, _MAP).capability_ids() == silent_intake(from_tender, _MAP).capability_ids() == ["sbom_creation"] + res_tender = silent_intake(from_tender, _MAP) + assert res_tender.capability_ids() == [] # NOT read as present + assert res_tender.requirements_seen == ["sbom_required"] # but preserved + visible + + from_repo = normalize_signals([ProducedSignal(signal_id="cyclonedx_found", source_type="repository", evidence="sbom")], _VOCAB) + assert silent_intake(from_repo, _MAP).capability_ids() == ["sbom_creation"] + + +def test_vocabulary_kind_overrides_a_mislabelled_producer(): + # even if a producer wrongly tags a requirement as observation, the vocabulary is authoritative + norm = normalize_signals([ProducedSignal(signal_id="requires_sbom", source_type="tender", kind="observation")], _VOCAB) + assert norm[0].signal == "sbom_required" and norm[0].kind == "requirement" def test_unknown_signal_passes_through_not_dropped(): diff --git a/backend-compliance/tests/test_silent_intake.py b/backend-compliance/tests/test_silent_intake.py index 9d3f98ad..f0139e63 100644 --- a/backend-compliance/tests/test_silent_intake.py +++ b/backend-compliance/tests/test_silent_intake.py @@ -36,8 +36,8 @@ _REQ += [TargetRequirement(capability_id=d["capability"], expected_evidence=d.ge # scanner findings (injected): a machine builder with a public CVD policy, an SBOM + signed releases in # the repo, a product risk-assessment doc, and a cloud-connected PLC product. _SIGNALS = [ - IntakeSignal(source="website", signal="security_txt_or_cvd_policy", detail="/.well-known/security.txt"), - IntakeSignal(source="repository", signal="sbom_file_found", detail="sbom.cdx.json"), + IntakeSignal(source="website", signal="cvd_policy_present", detail="/.well-known/security.txt"), + IntakeSignal(source="repository", signal="sbom_present", detail="sbom.cdx.json"), IntakeSignal(source="repository", signal="signed_releases"), IntakeSignal(source="document", signal="product_risk_assessment_doc"), IntakeSignal(source="product", signal="cloud_connectivity"),