From 9c3358241269a1bfc603c8bb395d7a3220e204fd Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 28 Jun 2026 14:34:27 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20Silent=20Knowledge=20Pass=20=E2=80=94?= =?UTF-8?q?=20recognise=20before=20asking=20(Phase=200,=20before=20the=20e?= =?UTF-8?q?ndpoint)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Not the endpoint yet — the bigger knowledge lever first. The Advisor can say "I need 5 answers" but does not yet decide what it can find out by ITSELF. The Silent Knowledge Pass runs in front of the Advisor and, from signals existing scanners/parsers already produce (website, repository, documents, product data), deterministically derives capabilities the company demonstrably HAS + product facts that drive scope — so every recognised item shrinks the delta and removes a question. compliance/onboarding/silent_intake.py: silent_intake(signals, signal_map) -> detected_capabilities (+ evidence already in hand) + product_facts. The signal->conclusion map is curated DATA (knowledge/onboarding/intake_signal_map.yaml), signals are injected (scanners are upstream). Pure, deterministic, no LLM. advisor_start gains detected_capabilities (folded into the profile at HIGH confidence -> covered, not asked) and an auto_detected result + headline. The experience flips from a question wall to "we already recognised 4 capabilities, 2 product facts and have 4 pieces of evidence in hand — only these few remain". Order now: Silent Pass -> #58 endpoint/frontend -> #59 empirical loop. NOT new architecture, just an orchestration step in front. Non-runtime (no app caller) -> no deploy. 15 onboarding tests pass, mypy --strict clean, check-loc 0. --- .../compliance/onboarding/__init__.py | 14 +++ .../compliance/onboarding/engine.py | 38 ++++--- .../compliance/onboarding/schemas.py | 1 + .../compliance/onboarding/silent_intake.py | 99 +++++++++++++++++++ .../onboarding/intake_signal_map.yaml | 31 ++++++ .../onboarding_advisor_demo.md | 34 ++++--- .../onboarding_advisor_demo.py | 24 ++++- .../tests/test_silent_intake.py | 79 +++++++++++++++ 8 files changed, 290 insertions(+), 30 deletions(-) create mode 100644 backend-compliance/compliance/onboarding/silent_intake.py create mode 100644 backend-compliance/knowledge/onboarding/intake_signal_map.yaml create mode 100644 backend-compliance/tests/test_silent_intake.py diff --git a/backend-compliance/compliance/onboarding/__init__.py b/backend-compliance/compliance/onboarding/__init__.py index 2a6f5447..77b9bd3f 100644 --- a/backend-compliance/compliance/onboarding/__init__.py +++ b/backend-compliance/compliance/onboarding/__init__.py @@ -21,6 +21,14 @@ from .observations import ( empirical_distribution, reviewed, ) +from .silent_intake import ( + DetectedCapability, + IntakeSignal, + ProductFact, + SignalMapping, + SilentIntakeResult, + silent_intake, +) from .schemas import ( AdvisorMeasure, AdvisorQuestion, @@ -47,4 +55,10 @@ __all__ = [ "empirical_distribution", "empirical_confidence", "reviewed", + "silent_intake", + "IntakeSignal", + "SignalMapping", + "DetectedCapability", + "ProductFact", + "SilentIntakeResult", ] diff --git a/backend-compliance/compliance/onboarding/engine.py b/backend-compliance/compliance/onboarding/engine.py index 25eeb6b3..3af85f0e 100644 --- a/backend-compliance/compliance/onboarding/engine.py +++ b/backend-compliance/compliance/onboarding/engine.py @@ -49,15 +49,21 @@ _GAIN = {"high": 3, "medium": 2, "low": 1} _RISK = {"high": 2, "medium": 1, "low": 0} -def _profile(inp: OnboardingInput, cert_hypotheses: Dict[str, List[str]]) -> CompanyCapabilityProfile: +def _profile( + inp: OnboardingInput, cert_hypotheses: Dict[str, List[str]], + detected: Optional[Sequence[str]] = None, +) -> CompanyCapabilityProfile: cmap = { cert: CapabilityMappingEntry(capability_ids=list(caps), confidence=Confidence.MEDIUM) for cert, caps in cert_hypotheses.items() if cert in inp.certifications and caps } - ctx = CompanyContext(company_id=inp.company or "company", - certifications=[Certification(certification_id=c) for c in cmap]) - return build_company_profile(ctx, cmap) + certs = [Certification(certification_id=c) for c in cmap] + if detected: # Silent Pass: concrete findings -> HIGH confidence + cmap["__detected__"] = CapabilityMappingEntry( + capability_ids=list(dict.fromkeys(detected)), confidence=Confidence.HIGH) + certs.append(Certification(certification_id="__detected__")) + return build_company_profile(CompanyContext(company_id=inp.company or "company", certifications=certs), cmap) def advisor_start( @@ -68,15 +74,18 @@ def advisor_start( covers_targets: Optional[Dict[str, List[str]]] = None, corpus_status: Optional[Dict[str, str]] = None, uncertain: Optional[List[Dict[str, str]]] = None, + detected_capabilities: Optional[Sequence[str]] = None, ) -> AdvisorResult: - """Run the onboarding flow: certs -> profile -> delta -> ranked next-best questions + measures. + """Run the onboarding flow: (silent intake +) certs -> profile -> delta -> ranked questions + measures. - Pure orchestration; deterministic. `cert_hypotheses` (cert -> probable cap ids) and - `target_requirements` are INJECTED. `covers_targets` (cap -> targets it closes) drives leverage. + Pure orchestration; deterministic. `cert_hypotheses` (cert -> probable cap ids), `target_requirements` + and `detected_capabilities` (from the Silent Knowledge Pass) are INJECTED. Detected capabilities are + recognised WITHOUT asking -> they shrink the delta and remove questions. """ covers_targets = covers_targets or {} required = {r.capability_id for r in target_requirements} - profile = _profile(inp, cert_hypotheses) + profile = _profile(inp, cert_hypotheses, detected_capabilities) + auto_detected = sorted(set(detected_capabilities or []) & required) assess = assess_transition( TransitionContext(company_id=inp.company or "company", target=TransitionGoal(target_id=target_id)), list(target_requirements), profile) @@ -123,13 +132,14 @@ def advisor_start( rep = assess_completeness(applicable, corpus_status or {}, uncertain=uncertain or []) unsupported = [e.subject for e in rep.exclusions] - probably = assess.summary.probably_covered + probably = [c for c in assess.summary.probably_covered if c not in set(auto_detected)] return AdvisorResult( - inferred_assumptions=inferred, rejected_assumptions=rejected, next_best_questions=next_q, - capability_delta=delta, top_measures=measures, evidence_requests=evidence, - unsupported_domains=unsupported, completeness_summary=rep.completeness_summary, - headline="%d Anforderungen erkannt · %d wahrscheinlich abgedeckt · %d zu klären" - % (len(assess.coverage), len(probably), len(next_q))) + inferred_assumptions=inferred, rejected_assumptions=rejected, auto_detected=auto_detected, + next_best_questions=next_q, capability_delta=delta, top_measures=measures, + evidence_requests=evidence, unsupported_domains=unsupported, + completeness_summary=rep.completeness_summary, + headline="%d Anforderungen erkannt · %d automatisch erkannt (Intake) · %d wahrscheinlich (Zertifikate) · %d zu klären" + % (len(assess.coverage), len(auto_detected), len(probably), len(next_q))) def apply_answer(known_capabilities: Sequence[str], capability_id: str, answer: str) -> List[str]: diff --git a/backend-compliance/compliance/onboarding/schemas.py b/backend-compliance/compliance/onboarding/schemas.py index ff1724ea..0cf23545 100644 --- a/backend-compliance/compliance/onboarding/schemas.py +++ b/backend-compliance/compliance/onboarding/schemas.py @@ -53,6 +53,7 @@ class AdvisorMeasure(BaseModel): class AdvisorResult(BaseModel): inferred_assumptions: List[InferredAssumption] = Field(default_factory=list) rejected_assumptions: List[RejectedAssumption] = Field(default_factory=list) + auto_detected: List[str] = Field(default_factory=list) # Silent Pass: recognised w/o asking next_best_questions: List[AdvisorQuestion] = Field(default_factory=list) # max 5 capability_delta: List[str] = Field(default_factory=list) top_measures: List[AdvisorMeasure] = Field(default_factory=list) diff --git a/backend-compliance/compliance/onboarding/silent_intake.py b/backend-compliance/compliance/onboarding/silent_intake.py new file mode 100644 index 00000000..97aba5a0 --- /dev/null +++ b/backend-compliance/compliance/onboarding/silent_intake.py @@ -0,0 +1,99 @@ +"""Silent Knowledge Pass — recognise everything possible BEFORE asking a single question (Phase 0). + +The Advisor can say "I need 5 answers" but does not yet decide WHAT it can find out by itself. The Silent +Pass runs first: from signals that existing scanners/parsers already produce (website, repository, +documents, product data) it deterministically derives capabilities the company demonstrably HAS and +product facts that drive scope — so every recognised item shrinks the delta and removes a question. + +The customer then experiences "we already recognised 11 of 17 — only these 4 remain" instead of a +question wall. This is NOT new architecture: it is one orchestration step in front of the Advisor + Company -> Silent Intake -> Company Profile -> Hypotheses -> Delta -> Top Questions +All building blocks already exist. SIGNALS are INJECTED (the scanners produce them); the signal->capability +map is curated DATA, also injected. Pure, deterministic, no I/O. Python 3.9 compatible. +""" + +from __future__ import annotations + +from typing import Dict, List, Optional, Sequence, Set + +from pydantic import BaseModel, Field + + +class IntakeSignal(BaseModel): + """One finding a scanner/parser produced (no LLM here — the scanners are upstream).""" + + source: str # website / repository / document / product + signal: str # signal id, e.g. "sbom_file_found" + detail: str = "" # optional (url, filename) for the audit trail + + +class SignalMapping(BaseModel): + """Curated: what a signal lets us conclude. A signal yields a capability OR a product fact.""" + + signal: str + capability: Optional[str] = None # capability the signal evidences + relationship: str = "detected" # detected (concrete artifact) / partial (indicative) + evidence: Optional[str] = None # the artifact found (already in hand -> no upload needed) + product_fact: Optional[str] = None # e.g. "connected_to_internet" + fact_value: str = "true" + + +class DetectedCapability(BaseModel): + capability: str + relationship: str = "detected" + source: str = "" # which signal/source detected it (audit trail) + evidence: Optional[str] = None + + +class ProductFact(BaseModel): + key: str + value: str = "true" + source: str = "" + + +class SilentIntakeResult(BaseModel): + detected_capabilities: List[DetectedCapability] = Field(default_factory=list) + product_facts: List[ProductFact] = Field(default_factory=list) + evidence_found: List[str] = Field(default_factory=list) + summary: str = "" + + def capability_ids(self) -> List[str]: + """The detected capability ids — fed into the Advisor as already-present (delta-reducing).""" + return sorted({d.capability for d in self.detected_capabilities}) + + +def silent_intake( + signals: Sequence[IntakeSignal], signal_map: Sequence[SignalMapping] +) -> SilentIntakeResult: + """Derive capabilities + product facts from injected scanner signals (deterministic, no questions). + + Each signal is matched to curated mappings by `signal` id; a mapping contributes either a detected + capability (+ optional evidence already in hand) or a product fact. Deduped, deterministic order. + """ + by_signal: Dict[str, List[SignalMapping]] = {} + for m in signal_map: + by_signal.setdefault(m.signal, []).append(m) + + caps: Dict[str, DetectedCapability] = {} + facts: Dict[str, ProductFact] = {} + evidence: Set[str] = set() + for s in signals: + for m in by_signal.get(s.signal, []): + if m.capability and m.capability not in caps: + caps[m.capability] = DetectedCapability( + capability=m.capability, relationship=m.relationship, + source="%s:%s" % (s.source, s.signal), evidence=m.evidence) + if m.evidence: + evidence.add(m.evidence) + if m.product_fact: + facts[m.product_fact] = ProductFact(key=m.product_fact, value=m.fact_value, source=s.source) + + detected = [caps[k] for k in sorted(caps)] + product_facts = [facts[k] for k in sorted(facts)] + summary = ( + "Stille Vorbefüllung: %d Fähigkeit(en) automatisch erkannt, %d Produktfakt(en), %d Nachweis(e) bereits vorhanden." + % (len(detected), len(product_facts), len(evidence)) + ) + return SilentIntakeResult( + detected_capabilities=detected, product_facts=product_facts, + evidence_found=sorted(evidence), summary=summary) diff --git a/backend-compliance/knowledge/onboarding/intake_signal_map.yaml b/backend-compliance/knowledge/onboarding/intake_signal_map.yaml new file mode 100644 index 00000000..3968a7e7 --- /dev/null +++ b/backend-compliance/knowledge/onboarding/intake_signal_map.yaml @@ -0,0 +1,31 @@ +# Silent Knowledge Pass — signal -> conclusion map (curated DATA, injected). +# +# What a scanner finding lets us conclude WITHOUT asking the user. A signal yields either a capability +# the company demonstrably has (with the evidence already in hand) or a product fact that drives scope. +# `relationship: detected` = a concrete artifact (strong, no question); `partial` = indicative (still +# verify, but lower priority). The scanners (website crawler, repo scanner, doc parser, product intake) +# are UPSTREAM and produce the signals; this file only interprets them. No norm text, no real names. + +mappings: + # ── website ─────────────────────────────────────────────────────────────────────────────── + - {signal: security_txt_or_cvd_policy, capability: coordinated_vulnerability_disclosure, relationship: detected, evidence: cvd_policy} + - {signal: ce_marking_on_site, capability: ce_conformity_assessment_and_technical_documentation, relationship: partial, evidence: ce_declaration} + - {signal: support_lifecycle_page, capability: security_update_support_period, relationship: partial, evidence: support_policy} + - {signal: security_policy_page, capability: information_security_management, relationship: partial} + # ── repository ──────────────────────────────────────────────────────────────────────────── + - {signal: sbom_file_found, capability: sbom_creation, relationship: detected, evidence: sbom} + - {signal: signed_releases, capability: secure_signed_update_distribution, relationship: detected, evidence: signing_config} + - {signal: github_actions_ci, capability: secure_development_lifecycle, relationship: partial, evidence: ci_pipeline} + - {signal: dependency_scanning, capability: technical_vulnerability_management, relationship: partial, evidence: vuln_scanning_config} + # ── documents ───────────────────────────────────────────────────────────────────────────── + - {signal: ce_conformity_doc, capability: ce_conformity_assessment_and_technical_documentation, relationship: detected, evidence: technical_documentation} + - {signal: product_risk_assessment_doc, capability: product_cyber_risk_assessment, relationship: detected, evidence: product_risk_assessment} + - {signal: patch_policy_doc, capability: secure_signed_update_distribution, relationship: partial, evidence: patch_policy} + - {signal: incident_response_plan_doc, capability: incident_management, relationship: detected, evidence: incident_procedure} + # ── product facts (drive scope / target applicability) ────────────────────────────────────── + - {signal: cloud_connectivity, product_fact: connected_to_internet} + - {signal: plc_sps, product_fact: is_machine} + - {signal: embedded_software, product_fact: has_embedded_software} + - {signal: wireless_radio, product_fact: has_radio_equipment} + - {signal: remote_access, product_fact: has_remote_access} + - {signal: generates_usage_data, product_fact: generates_usage_data} diff --git a/backend-compliance/reference_scenarios/onboarding_advisor_demo.md b/backend-compliance/reference_scenarios/onboarding_advisor_demo.md index 336f68f0..d62f7c31 100644 --- a/backend-compliance/reference_scenarios/onboarding_advisor_demo.md +++ b/backend-compliance/reference_scenarios/onboarding_advisor_demo.md @@ -5,8 +5,14 @@ _Eingabe: Unternehmen + Produkte + Zertifizierungen + Ziel. Den Rest macht die O ## Eingabe > Zertifizierungen: **ISO9001, ISO27001, ISO14001, TISAX** · Produkt: **Parkschein-/Schrankensystem** · Ziel: **CRA** +## Phase 0 — Stille Vorbefüllung (BEVOR eine Frage erscheint) +> Stille Vorbefüllung: 4 Fähigkeit(en) automatisch erkannt, 2 Produktfakt(en), 4 Nachweis(e) bereits vorhanden. +- **Automatisch erkannte Fähigkeiten:** `coordinated_vulnerability_disclosure`, `product_cyber_risk_assessment`, `sbom_creation`, `secure_signed_update_distribution` +- **Produktfakten (steuern den Scope):** `connected_to_internet=true`, `is_machine=true` +- **Nachweise bereits in der Hand (kein Upload nötig):** cvd_policy, product_risk_assessment, sbom, signing_config + ## Was wir erkannt haben -> 17 Anforderungen erkannt · 5 wahrscheinlich abgedeckt · 5 zu klären +> 17 Anforderungen erkannt · 4 automatisch erkannt (Intake) · 5 wahrscheinlich (Zertifikate) · 5 zu klären **Aus Ihren Zertifizierungen abgeleitet (zu bestätigen, nicht automatisch erfüllt):** - ISO9001 legt 1 relevante Fähigkeit(en) nahe — Verifikation erforderlich, nicht automatisch erfüllt @@ -16,26 +22,26 @@ _Eingabe: Unternehmen + Produkte + Zertifizierungen + Ziel. Den Rest macht die O ## Die wenigen offenen Punkte — nur die nächsten besten Fragen **Frage 1 von 5** _(Informationswert 8)_ -> product cyber risk assessment? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._ - -**Frage 2 von 5** _(Informationswert 8)_ > protection against corruption of safety functions? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._ -**Frage 3 von 5** _(Informationswert 8)_ -> secure signed update distribution? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._ - -**Frage 4 von 5** _(Informationswert 7)_ -> coordinated vulnerability disclosure? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._ - -**Frage 5 von 5** _(Informationswert 7)_ +**Frage 2 von 5** _(Informationswert 7)_ > exploited vuln and incident reporting? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._ +**Frage 3 von 5** _(Informationswert 7)_ +> machine safety risk assessment? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._ + +**Frage 4 von 5** _(Informationswert 7)_ +> mechanical safety and guards? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._ + +**Frage 5 von 5** _(Informationswert 7)_ +> operating instructions and safety information? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._ + ## Womit zuerst anfangen (größter Hebel) -- `product_cyber_risk_assessment` — schließt 2 Anforderung(en): CRA, MaschinenVO - `protection_against_corruption_of_safety_functions` — schließt 2 Anforderung(en): CRA, MaschinenVO -- `secure_signed_update_distribution` — schließt 2 Anforderung(en): CRA, MaschinenVO -- `coordinated_vulnerability_disclosure` — schließt 1 Anforderung(en): CRA - `exploited_vuln_and_incident_reporting` — schließt 1 Anforderung(en): CRA +- `machine_safety_risk_assessment` — schließt 1 Anforderung(en): MaschinenVO +- `mechanical_safety_and_guards` — schließt 1 Anforderung(en): MaschinenVO +- `operating_instructions_and_safety_information` — schließt 1 Anforderung(en): MaschinenVO ## Vollständigkeit (ehrlich) > Identifiziert 1 · bewertet 1 · offen 0 · Unsicherheiten 0 · Begründung ja diff --git a/backend-compliance/reference_scenarios/onboarding_advisor_demo.py b/backend-compliance/reference_scenarios/onboarding_advisor_demo.py index dd4056a0..3ef3e288 100644 --- a/backend-compliance/reference_scenarios/onboarding_advisor_demo.py +++ b/backend-compliance/reference_scenarios/onboarding_advisor_demo.py @@ -12,7 +12,10 @@ from __future__ import annotations import os import yaml -from compliance.onboarding import CapabilityHypothesis, OnboardingInput, advisor_start, resolve_for_certifications +from compliance.onboarding import ( + CapabilityHypothesis, IntakeSignal, OnboardingInput, SignalMapping, + advisor_start, resolve_for_certifications, silent_intake, +) from compliance.transition_reasoning import TargetRequirement OUT = [] @@ -37,7 +40,18 @@ inp = OnboardingInput(company="synthetisch", industry="machine_builder", certifications=["ISO9001", "ISO27001", "ISO14001", "TISAX"], known_evidence=["CE process"], target=["CRA"]) hyp = resolve_for_certifications(inp.certifications, _lib) -res = advisor_start(inp, hyp, req, target_id="CRA", covers_targets=covers, corpus_status={"CRA": "validated"}) +# Phase 0 — Silent Knowledge Pass: recognise everything possible from scanner signals BEFORE asking. +_smap = [SignalMapping(**m) for m in yaml.safe_load( + open(os.path.join(os.path.dirname(__file__), "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]] +_signals = [IntakeSignal(source="website", signal="security_txt_or_cvd_policy", detail="/.well-known/security.txt"), + IntakeSignal(source="repository", signal="sbom_file_found", detail="sbom.cdx.json"), + IntakeSignal(source="repository", signal="signed_releases"), + IntakeSignal(source="document", signal="product_risk_assessment_doc"), + IntakeSignal(source="product", signal="cloud_connectivity"), + IntakeSignal(source="product", signal="plc_sps")] +si = silent_intake(_signals, _smap) +res = advisor_start(inp, hyp, req, target_id="CRA", covers_targets=covers, corpus_status={"CRA": "validated"}, + detected_capabilities=si.capability_ids()) w("# Smart Onboarding Advisor — was der Nutzer sieht (automatisch, ohne Vertrieb)") w("") @@ -46,6 +60,12 @@ w("") w("## Eingabe") w("> Zertifizierungen: **%s** · Produkt: **%s** · Ziel: **%s**" % (", ".join(inp.certifications), inp.products[0], ", ".join(inp.target))) w("") +w("## Phase 0 — Stille Vorbefüllung (BEVOR eine Frage erscheint)") +w("> %s" % si.summary) +w("- **Automatisch erkannte Fähigkeiten:** %s" % ", ".join("`%s`" % d.capability for d in si.detected_capabilities)) +w("- **Produktfakten (steuern den Scope):** %s" % ", ".join("`%s=%s`" % (f.key, f.value) for f in si.product_facts)) +w("- **Nachweise bereits in der Hand (kein Upload nötig):** %s" % ", ".join(si.evidence_found)) +w("") w("## Was wir erkannt haben") w("> %s" % res.headline) w("") diff --git a/backend-compliance/tests/test_silent_intake.py b/backend-compliance/tests/test_silent_intake.py new file mode 100644 index 00000000..9d3f98ad --- /dev/null +++ b/backend-compliance/tests/test_silent_intake.py @@ -0,0 +1,79 @@ +"""Silent Knowledge Pass — recognise before asking (Phase 0). + +Pins the deterministic signal->capability/product-fact mapping and the product effect that matters: when +the Silent Pass feeds detected capabilities into the Advisor, the delta shrinks and the number of +next-best questions DROPS — "we already recognised X, only these few remain" instead of a question wall. +""" + +from __future__ import annotations + +import os + +import yaml + +from compliance.onboarding import ( + IntakeSignal, + OnboardingInput, + SignalMapping, + advisor_start, + resolve_for_certifications, + silent_intake, +) +from compliance.onboarding import CapabilityHypothesis +from compliance.transition_reasoning import TargetRequirement + +_DIR = os.path.dirname(__file__) +_MAP = [SignalMapping(**m) for m in yaml.safe_load( + open(os.path.join(_DIR, "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]] +_LIB = [CapabilityHypothesis(**h) for h in yaml.safe_load( + open(os.path.join(_DIR, "..", "knowledge", "certification_hypotheses", "hypotheses.yaml"), encoding="utf-8"))["hypotheses"]] +_CRA = yaml.safe_load(open(os.path.join(_DIR, "..", "knowledge", "transition_patterns", + "transition_pattern_iso27001_to_cra_maschinenvo_v1.yaml"), encoding="utf-8")) +_REQ = [TargetRequirement(capability_id=a["capability"]) for a in _CRA["likely_covered"]] +_REQ += [TargetRequirement(capability_id=d["capability"], expected_evidence=d.get("expected_evidence", [])) + for d in _CRA["delta_requirements"]] + +# scanner findings (injected): a machine builder with a public CVD policy, an SBOM + signed releases in +# the repo, a product risk-assessment doc, and a cloud-connected PLC product. +_SIGNALS = [ + IntakeSignal(source="website", signal="security_txt_or_cvd_policy", detail="/.well-known/security.txt"), + IntakeSignal(source="repository", signal="sbom_file_found", detail="sbom.cdx.json"), + IntakeSignal(source="repository", signal="signed_releases"), + IntakeSignal(source="document", signal="product_risk_assessment_doc"), + IntakeSignal(source="product", signal="cloud_connectivity"), + IntakeSignal(source="product", signal="plc_sps"), +] + + +def test_silent_intake_is_deterministic_signal_mapping(): + res = silent_intake(_SIGNALS, _MAP) + caps = set(res.capability_ids()) + assert {"coordinated_vulnerability_disclosure", "sbom_creation", "secure_signed_update_distribution", + "product_cyber_risk_assessment"} <= caps + assert "sbom" in res.evidence_found # evidence already in hand -> no upload needed + facts = {f.key for f in res.product_facts} + assert "connected_to_internet" in facts and "is_machine" in facts + + +def test_silent_pass_reduces_the_questions(): + inp = OnboardingInput(company="x", certifications=["ISO27001", "ISO9001"], target=["CRA"]) + hyp = resolve_for_certifications(inp.certifications, _LIB) + without = advisor_start(inp, hyp, _REQ, target_id="CRA", corpus_status={"CRA": "validated"}) + detected = silent_intake(_SIGNALS, _MAP).capability_ids() + with_pass = advisor_start(inp, hyp, _REQ, target_id="CRA", corpus_status={"CRA": "validated"}, + detected_capabilities=detected) + # the whole point: recognising things automatically leaves FEWER open questions + assert len(with_pass.capability_delta) < len(without.capability_delta) + assert len(with_pass.next_best_questions) <= len(without.next_best_questions) + assert with_pass.auto_detected # recognised without asking + assert "automatisch erkannt (Intake)" in with_pass.headline + + +def test_detected_capabilities_are_not_asked_again(): + inp = OnboardingInput(company="x", certifications=["ISO27001"], target=["CRA"]) + hyp = resolve_for_certifications(inp.certifications, _LIB) + detected = silent_intake(_SIGNALS, _MAP).capability_ids() + res = advisor_start(inp, hyp, _REQ, target_id="CRA", corpus_status={"CRA": "validated"}, + detected_capabilities=detected) + asked = {q.capability_id for q in res.next_best_questions} + assert "sbom_creation" not in asked and "sbom_creation" not in res.capability_delta