978052b5a2
Fix B of the pre-#59 semantic correction. The Silent Pass had only TWO effective states, although the data
carries three: a `detected` mapping (a concrete artifact) AND a `partial` mapping (an indicative signal,
e.g. a CI pipeline -> secure-development-lifecycle) both flowed through capability_ids() and were fed to
the Advisor as already present — so a weak indication silently removed a question, undermining exactly the
Welt-1/Welt-2 transparency we want to keep.
Now three distinct states:
- detected -> reduces the delta immediately (auto_detected, not asked). [unchanged]
- partial -> raises assumption strength but does NOT replace the question (surfaced as `indications`,
the capability stays in the delta and is still asked).
- requirement -> describes a target, never the present state (already handled by Fix A's kind split).
Changes (data + thin wiring, no new architecture):
- SilentIntakeResult.capability_ids() returns only relationship==detected; new indicative_capability_ids()
returns the partial ones.
- advisor_start() gains indicative_capabilities (NOT fed into the profile) and surfaces result.indications
= indicative ∩ required − auto_detected.
- AdvisorResult / AdvisorResponse gain `indications` (additive, contract-safe); the service passes the
indicative ids through.
Tests: a partial CI signal is indicative-not-detected and does NOT shrink the delta; end-to-end it appears
in `indications`, not `auto_detected`, and the gap is still asked. 28 onboarding tests pass, mypy --strict
clean on the onboarding modules, demo runs, check-loc 0. Runtime effect -> deploy + smoke.
100 lines
5.4 KiB
Python
100 lines
5.4 KiB
Python
"""Silent Knowledge Pass — recognise before asking (Phase 0).

Pins the deterministic signal->capability/product-fact mapping and the product effect that matters: when
the Silent Pass feeds detected capabilities into the Advisor, the delta shrinks and the number of
next-best questions DROPS — "we already recognised X, only these few remain" instead of a question wall.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
|
|
import yaml
|
|
|
|
from compliance.onboarding import (
|
|
IntakeSignal,
|
|
OnboardingInput,
|
|
SignalMapping,
|
|
advisor_start,
|
|
resolve_for_certifications,
|
|
silent_intake,
|
|
)
|
|
from compliance.onboarding import CapabilityHypothesis
|
|
from compliance.transition_reasoning import TargetRequirement
|
|
|
|
_DIR = os.path.dirname(__file__)
_KNOWLEDGE_DIR = os.path.join(_DIR, "..", "knowledge")


def _load_yaml(*parts: str):
    """Parse one YAML file located below the knowledge directory.

    Args:
        *parts: Path components relative to the knowledge directory.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file content.
    """
    # Open via a context manager so the handle is closed deterministically;
    # passing a bare ``open(...)`` to ``safe_load`` leaks the file object and
    # triggers ResourceWarning during test runs.
    with open(os.path.join(_KNOWLEDGE_DIR, *parts), encoding="utf-8") as handle:
        return yaml.safe_load(handle)


# Signal -> capability / product-fact mapping used by the Silent Pass.
_MAP = [SignalMapping(**m) for m in _load_yaml("onboarding", "intake_signal_map.yaml")["mappings"]]

# Capability hypotheses resolved per certification.
_LIB = [CapabilityHypothesis(**h)
        for h in _load_yaml("certification_hypotheses", "hypotheses.yaml")["hypotheses"]]

# Transition pattern ISO 27001 -> CRA / MaschinenVO; source of the target requirements below.
_CRA = _load_yaml("transition_patterns", "transition_pattern_iso27001_to_cra_maschinenvo_v1.yaml")

# Target requirements: first the "likely covered" capabilities, then the delta requirements
# (the latter carry their expected evidence, defaulting to an empty list).
_REQ = [TargetRequirement(capability_id=a["capability"]) for a in _CRA["likely_covered"]]
_REQ += [TargetRequirement(capability_id=d["capability"], expected_evidence=d.get("expected_evidence", []))
         for d in _CRA["delta_requirements"]]
|
|
|
|
# Injected scanner findings for one machine builder: a public CVD policy on the website, an SBOM
# and signed releases in the repository, a product risk-assessment document, and a
# cloud-connected PLC product.
_SIGNALS = [
    IntakeSignal(**finding)
    for finding in (
        {"source": "website", "signal": "cvd_policy_present", "detail": "/.well-known/security.txt"},
        {"source": "repository", "signal": "sbom_present", "detail": "sbom.cdx.json"},
        {"source": "repository", "signal": "signed_releases"},
        {"source": "document", "signal": "product_risk_assessment_doc"},
        {"source": "product", "signal": "cloud_connectivity"},
        {"source": "product", "signal": "plc_sps"},
    )
]
|
|
|
|
|
|
def test_silent_intake_is_deterministic_signal_mapping():
    """The injected signals yield the expected capabilities, evidence and product facts."""
    intake = silent_intake(_SIGNALS, _MAP)

    expected_capabilities = {
        "coordinated_vulnerability_disclosure",
        "sbom_creation",
        "secure_signed_update_distribution",
        "product_cyber_risk_assessment",
    }
    assert expected_capabilities.issubset(set(intake.capability_ids()))

    # evidence already in hand -> no upload needed
    assert "sbom" in intake.evidence_found

    fact_keys = {fact.key for fact in intake.product_facts}
    assert {"connected_to_internet", "is_machine"} <= fact_keys
|
|
|
|
|
|
def test_silent_pass_reduces_the_questions():
    """Feeding Silent Pass detections into the Advisor shrinks the delta and never adds questions."""
    onboarding = OnboardingInput(company="x", certifications=["ISO27001", "ISO9001"], target=["CRA"])
    hypotheses = resolve_for_certifications(onboarding.certifications, _LIB)

    def run_advisor(**extra):
        # A fresh corpus_status dict per call keeps the two runs fully independent.
        return advisor_start(onboarding, hypotheses, _REQ, target_id="CRA",
                             corpus_status={"CRA": "validated"}, **extra)

    baseline = run_advisor()
    recognised = silent_intake(_SIGNALS, _MAP).capability_ids()
    informed = run_advisor(detected_capabilities=recognised)

    # the whole point: recognising things automatically leaves FEWER open questions
    assert len(baseline.capability_delta) > len(informed.capability_delta)
    assert len(baseline.next_best_questions) >= len(informed.next_best_questions)

    # recognised without asking
    assert informed.auto_detected
    assert "automatisch erkannt (Intake)" in informed.headline
|
|
|
|
|
|
def test_detected_capabilities_are_not_asked_again():
    """A capability detected by the Silent Pass is neither asked about nor left in the delta."""
    onboarding = OnboardingInput(company="x", certifications=["ISO27001"], target=["CRA"])
    hypotheses = resolve_for_certifications(onboarding.certifications, _LIB)
    recognised = silent_intake(_SIGNALS, _MAP).capability_ids()

    outcome = advisor_start(
        onboarding,
        hypotheses,
        _REQ,
        target_id="CRA",
        corpus_status={"CRA": "validated"},
        detected_capabilities=recognised,
    )

    asked_capabilities = {question.capability_id for question in outcome.next_best_questions}
    assert "sbom_creation" not in asked_capabilities
    assert "sbom_creation" not in outcome.capability_delta
|
|
|
|
|
|
def test_partial_signal_is_indicative_not_detected():
    """A partial signal is reported as an indication, not as a detected capability."""
    # A PARTIAL signal (CI present -> secure dev lifecycle) raises assumption strength but is NOT
    # a detected capability: it must NOT shrink the delta the way a concrete artifact does.
    ci_signal = IntakeSignal(source="repository", signal="github_actions_ci")
    intake = silent_intake([ci_signal], _MAP)

    # not counted as present
    assert "secure_development_lifecycle" not in intake.capability_ids()
    # surfaced as an indication
    assert intake.indicative_capability_ids() == ["secure_development_lifecycle"]
|
|
|
|
|
|
def test_partial_indication_does_not_remove_the_question():
    """End to end: a partial signal shows up in ``indications`` while the gap stays in the delta."""
    capability = "secure_development_lifecycle"
    onboarding = OnboardingInput(company="x", certifications=["ISO27001"], target=["CRA"])
    hypotheses = resolve_for_certifications(onboarding.certifications, _LIB)
    intake = silent_intake([IntakeSignal(source="repository", signal="github_actions_ci")], _MAP)

    outcome = advisor_start(
        onboarding,
        hypotheses,
        _REQ,
        target_id="CRA",
        corpus_status={"CRA": "validated"},
        detected_capabilities=intake.capability_ids(),
        indicative_capabilities=intake.indicative_capability_ids(),
    )

    # partial != detected
    assert capability not in outcome.auto_detected
    # strength shown
    assert capability in outcome.indications
    # gap still open / asked
    assert capability in outcome.capability_delta
|