c39787ad96
Semantic correction of the knowledge base BEFORE the empirical loop (#59) is built — otherwise the Observation Store would learn from already-misclassified signals. The Silent Pass conflated two kinds of signal into one: an OBSERVATION ("I saw an SBOM in the repo") and a REQUIREMENT ("a tender DEMANDS an SBOM"). They were aliased to the same canonical id, so a tender clause read as "SBOM already present" and suppressed the very question that should have been asked. Fix — make the kind explicit and authoritative (no new architecture, data + thin wiring): - `kind` ∈ {observation, requirement} on ProducedSignal (producer may declare) and on the canonical SignalVocabularyEntry (AUTHORITATIVE — a mislabelled producer cannot collapse the two). - Vocabulary split: sbom_file_found → sbom_present (obs) + sbom_required (req); security_txt_or_cvd_policy → cvd_policy_present (obs) + psirt_required (req); add signed_updates_required. requirement signals are intentionally UNMAPPED in intake_signal_map (they describe a target, not state). - silent_intake() consumes ONLY kind==observation; requirement signals are preserved in `requirements_seen` (visible/auditable) but NEVER become a detected capability. - normalize_signals() stamps the vocabulary's kind onto every IntakeSignal; unknown ids still pass through. This is the same Observation-vs-Requirement split the Requirements Verification Platform rests on: observations are reality, requirements are targets, and their comparison is the delta. A tender / OEM spec / law now produces requirement signals; scanners / repos / documents produce observation signals. 
Tests: rewrote the two test_signal_producer cases that previously ASSERTED the bug (tender == repo) to pin the correct split; regression — `requires_sbom` yields no capability + stays in requirements_seen while `cyclonedx_found` still detects sbom_creation; endpoint-level regression that a tender requirement does not auto-detect and the gap stays asked; vocabulary-kind-overrides-mislabelled-producer. 25 onboarding tests pass, mypy --strict clean, demo runs, check-loc 0. Runtime effect → deploy + smoke. (Fix A; partial-vs-detected decoupling follows as Fix B before #59.)
80 lines
4.1 KiB
Python
80 lines
4.1 KiB
Python
"""Silent Knowledge Pass — recognise before asking (Phase 0).
|
|
|
|
Pins the deterministic signal->capability/product-fact mapping and the product effect that matters: when
|
|
the Silent Pass feeds detected capabilities into the Advisor, the delta shrinks and the number of
|
|
next-best questions DROPS — "we already recognised X, only these few remain" instead of a question wall.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
|
|
import yaml
|
|
|
|
from compliance.onboarding import (
|
|
IntakeSignal,
|
|
OnboardingInput,
|
|
SignalMapping,
|
|
advisor_start,
|
|
resolve_for_certifications,
|
|
silent_intake,
|
|
)
|
|
from compliance.onboarding import CapabilityHypothesis
|
|
from compliance.transition_reasoning import TargetRequirement
|
|
|
|
# Fixture data is read once at import time from the repository's knowledge base, resolved
# relative to this test file so the tests do not depend on the current working directory.
_DIR = os.path.dirname(__file__)
_KNOWLEDGE_DIR = os.path.join(_DIR, "..", "knowledge")

# Every file is opened in a `with` block so its handle is closed as soon as the YAML is parsed
# (a bare `open(...)` passed straight to `yaml.safe_load` leaves the handle open).

# Signal mappings: the "mappings" entries of intake_signal_map.yaml.
with open(os.path.join(_KNOWLEDGE_DIR, "onboarding", "intake_signal_map.yaml"), encoding="utf-8") as _fh:
    _MAP = [SignalMapping(**m) for m in yaml.safe_load(_fh)["mappings"]]

# Capability-hypothesis library: the "hypotheses" entries of hypotheses.yaml.
with open(os.path.join(_KNOWLEDGE_DIR, "certification_hypotheses", "hypotheses.yaml"), encoding="utf-8") as _fh:
    _LIB = [CapabilityHypothesis(**h) for h in yaml.safe_load(_fh)["hypotheses"]]

# Transition pattern ISO 27001 -> CRA / MaschinenVO, kept as the raw parsed YAML document.
with open(os.path.join(_KNOWLEDGE_DIR, "transition_patterns",
                       "transition_pattern_iso27001_to_cra_maschinenvo_v1.yaml"), encoding="utf-8") as _fh:
    _CRA = yaml.safe_load(_fh)

# Target requirements: first the "likely_covered" capabilities, then the "delta_requirements"
# (the latter carry their expected evidence, defaulting to an empty list when the key is absent).
_REQ = [TargetRequirement(capability_id=a["capability"]) for a in _CRA["likely_covered"]]
_REQ += [
    TargetRequirement(capability_id=d["capability"], expected_evidence=d.get("expected_evidence", []))
    for d in _CRA["delta_requirements"]
]
|
|
|
|
# Injected scanner findings shared by all tests below. They describe a machine builder with a
# public CVD policy, an SBOM and signed releases in the repository, a product risk-assessment
# document, and a cloud-connected PLC product.
_SIGNALS = [
    # website
    IntakeSignal(
        source="website",
        signal="cvd_policy_present",
        detail="/.well-known/security.txt",
    ),
    # repository
    IntakeSignal(
        source="repository",
        signal="sbom_present",
        detail="sbom.cdx.json",
    ),
    IntakeSignal(source="repository", signal="signed_releases"),
    # document
    IntakeSignal(source="document", signal="product_risk_assessment_doc"),
    # product
    IntakeSignal(source="product", signal="cloud_connectivity"),
    IntakeSignal(source="product", signal="plc_sps"),
]
|
|
|
|
|
|
def test_silent_intake_is_deterministic_signal_mapping():
    """The injected signals yield the expected capabilities, evidence and product facts."""
    outcome = silent_intake(_SIGNALS, _MAP)

    # Every one of these capabilities must be among the detected ones (extras are allowed).
    expected_capabilities = {
        "coordinated_vulnerability_disclosure",
        "sbom_creation",
        "secure_signed_update_distribution",
        "product_cyber_risk_assessment",
    }
    detected_capabilities = set(outcome.capability_ids())
    assert expected_capabilities.issubset(detected_capabilities)

    # evidence already in hand -> no upload needed
    assert "sbom" in outcome.evidence_found

    fact_keys = {fact.key for fact in outcome.product_facts}
    assert {"connected_to_internet", "is_machine"} <= fact_keys
|
|
|
|
|
|
def test_silent_pass_reduces_the_questions():
    """Passing Silent Pass detections to the Advisor shrinks the delta and adds no questions."""
    onboarding = OnboardingInput(
        company="x",
        certifications=["ISO27001", "ISO9001"],
        target=["CRA"],
    )
    hypotheses = resolve_for_certifications(onboarding.certifications, _LIB)

    # Baseline: the Advisor runs without any automatically detected capabilities.
    baseline = advisor_start(
        onboarding,
        hypotheses,
        _REQ,
        target_id="CRA",
        corpus_status={"CRA": "validated"},
    )

    # Same call, now fed with what the Silent Pass recognised from the injected signals.
    recognised = silent_intake(_SIGNALS, _MAP).capability_ids()
    assisted = advisor_start(
        onboarding,
        hypotheses,
        _REQ,
        target_id="CRA",
        corpus_status={"CRA": "validated"},
        detected_capabilities=recognised,
    )

    # the whole point: recognising things automatically leaves FEWER open questions
    assert len(assisted.capability_delta) < len(baseline.capability_delta)
    assert len(assisted.next_best_questions) <= len(baseline.next_best_questions)
    # recognised without asking
    assert assisted.auto_detected
    assert "automatisch erkannt (Intake)" in assisted.headline
|
|
|
|
|
|
def test_detected_capabilities_are_not_asked_again():
    """A capability detected by the Silent Pass is neither asked about nor left in the delta."""
    onboarding = OnboardingInput(company="x", certifications=["ISO27001"], target=["CRA"])
    hypotheses = resolve_for_certifications(onboarding.certifications, _LIB)
    recognised = silent_intake(_SIGNALS, _MAP).capability_ids()

    advice = advisor_start(
        onboarding,
        hypotheses,
        _REQ,
        target_id="CRA",
        corpus_status={"CRA": "validated"},
        detected_capabilities=recognised,
    )

    asked_capabilities = {question.capability_id for question in advice.next_best_questions}
    # The SBOM was found in the repository signals, so it must not come back as a question ...
    assert "sbom_creation" not in asked_capabilities
    # ... and must not remain in the capability delta either.
    assert "sbom_creation" not in advice.capability_delta
|