feat: Silent Knowledge Pass — recognise before asking (Phase 0, before the endpoint)
Not the endpoint yet — the bigger knowledge lever first. The Advisor can say "I need 5 answers" but does not yet decide what it can find out by ITSELF. The Silent Knowledge Pass runs in front of the Advisor and, from signals existing scanners/parsers already produce (website, repository, documents, product data), deterministically derives capabilities the company demonstrably HAS + product facts that drive scope — so every recognised item shrinks the delta and removes a question. compliance/onboarding/silent_intake.py: silent_intake(signals, signal_map) -> detected_capabilities (+ evidence already in hand) + product_facts. The signal->conclusion map is curated DATA (knowledge/onboarding/intake_signal_map.yaml), signals are injected (scanners are upstream). Pure, deterministic, no LLM. advisor_start gains detected_capabilities (folded into the profile at HIGH confidence -> covered, not asked) and an auto_detected result + headline. The experience flips from a question wall to "we already recognised 4 capabilities, 2 product facts and have 4 pieces of evidence in hand — only these few remain". Order now: Silent Pass -> #58 endpoint/frontend -> #59 empirical loop. NOT new architecture, just an orchestration step in front. Non-runtime (no app caller) -> no deploy. 15 onboarding tests pass, mypy --strict clean, check-loc 0.
This commit is contained in:
@@ -21,6 +21,14 @@ from .observations import (
|
||||
empirical_distribution,
|
||||
reviewed,
|
||||
)
|
||||
from .silent_intake import (
|
||||
DetectedCapability,
|
||||
IntakeSignal,
|
||||
ProductFact,
|
||||
SignalMapping,
|
||||
SilentIntakeResult,
|
||||
silent_intake,
|
||||
)
|
||||
from .schemas import (
|
||||
AdvisorMeasure,
|
||||
AdvisorQuestion,
|
||||
@@ -47,4 +55,10 @@ __all__ = [
|
||||
"empirical_distribution",
|
||||
"empirical_confidence",
|
||||
"reviewed",
|
||||
"silent_intake",
|
||||
"IntakeSignal",
|
||||
"SignalMapping",
|
||||
"DetectedCapability",
|
||||
"ProductFact",
|
||||
"SilentIntakeResult",
|
||||
]
|
||||
|
||||
@@ -49,15 +49,21 @@ _GAIN = {"high": 3, "medium": 2, "low": 1}
|
||||
_RISK = {"high": 2, "medium": 1, "low": 0}
|
||||
|
||||
|
||||
def _profile(inp: OnboardingInput, cert_hypotheses: Dict[str, List[str]]) -> CompanyCapabilityProfile:
|
||||
def _profile(
|
||||
inp: OnboardingInput, cert_hypotheses: Dict[str, List[str]],
|
||||
detected: Optional[Sequence[str]] = None,
|
||||
) -> CompanyCapabilityProfile:
|
||||
cmap = {
|
||||
cert: CapabilityMappingEntry(capability_ids=list(caps), confidence=Confidence.MEDIUM)
|
||||
for cert, caps in cert_hypotheses.items()
|
||||
if cert in inp.certifications and caps
|
||||
}
|
||||
ctx = CompanyContext(company_id=inp.company or "company",
|
||||
certifications=[Certification(certification_id=c) for c in cmap])
|
||||
return build_company_profile(ctx, cmap)
|
||||
certs = [Certification(certification_id=c) for c in cmap]
|
||||
if detected: # Silent Pass: concrete findings -> HIGH confidence
|
||||
cmap["__detected__"] = CapabilityMappingEntry(
|
||||
capability_ids=list(dict.fromkeys(detected)), confidence=Confidence.HIGH)
|
||||
certs.append(Certification(certification_id="__detected__"))
|
||||
return build_company_profile(CompanyContext(company_id=inp.company or "company", certifications=certs), cmap)
|
||||
|
||||
|
||||
def advisor_start(
|
||||
@@ -68,15 +74,18 @@ def advisor_start(
|
||||
covers_targets: Optional[Dict[str, List[str]]] = None,
|
||||
corpus_status: Optional[Dict[str, str]] = None,
|
||||
uncertain: Optional[List[Dict[str, str]]] = None,
|
||||
detected_capabilities: Optional[Sequence[str]] = None,
|
||||
) -> AdvisorResult:
|
||||
"""Run the onboarding flow: certs -> profile -> delta -> ranked next-best questions + measures.
|
||||
"""Run the onboarding flow: (silent intake +) certs -> profile -> delta -> ranked questions + measures.
|
||||
|
||||
Pure orchestration; deterministic. `cert_hypotheses` (cert -> probable cap ids) and
|
||||
`target_requirements` are INJECTED. `covers_targets` (cap -> targets it closes) drives leverage.
|
||||
Pure orchestration; deterministic. `cert_hypotheses` (cert -> probable cap ids), `target_requirements`
|
||||
and `detected_capabilities` (from the Silent Knowledge Pass) are INJECTED. Detected capabilities are
|
||||
recognised WITHOUT asking -> they shrink the delta and remove questions.
|
||||
"""
|
||||
covers_targets = covers_targets or {}
|
||||
required = {r.capability_id for r in target_requirements}
|
||||
profile = _profile(inp, cert_hypotheses)
|
||||
profile = _profile(inp, cert_hypotheses, detected_capabilities)
|
||||
auto_detected = sorted(set(detected_capabilities or []) & required)
|
||||
assess = assess_transition(
|
||||
TransitionContext(company_id=inp.company or "company", target=TransitionGoal(target_id=target_id)),
|
||||
list(target_requirements), profile)
|
||||
@@ -123,13 +132,14 @@ def advisor_start(
|
||||
rep = assess_completeness(applicable, corpus_status or {}, uncertain=uncertain or [])
|
||||
unsupported = [e.subject for e in rep.exclusions]
|
||||
|
||||
probably = assess.summary.probably_covered
|
||||
probably = [c for c in assess.summary.probably_covered if c not in set(auto_detected)]
|
||||
return AdvisorResult(
|
||||
inferred_assumptions=inferred, rejected_assumptions=rejected, next_best_questions=next_q,
|
||||
capability_delta=delta, top_measures=measures, evidence_requests=evidence,
|
||||
unsupported_domains=unsupported, completeness_summary=rep.completeness_summary,
|
||||
headline="%d Anforderungen erkannt · %d wahrscheinlich abgedeckt · %d zu klären"
|
||||
% (len(assess.coverage), len(probably), len(next_q)))
|
||||
inferred_assumptions=inferred, rejected_assumptions=rejected, auto_detected=auto_detected,
|
||||
next_best_questions=next_q, capability_delta=delta, top_measures=measures,
|
||||
evidence_requests=evidence, unsupported_domains=unsupported,
|
||||
completeness_summary=rep.completeness_summary,
|
||||
headline="%d Anforderungen erkannt · %d automatisch erkannt (Intake) · %d wahrscheinlich (Zertifikate) · %d zu klären"
|
||||
% (len(assess.coverage), len(auto_detected), len(probably), len(next_q)))
|
||||
|
||||
|
||||
def apply_answer(known_capabilities: Sequence[str], capability_id: str, answer: str) -> List[str]:
|
||||
|
||||
@@ -53,6 +53,7 @@ class AdvisorMeasure(BaseModel):
|
||||
class AdvisorResult(BaseModel):
|
||||
inferred_assumptions: List[InferredAssumption] = Field(default_factory=list)
|
||||
rejected_assumptions: List[RejectedAssumption] = Field(default_factory=list)
|
||||
auto_detected: List[str] = Field(default_factory=list) # Silent Pass: recognised w/o asking
|
||||
next_best_questions: List[AdvisorQuestion] = Field(default_factory=list) # max 5
|
||||
capability_delta: List[str] = Field(default_factory=list)
|
||||
top_measures: List[AdvisorMeasure] = Field(default_factory=list)
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
"""Silent Knowledge Pass — recognise everything possible BEFORE asking a single question (Phase 0).
|
||||
|
||||
The Advisor can say "I need 5 answers" but does not yet decide WHAT it can find out by itself. The Silent
|
||||
Pass runs first: from signals that existing scanners/parsers already produce (website, repository,
|
||||
documents, product data) it deterministically derives capabilities the company demonstrably HAS and
|
||||
product facts that drive scope — so every recognised item shrinks the delta and removes a question.
|
||||
|
||||
The customer then experiences "we already recognised 11 of 17 — only these 4 remain" instead of a
|
||||
question wall. This is NOT new architecture: it is one orchestration step in front of the Advisor
|
||||
Company -> Silent Intake -> Company Profile -> Hypotheses -> Delta -> Top Questions
|
||||
All building blocks already exist. SIGNALS are INJECTED (the scanners produce them); the signal->capability
|
||||
map is curated DATA, also injected. Pure, deterministic, no I/O. Python 3.9 compatible.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict, List, Optional, Sequence, Set
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class IntakeSignal(BaseModel):
|
||||
"""One finding a scanner/parser produced (no LLM here — the scanners are upstream)."""
|
||||
|
||||
source: str # website / repository / document / product
|
||||
signal: str # signal id, e.g. "sbom_file_found"
|
||||
detail: str = "" # optional (url, filename) for the audit trail
|
||||
|
||||
|
||||
class SignalMapping(BaseModel):
|
||||
"""Curated: what a signal lets us conclude. A signal yields a capability OR a product fact."""
|
||||
|
||||
signal: str
|
||||
capability: Optional[str] = None # capability the signal evidences
|
||||
relationship: str = "detected" # detected (concrete artifact) / partial (indicative)
|
||||
evidence: Optional[str] = None # the artifact found (already in hand -> no upload needed)
|
||||
product_fact: Optional[str] = None # e.g. "connected_to_internet"
|
||||
fact_value: str = "true"
|
||||
|
||||
|
||||
class DetectedCapability(BaseModel):
|
||||
capability: str
|
||||
relationship: str = "detected"
|
||||
source: str = "" # which signal/source detected it (audit trail)
|
||||
evidence: Optional[str] = None
|
||||
|
||||
|
||||
class ProductFact(BaseModel):
|
||||
key: str
|
||||
value: str = "true"
|
||||
source: str = ""
|
||||
|
||||
|
||||
class SilentIntakeResult(BaseModel):
|
||||
detected_capabilities: List[DetectedCapability] = Field(default_factory=list)
|
||||
product_facts: List[ProductFact] = Field(default_factory=list)
|
||||
evidence_found: List[str] = Field(default_factory=list)
|
||||
summary: str = ""
|
||||
|
||||
def capability_ids(self) -> List[str]:
|
||||
"""The detected capability ids — fed into the Advisor as already-present (delta-reducing)."""
|
||||
return sorted({d.capability for d in self.detected_capabilities})
|
||||
|
||||
|
||||
def silent_intake(
|
||||
signals: Sequence[IntakeSignal], signal_map: Sequence[SignalMapping]
|
||||
) -> SilentIntakeResult:
|
||||
"""Derive capabilities + product facts from injected scanner signals (deterministic, no questions).
|
||||
|
||||
Each signal is matched to curated mappings by `signal` id; a mapping contributes either a detected
|
||||
capability (+ optional evidence already in hand) or a product fact. Deduped, deterministic order.
|
||||
"""
|
||||
by_signal: Dict[str, List[SignalMapping]] = {}
|
||||
for m in signal_map:
|
||||
by_signal.setdefault(m.signal, []).append(m)
|
||||
|
||||
caps: Dict[str, DetectedCapability] = {}
|
||||
facts: Dict[str, ProductFact] = {}
|
||||
evidence: Set[str] = set()
|
||||
for s in signals:
|
||||
for m in by_signal.get(s.signal, []):
|
||||
if m.capability and m.capability not in caps:
|
||||
caps[m.capability] = DetectedCapability(
|
||||
capability=m.capability, relationship=m.relationship,
|
||||
source="%s:%s" % (s.source, s.signal), evidence=m.evidence)
|
||||
if m.evidence:
|
||||
evidence.add(m.evidence)
|
||||
if m.product_fact:
|
||||
facts[m.product_fact] = ProductFact(key=m.product_fact, value=m.fact_value, source=s.source)
|
||||
|
||||
detected = [caps[k] for k in sorted(caps)]
|
||||
product_facts = [facts[k] for k in sorted(facts)]
|
||||
summary = (
|
||||
"Stille Vorbefüllung: %d Fähigkeit(en) automatisch erkannt, %d Produktfakt(en), %d Nachweis(e) bereits vorhanden."
|
||||
% (len(detected), len(product_facts), len(evidence))
|
||||
)
|
||||
return SilentIntakeResult(
|
||||
detected_capabilities=detected, product_facts=product_facts,
|
||||
evidence_found=sorted(evidence), summary=summary)
|
||||
@@ -0,0 +1,31 @@
|
||||
# Silent Knowledge Pass — signal -> conclusion map (curated DATA, injected).
|
||||
#
|
||||
# What a scanner finding lets us conclude WITHOUT asking the user. A signal yields either a capability
|
||||
# the company demonstrably has (with the evidence already in hand) or a product fact that drives scope.
|
||||
# `relationship: detected` = a concrete artifact (strong, no question); `partial` = indicative (still
|
||||
# verify, but lower priority). The scanners (website crawler, repo scanner, doc parser, product intake)
|
||||
# are UPSTREAM and produce the signals; this file only interprets them. No norm text, no real names.
|
||||
|
||||
mappings:
|
||||
# ── website ───────────────────────────────────────────────────────────────────────────────
|
||||
- {signal: security_txt_or_cvd_policy, capability: coordinated_vulnerability_disclosure, relationship: detected, evidence: cvd_policy}
|
||||
- {signal: ce_marking_on_site, capability: ce_conformity_assessment_and_technical_documentation, relationship: partial, evidence: ce_declaration}
|
||||
- {signal: support_lifecycle_page, capability: security_update_support_period, relationship: partial, evidence: support_policy}
|
||||
- {signal: security_policy_page, capability: information_security_management, relationship: partial}
|
||||
# ── repository ────────────────────────────────────────────────────────────────────────────
|
||||
- {signal: sbom_file_found, capability: sbom_creation, relationship: detected, evidence: sbom}
|
||||
- {signal: signed_releases, capability: secure_signed_update_distribution, relationship: detected, evidence: signing_config}
|
||||
- {signal: github_actions_ci, capability: secure_development_lifecycle, relationship: partial, evidence: ci_pipeline}
|
||||
- {signal: dependency_scanning, capability: technical_vulnerability_management, relationship: partial, evidence: vuln_scanning_config}
|
||||
# ── documents ─────────────────────────────────────────────────────────────────────────────
|
||||
- {signal: ce_conformity_doc, capability: ce_conformity_assessment_and_technical_documentation, relationship: detected, evidence: technical_documentation}
|
||||
- {signal: product_risk_assessment_doc, capability: product_cyber_risk_assessment, relationship: detected, evidence: product_risk_assessment}
|
||||
- {signal: patch_policy_doc, capability: secure_signed_update_distribution, relationship: partial, evidence: patch_policy}
|
||||
- {signal: incident_response_plan_doc, capability: incident_management, relationship: detected, evidence: incident_procedure}
|
||||
# ── product facts (drive scope / target applicability) ──────────────────────────────────────
|
||||
- {signal: cloud_connectivity, product_fact: connected_to_internet}
|
||||
- {signal: plc_sps, product_fact: is_machine}
|
||||
- {signal: embedded_software, product_fact: has_embedded_software}
|
||||
- {signal: wireless_radio, product_fact: has_radio_equipment}
|
||||
- {signal: remote_access, product_fact: has_remote_access}
|
||||
- {signal: generates_usage_data, product_fact: generates_usage_data}
|
||||
@@ -5,8 +5,14 @@ _Eingabe: Unternehmen + Produkte + Zertifizierungen + Ziel. Den Rest macht die O
|
||||
## Eingabe
|
||||
> Zertifizierungen: **ISO9001, ISO27001, ISO14001, TISAX** · Produkt: **Parkschein-/Schrankensystem** · Ziel: **CRA**
|
||||
|
||||
## Phase 0 — Stille Vorbefüllung (BEVOR eine Frage erscheint)
|
||||
> Stille Vorbefüllung: 4 Fähigkeit(en) automatisch erkannt, 2 Produktfakt(en), 4 Nachweis(e) bereits vorhanden.
|
||||
- **Automatisch erkannte Fähigkeiten:** `coordinated_vulnerability_disclosure`, `product_cyber_risk_assessment`, `sbom_creation`, `secure_signed_update_distribution`
|
||||
- **Produktfakten (steuern den Scope):** `connected_to_internet=true`, `is_machine=true`
|
||||
- **Nachweise bereits in der Hand (kein Upload nötig):** cvd_policy, product_risk_assessment, sbom, signing_config
|
||||
|
||||
## Was wir erkannt haben
|
||||
> 17 Anforderungen erkannt · 5 wahrscheinlich abgedeckt · 5 zu klären
|
||||
> 17 Anforderungen erkannt · 4 automatisch erkannt (Intake) · 5 wahrscheinlich (Zertifikate) · 5 zu klären
|
||||
|
||||
**Aus Ihren Zertifizierungen abgeleitet (zu bestätigen, nicht automatisch erfüllt):**
|
||||
- ISO9001 legt 1 relevante Fähigkeit(en) nahe — Verifikation erforderlich, nicht automatisch erfüllt
|
||||
@@ -16,26 +22,26 @@ _Eingabe: Unternehmen + Produkte + Zertifizierungen + Ziel. Den Rest macht die O
|
||||
|
||||
## Die wenigen offenen Punkte — nur die nächsten besten Fragen
|
||||
**Frage 1 von 5** _(Informationswert 8)_
|
||||
> product cyber risk assessment? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._
|
||||
|
||||
**Frage 2 von 5** _(Informationswert 8)_
|
||||
> protection against corruption of safety functions? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._
|
||||
|
||||
**Frage 3 von 5** _(Informationswert 8)_
|
||||
> secure signed update distribution? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._
|
||||
|
||||
**Frage 4 von 5** _(Informationswert 7)_
|
||||
> coordinated vulnerability disclosure? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._
|
||||
|
||||
**Frage 5 von 5** _(Informationswert 7)_
|
||||
**Frage 2 von 5** _(Informationswert 7)_
|
||||
> exploited vuln and incident reporting? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._
|
||||
|
||||
**Frage 3 von 5** _(Informationswert 7)_
|
||||
> machine safety risk assessment? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._
|
||||
|
||||
**Frage 4 von 5** _(Informationswert 7)_
|
||||
> mechanical safety and guards? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._
|
||||
|
||||
**Frage 5 von 5** _(Informationswert 7)_
|
||||
> operating instructions and safety information? — _Warum fragen wir das: Keine Anhaltspunkte im Unternehmensprofil — klären._
|
||||
|
||||
## Womit zuerst anfangen (größter Hebel)
|
||||
- `product_cyber_risk_assessment` — schließt 2 Anforderung(en): CRA, MaschinenVO
|
||||
- `protection_against_corruption_of_safety_functions` — schließt 2 Anforderung(en): CRA, MaschinenVO
|
||||
- `secure_signed_update_distribution` — schließt 2 Anforderung(en): CRA, MaschinenVO
|
||||
- `coordinated_vulnerability_disclosure` — schließt 1 Anforderung(en): CRA
|
||||
- `exploited_vuln_and_incident_reporting` — schließt 1 Anforderung(en): CRA
|
||||
- `machine_safety_risk_assessment` — schließt 1 Anforderung(en): MaschinenVO
|
||||
- `mechanical_safety_and_guards` — schließt 1 Anforderung(en): MaschinenVO
|
||||
- `operating_instructions_and_safety_information` — schließt 1 Anforderung(en): MaschinenVO
|
||||
|
||||
## Vollständigkeit (ehrlich)
|
||||
> Identifiziert 1 · bewertet 1 · offen 0 · Unsicherheiten 0 · Begründung ja
|
||||
|
||||
@@ -12,7 +12,10 @@ from __future__ import annotations
|
||||
import os
|
||||
import yaml
|
||||
|
||||
from compliance.onboarding import CapabilityHypothesis, OnboardingInput, advisor_start, resolve_for_certifications
|
||||
from compliance.onboarding import (
|
||||
CapabilityHypothesis, IntakeSignal, OnboardingInput, SignalMapping,
|
||||
advisor_start, resolve_for_certifications, silent_intake,
|
||||
)
|
||||
from compliance.transition_reasoning import TargetRequirement
|
||||
|
||||
OUT = []
|
||||
@@ -37,7 +40,18 @@ inp = OnboardingInput(company="synthetisch", industry="machine_builder",
|
||||
certifications=["ISO9001", "ISO27001", "ISO14001", "TISAX"],
|
||||
known_evidence=["CE process"], target=["CRA"])
|
||||
hyp = resolve_for_certifications(inp.certifications, _lib)
|
||||
res = advisor_start(inp, hyp, req, target_id="CRA", covers_targets=covers, corpus_status={"CRA": "validated"})
|
||||
# Phase 0 — Silent Knowledge Pass: recognise everything possible from scanner signals BEFORE asking.
|
||||
_smap = [SignalMapping(**m) for m in yaml.safe_load(
|
||||
open(os.path.join(os.path.dirname(__file__), "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]]
|
||||
_signals = [IntakeSignal(source="website", signal="security_txt_or_cvd_policy", detail="/.well-known/security.txt"),
|
||||
IntakeSignal(source="repository", signal="sbom_file_found", detail="sbom.cdx.json"),
|
||||
IntakeSignal(source="repository", signal="signed_releases"),
|
||||
IntakeSignal(source="document", signal="product_risk_assessment_doc"),
|
||||
IntakeSignal(source="product", signal="cloud_connectivity"),
|
||||
IntakeSignal(source="product", signal="plc_sps")]
|
||||
si = silent_intake(_signals, _smap)
|
||||
res = advisor_start(inp, hyp, req, target_id="CRA", covers_targets=covers, corpus_status={"CRA": "validated"},
|
||||
detected_capabilities=si.capability_ids())
|
||||
|
||||
w("# Smart Onboarding Advisor — was der Nutzer sieht (automatisch, ohne Vertrieb)")
|
||||
w("")
|
||||
@@ -46,6 +60,12 @@ w("")
|
||||
w("## Eingabe")
|
||||
w("> Zertifizierungen: **%s** · Produkt: **%s** · Ziel: **%s**" % (", ".join(inp.certifications), inp.products[0], ", ".join(inp.target)))
|
||||
w("")
|
||||
w("## Phase 0 — Stille Vorbefüllung (BEVOR eine Frage erscheint)")
|
||||
w("> %s" % si.summary)
|
||||
w("- **Automatisch erkannte Fähigkeiten:** %s" % ", ".join("`%s`" % d.capability for d in si.detected_capabilities))
|
||||
w("- **Produktfakten (steuern den Scope):** %s" % ", ".join("`%s=%s`" % (f.key, f.value) for f in si.product_facts))
|
||||
w("- **Nachweise bereits in der Hand (kein Upload nötig):** %s" % ", ".join(si.evidence_found))
|
||||
w("")
|
||||
w("## Was wir erkannt haben")
|
||||
w("> %s" % res.headline)
|
||||
w("")
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
"""Silent Knowledge Pass — recognise before asking (Phase 0).
|
||||
|
||||
Pins the deterministic signal->capability/product-fact mapping and the product effect that matters: when
|
||||
the Silent Pass feeds detected capabilities into the Advisor, the delta shrinks and the number of
|
||||
next-best questions DROPS — "we already recognised X, only these few remain" instead of a question wall.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
import yaml
|
||||
|
||||
from compliance.onboarding import (
|
||||
IntakeSignal,
|
||||
OnboardingInput,
|
||||
SignalMapping,
|
||||
advisor_start,
|
||||
resolve_for_certifications,
|
||||
silent_intake,
|
||||
)
|
||||
from compliance.onboarding import CapabilityHypothesis
|
||||
from compliance.transition_reasoning import TargetRequirement
|
||||
|
||||
_DIR = os.path.dirname(__file__)
|
||||
_MAP = [SignalMapping(**m) for m in yaml.safe_load(
|
||||
open(os.path.join(_DIR, "..", "knowledge", "onboarding", "intake_signal_map.yaml"), encoding="utf-8"))["mappings"]]
|
||||
_LIB = [CapabilityHypothesis(**h) for h in yaml.safe_load(
|
||||
open(os.path.join(_DIR, "..", "knowledge", "certification_hypotheses", "hypotheses.yaml"), encoding="utf-8"))["hypotheses"]]
|
||||
_CRA = yaml.safe_load(open(os.path.join(_DIR, "..", "knowledge", "transition_patterns",
|
||||
"transition_pattern_iso27001_to_cra_maschinenvo_v1.yaml"), encoding="utf-8"))
|
||||
_REQ = [TargetRequirement(capability_id=a["capability"]) for a in _CRA["likely_covered"]]
|
||||
_REQ += [TargetRequirement(capability_id=d["capability"], expected_evidence=d.get("expected_evidence", []))
|
||||
for d in _CRA["delta_requirements"]]
|
||||
|
||||
# scanner findings (injected): a machine builder with a public CVD policy, an SBOM + signed releases in
|
||||
# the repo, a product risk-assessment doc, and a cloud-connected PLC product.
|
||||
_SIGNALS = [
|
||||
IntakeSignal(source="website", signal="security_txt_or_cvd_policy", detail="/.well-known/security.txt"),
|
||||
IntakeSignal(source="repository", signal="sbom_file_found", detail="sbom.cdx.json"),
|
||||
IntakeSignal(source="repository", signal="signed_releases"),
|
||||
IntakeSignal(source="document", signal="product_risk_assessment_doc"),
|
||||
IntakeSignal(source="product", signal="cloud_connectivity"),
|
||||
IntakeSignal(source="product", signal="plc_sps"),
|
||||
]
|
||||
|
||||
|
||||
def test_silent_intake_is_deterministic_signal_mapping():
|
||||
res = silent_intake(_SIGNALS, _MAP)
|
||||
caps = set(res.capability_ids())
|
||||
assert {"coordinated_vulnerability_disclosure", "sbom_creation", "secure_signed_update_distribution",
|
||||
"product_cyber_risk_assessment"} <= caps
|
||||
assert "sbom" in res.evidence_found # evidence already in hand -> no upload needed
|
||||
facts = {f.key for f in res.product_facts}
|
||||
assert "connected_to_internet" in facts and "is_machine" in facts
|
||||
|
||||
|
||||
def test_silent_pass_reduces_the_questions():
|
||||
inp = OnboardingInput(company="x", certifications=["ISO27001", "ISO9001"], target=["CRA"])
|
||||
hyp = resolve_for_certifications(inp.certifications, _LIB)
|
||||
without = advisor_start(inp, hyp, _REQ, target_id="CRA", corpus_status={"CRA": "validated"})
|
||||
detected = silent_intake(_SIGNALS, _MAP).capability_ids()
|
||||
with_pass = advisor_start(inp, hyp, _REQ, target_id="CRA", corpus_status={"CRA": "validated"},
|
||||
detected_capabilities=detected)
|
||||
# the whole point: recognising things automatically leaves FEWER open questions
|
||||
assert len(with_pass.capability_delta) < len(without.capability_delta)
|
||||
assert len(with_pass.next_best_questions) <= len(without.next_best_questions)
|
||||
assert with_pass.auto_detected # recognised without asking
|
||||
assert "automatisch erkannt (Intake)" in with_pass.headline
|
||||
|
||||
|
||||
def test_detected_capabilities_are_not_asked_again():
|
||||
inp = OnboardingInput(company="x", certifications=["ISO27001"], target=["CRA"])
|
||||
hyp = resolve_for_certifications(inp.certifications, _LIB)
|
||||
detected = silent_intake(_SIGNALS, _MAP).capability_ids()
|
||||
res = advisor_start(inp, hyp, _REQ, target_id="CRA", corpus_status={"CRA": "validated"},
|
||||
detected_capabilities=detected)
|
||||
asked = {q.capability_id for q in res.next_best_questions}
|
||||
assert "sbom_creation" not in asked and "sbom_creation" not in res.capability_delta
|
||||
Reference in New Issue
Block a user