diff --git a/backend-compliance/compliance/capability/__init__.py b/backend-compliance/compliance/capability/__init__.py new file mode 100644 index 00000000..12bbf9e1 --- /dev/null +++ b/backend-compliance/compliance/capability/__init__.py @@ -0,0 +1,70 @@ +"""Master Capability Registry v0 (Phase 2C) — Compliance Execution domain. + +Registry + minting layer for Master Capabilities — the third instance of the +identity-machine pattern (Master Controls, Master Obligations, Master Capabilities). + +STORED: identities, sources, relationship types, policy versions, lifecycle events, +provenance. DERIVED (never stored): confidence, coverage, gap. + +v0 scope: types + minting + typed relations + versioned policy + identity lifecycle. +NOT here: Company-Gap, real ISO/cert mappings, certification derivations, UI, RAG, +new meta-model class, generic canonicalization engine. +""" + +from __future__ import annotations + +from .engine import ( + CapabilityRegistry, + deprecate_capability, + evaluate_relation, + merge_capabilities, + mint_capability, + resolve, + split_capability, +) +from .policy import DEFAULT_POLICY, assert_no_certification_confirms +from .schemas import ( + AssertionStatus, + CapabilityCandidate, + CapabilityRelation, + Confidence, + DerivedAssessment, + EvidenceKind, + IdentityLifecycleEvent, + LifecycleEventType, + LifecycleState, + MasterCapability, + PolicyRule, + PolicyVersion, + Provenance, + RelationType, +) + +__all__ = [ + # engine + "CapabilityRegistry", + "mint_capability", + "evaluate_relation", + "resolve", + "deprecate_capability", + "merge_capabilities", + "split_capability", + # policy + "DEFAULT_POLICY", + "assert_no_certification_confirms", + # schemas + "MasterCapability", + "CapabilityCandidate", + "CapabilityRelation", + "RelationType", + "EvidenceKind", + "AssertionStatus", + "Confidence", + "PolicyRule", + "PolicyVersion", + "IdentityLifecycleEvent", + "LifecycleEventType", + "LifecycleState", + "Provenance", + "DerivedAssessment", +] diff --git a/backend-compliance/compliance/capability/engine.py b/backend-compliance/compliance/capability/engine.py new file mode 100644 index 00000000..9b7c41db --- /dev/null +++ b/backend-compliance/compliance/capability/engine.py @@ -0,0 +1,191 @@ +"""Master Capability Registry v0 — minting, derivation, identity lifecycle. + +STORED on the registry: identities, sources, relation types, policy versions, +lifecycle events, provenance. DERIVED (never stored): confidence/status, via +`evaluate_relation` under a versioned policy. + +Python 3.9 compatible (no `|` unions). +""" + +from __future__ import annotations + +from typing import Dict, List, Optional, Set + +from pydantic import BaseModel, Field + +from .policy import DEFAULT_POLICY +from .schemas import ( + AssertionStatus, + CapabilityCandidate, + CapabilityRelation, + Confidence, + DerivedAssessment, + IdentityLifecycleEvent, + LifecycleEventType, + LifecycleState, + MasterCapability, + PolicyVersion, + Provenance, +) + + +class CapabilityRegistry(BaseModel): + # NOTE: no confidence/coverage field anywhere — those are DERIVED, never stored. + capabilities: Dict[str, MasterCapability] = Field(default_factory=dict) + relations: List[CapabilityRelation] = Field(default_factory=list) + lifecycle_events: List[IdentityLifecycleEvent] = Field(default_factory=list) + policy: PolicyVersion = Field(default_factory=lambda: DEFAULT_POLICY) + next_serial: int = 1 + + +def _mcap_id(serial: int) -> str: + return "MCAP-%05d" % serial + + +def _next_event_id(registry: "CapabilityRegistry") -> str: + return "evt-%d" % (len(registry.lifecycle_events) + 1) + + +def mint_capability( + registry: CapabilityRegistry, + candidate: CapabilityCandidate, + provenance: Optional[Provenance] = None, + name: str = "", + definition: str = "", + category: str = "", + domains: Optional[List[str]] = None, +) -> MasterCapability: + """Assign the next stable MCAP id to a candidate and register it (with provenance).""" + cap_id = _mcap_id(registry.next_serial) + cap = MasterCapability( + capability_id=cap_id, + name=name or candidate.normalized or candidate.raw_term, + definition=definition, + category=category, + domains=domains or [], + provenance=provenance + or Provenance(author="system", basis="minted from candidate '%s'" % candidate.raw_term), + ) + registry.capabilities[cap_id] = cap + registry.next_serial += 1 + return cap + + +def evaluate_relation( + relation: CapabilityRelation, policy: Optional[PolicyVersion] = None +) -> DerivedAssessment: + """Derive (status, confidence) from (relationship_type, evidence_kind) under a + versioned policy. Deterministic; result is returned, never stored.""" + pol = policy if policy is not None else DEFAULT_POLICY + status = AssertionStatus.UNKNOWN + confidence = Confidence.LOW + found = False + for rule in pol.rules: + if ( + rule.relationship_type == relation.relationship_type + and rule.evidence_kind == relation.evidence_kind + ): + status, confidence, found = rule.status, rule.confidence, True + break + expl = "%s + %s under %s -> %s/%s%s" % ( + relation.relationship_type.value, + relation.evidence_kind.value, + pol.policy_version, + status.value, + confidence.value, + "" if found else " (no rule)", + ) + return DerivedAssessment( + target_capability_id=relation.target_capability_id, + status=status, + confidence=confidence, + policy_version=pol.policy_version, + explanation=expl, + ) + + +def resolve( + registry: CapabilityRegistry, capability_id: str, _seen: Optional[Set[str]] = None +) -> Optional[MasterCapability]: + """Follow redirects (from merge/deprecate) to the current canonical capability.""" + seen = _seen if _seen is not None else set() + if capability_id in seen: + return None # redirect cycle guard + seen.add(capability_id) + cap = registry.capabilities.get(capability_id) + if cap is None: + return None + if cap.redirect_to: + return resolve(registry, cap.redirect_to, seen) + # terminal: only an ACTIVE capability resolves; a deprecated dead-end -> None + return cap if cap.state == LifecycleState.ACTIVE else None + + +def deprecate_capability( + registry: CapabilityRegistry, + capability_id: str, + redirect_to: Optional[str] = None, + provenance: Optional[Provenance] = None, +) -> IdentityLifecycleEvent: + cap = registry.capabilities.get(capability_id) + if cap is None: + raise KeyError(capability_id) + cap.state = LifecycleState.DEPRECATED + cap.redirect_to = redirect_to + event = IdentityLifecycleEvent( + event_id=_next_event_id(registry), + event_type=LifecycleEventType.REDIRECT if redirect_to else LifecycleEventType.DEPRECATE, + from_ids=[capability_id], + to_ids=[redirect_to] if redirect_to else [], + provenance=provenance or Provenance(author="system", basis="deprecate %s" % capability_id), + ) + registry.lifecycle_events.append(event) + return event + + +def merge_capabilities( + registry: CapabilityRegistry, + from_id: str, + into_id: str, + provenance: Optional[Provenance] = None, +) -> IdentityLifecycleEvent: + """Merge `from_id` into `into_id`: deprecate `from_id` with a redirect to `into_id`.""" + if from_id not in registry.capabilities or into_id not in registry.capabilities: + raise KeyError("%s or %s" % (from_id, into_id)) + frm = registry.capabilities[from_id] + frm.state = LifecycleState.DEPRECATED + frm.redirect_to = into_id + event = IdentityLifecycleEvent( + event_id=_next_event_id(registry), + event_type=LifecycleEventType.MERGE, + from_ids=[from_id], + to_ids=[into_id], + provenance=provenance or Provenance(author="system", basis="merge %s -> %s" % (from_id, into_id)), + ) + registry.lifecycle_events.append(event) + return event + + +def split_capability( + registry: CapabilityRegistry, + from_id: str, + into_ids: List[str], + primary: Optional[str] = None, + provenance: Optional[Provenance] = None, +) -> IdentityLifecycleEvent: + """Split `from_id` into several capabilities. The old id deprecates; it redirects + to `primary` only if one is given (else it resolves to None — split is ambiguous).""" + if from_id not in registry.capabilities: + raise KeyError(from_id) + frm = registry.capabilities[from_id] + frm.state = LifecycleState.DEPRECATED + frm.redirect_to = primary + event = IdentityLifecycleEvent( + event_id=_next_event_id(registry), + event_type=LifecycleEventType.SPLIT, + from_ids=[from_id], + to_ids=list(into_ids), + provenance=provenance or Provenance(author="system", basis="split %s" % from_id), + ) + registry.lifecycle_events.append(event) + return event diff --git a/backend-compliance/compliance/capability/policy.py b/backend-compliance/compliance/capability/policy.py new file mode 100644 index 00000000..cf4a4577 --- /dev/null +++ b/backend-compliance/compliance/capability/policy.py @@ -0,0 +1,65 @@ +"""Derivation policy v0 for the Master Capability Registry. + +Confidence + status are DERIVED from (relationship_type, evidence_kind) under a +versioned policy — never stored. HARD RULE baked in and structurally guarded: a +CERTIFICATION is a claim, never proof — no certification-backed rule may yield +CONFIRMED. CONFIRMED requires a CONFIRMS relation backed by a concrete ARTIFACT +(or an EXPERT assertion). + +Python 3.9 compatible (no `|` unions). +""" + +from __future__ import annotations + +from .schemas import ( + AssertionStatus, + Confidence, + EvidenceKind, + PolicyRule, + PolicyVersion, + RelationType, +) + + +def _rule(rt: RelationType, ek: EvidenceKind, st: AssertionStatus, cf: Confidence) -> PolicyRule: + return PolicyRule(relationship_type=rt, evidence_kind=ek, status=st, confidence=cf) + + +# (relationship_type, evidence_kind) -> (status, confidence) +_V0_RULES = [ + # concrete artifact / expert confirming the capability -> CONFIRMED + _rule(RelationType.CONFIRMS, EvidenceKind.ARTIFACT, AssertionStatus.CONFIRMED, Confidence.HIGH), + _rule(RelationType.CONFIRMS, EvidenceKind.EXPERT, AssertionStatus.CONFIRMED, Confidence.MEDIUM), + # equivalent capability — certificate or artifact behind it -> INFERRED (never confirmed) + _rule(RelationType.EQUIVALENT, EvidenceKind.CERTIFICATION, AssertionStatus.INFERRED, Confidence.HIGH), + _rule(RelationType.EQUIVALENT, EvidenceKind.ARTIFACT, AssertionStatus.INFERRED, Confidence.HIGH), + # supports — weaker + _rule(RelationType.SUPPORTS, EvidenceKind.CERTIFICATION, AssertionStatus.INFERRED, Confidence.LOW), + _rule(RelationType.SUPPORTS, EvidenceKind.ARTIFACT, AssertionStatus.INFERRED, Confidence.MEDIUM), + # requires = an obligation NEEDS the capability (relevance, not possession) + _rule(RelationType.REQUIRES, EvidenceKind.NONE, AssertionStatus.UNKNOWN, Confidence.LOW), + # broader/narrower certificate -> weak inference + _rule(RelationType.BROADER_THAN, EvidenceKind.CERTIFICATION, AssertionStatus.INFERRED, Confidence.LOW), + _rule(RelationType.NARROWER_THAN, EvidenceKind.CERTIFICATION, AssertionStatus.INFERRED, Confidence.LOW), + _rule(RelationType.RELATED_TO, EvidenceKind.CERTIFICATION, AssertionStatus.UNKNOWN, Confidence.LOW), +] + +DEFAULT_POLICY = PolicyVersion( + policy_version="capability-policy-v0", + description="v0: certification never yields CONFIRMED; only CONFIRMS + ARTIFACT/EXPERT does.", + rules=_V0_RULES, +) + + +def assert_no_certification_confirms(policy: PolicyVersion) -> None: + """Structural guard for the hard rule: no CERTIFICATION-backed rule is CONFIRMED.""" + for r in policy.rules: + if r.evidence_kind == EvidenceKind.CERTIFICATION and r.status == AssertionStatus.CONFIRMED: + raise ValueError( + "policy %s violates hard rule: certification -> confirmed (%s)" + % (policy.policy_version, r.relationship_type.value) + ) + + +# fail fast at import: the shipped default must satisfy the hard rule +assert_no_certification_confirms(DEFAULT_POLICY) diff --git a/backend-compliance/compliance/capability/schemas.py b/backend-compliance/compliance/capability/schemas.py new file mode 100644 index 00000000..f2924d57 --- /dev/null +++ b/backend-compliance/compliance/capability/schemas.py @@ -0,0 +1,150 @@ +"""Master Capability Registry v0 — Compliance Execution domain (Phase 2C). + +Built from the Reasoning session per user directive, but this IS the Compliance +Execution model (Execution owns Capability). Third real instance of the +identity-machine pattern (after Master Controls and Master Obligations): + + Candidate -> Normalization -> Dedup -> Stable Identity (MCAP) -> Typed Relations + +KEY SENTENCE (stored vs derived): + STORED : identities, sources, relationship types, policy versions, lifecycle + events, provenance. + DERIVED : confidence, coverage and gap statements — computed on demand, NEVER + stored (see policy.py / engine.evaluate_relation). + +These are APPLICATION/registry types, NOT compliance-meta-model classes. In +particular `CapabilityRelation` is relation METADATA inside the registry — it does +NOT introduce a new meta-model class. Whether a reified relation must enter the +frozen meta-model is a Meta-Model-Owner decision (architecture freeze v1.0), +deferred until a demonstrable failure case exists. + +Self-contained (no Reasoning import — Reasoning consumes Capability, not the other +way round). Python 3.9 compatible (no `|` unions). +""" + +from __future__ import annotations + +from enum import Enum +from typing import List, Optional + +from pydantic import BaseModel, Field + + +class Confidence(str, Enum): + HIGH = "high" + MEDIUM = "medium" + LOW = "low" + + +class AssertionStatus(str, Enum): + """How well-established a capability claim is. A numeric score is presentation; + THIS type is the truth (derived from relationship type + evidence + policy).""" + + DECLARED = "declared" + INFERRED = "inferred" + CONFIRMED = "confirmed" + UNKNOWN = "unknown" + + +class RelationType(str, Enum): + EQUIVALENT = "equivalent" + SUPPORTS = "supports" + REQUIRES = "requires" + CONFIRMS = "confirms" + BROADER_THAN = "broader_than" + NARROWER_THAN = "narrower_than" + RELATED_TO = "related_to" + + +class EvidenceKind(str, Enum): + CERTIFICATION = "certification" # a held certificate — a CLAIM, never proof + ARTIFACT = "artifact" # concrete doc/config/test/log + EXPERT = "expert" # human expert assertion + NONE = "none" + + +class LifecycleState(str, Enum): + ACTIVE = "active" + DEPRECATED = "deprecated" + + +class LifecycleEventType(str, Enum): + MERGE = "merge" + SPLIT = "split" + DEPRECATE = "deprecate" + REDIRECT = "redirect" + + +class Provenance(BaseModel): + """Every CURATED atom carries its own provenance (who / when / on what basis).""" + + author: str = "" + asserted_at: Optional[str] = None # ISO timestamp passed in; never generated here + basis: str = "" + + +# ── stored: identity ────────────────────────────────────────────────────── +class MasterCapability(BaseModel): + capability_id: str # stable MCAP-xxxxx + name: str = "" + definition: str = "" + category: str = "" + domains: List[str] = Field(default_factory=list) + typical_evidence: List[str] = Field(default_factory=list) + version: int = 1 + state: LifecycleState = LifecycleState.ACTIVE + redirect_to: Optional[str] = None # set on merge/deprecate + provenance: Provenance = Field(default_factory=Provenance) + + +class CapabilityCandidate(BaseModel): + raw_term: str # e.g. "Patch Management" + source: str = "" # e.g. "CRA:Annex I (2)(d)" + normalized: str = "" + + +# ── stored: typed relation metadata (NOT a meta-model class) ────────────── +class CapabilityRelation(BaseModel): + relation_id: str + source: str # external term/obligation/certification id, e.g. "certification:ISO27001" + target_capability_id: str # MCAP-... + relationship_type: RelationType + evidence_kind: EvidenceKind = EvidenceKind.NONE + provenance: Provenance = Field(default_factory=Provenance) + + +# ── stored: versioned derivation policy ─────────────────────────────────── +class PolicyRule(BaseModel): + relationship_type: RelationType + evidence_kind: EvidenceKind + status: AssertionStatus + confidence: Confidence + + +class PolicyVersion(BaseModel): + """A versioned derivation policy. `policy_version` is recorded with every + assessment so "why did you say X last year" is answerable with the policy + as-of-then. Without this, `derived` and `auditable/reproducible` contradict.""" + + policy_version: str + description: str = "" + rules: List[PolicyRule] = Field(default_factory=list) + + +# ── stored: identity lifecycle ──────────────────────────────────────────── +class IdentityLifecycleEvent(BaseModel): + event_id: str + event_type: LifecycleEventType + from_ids: List[str] = Field(default_factory=list) + to_ids: List[str] = Field(default_factory=list) + at: Optional[str] = None + provenance: Provenance = Field(default_factory=Provenance) + + +# ── DERIVED — never stored ──────────────────────────────────────────────── +class DerivedAssessment(BaseModel): + target_capability_id: str + status: AssertionStatus + confidence: Confidence + policy_version: str + explanation: str = "" diff --git a/backend-compliance/tests/test_capability.py b/backend-compliance/tests/test_capability.py new file mode 100644 index 00000000..9ba8208f --- /dev/null +++ b/backend-compliance/tests/test_capability.py @@ -0,0 +1,153 @@ +"""Tests for Master Capability Registry v0 (Phase 2C, Compliance Execution domain). + +Acceptance: a registry mints stable MCAP ids, stores typed relations + versioned +policy + lifecycle events + provenance, and DERIVES confidence/status on demand +(never stored). Hard rule: a certification alone can never yield CONFIRMED. + +No real ISO/cert mappings here — only synthetic relations (mappings are not part of +v0; they are Execution data, injected later). +""" + +from __future__ import annotations + +import pytest + +from compliance.capability import ( + DEFAULT_POLICY, + AssertionStatus, + CapabilityCandidate, + CapabilityRegistry, + CapabilityRelation, + Confidence, + EvidenceKind, + LifecycleEventType, + PolicyRule, + PolicyVersion, + RelationType, + assert_no_certification_confirms, + deprecate_capability, + evaluate_relation, + merge_capabilities, + mint_capability, + resolve, +) + + +def _rel(rt, ek, target="MCAP-00001", src="x"): + return CapabilityRelation( + relation_id="r", source=src, target_capability_id=target, relationship_type=rt, evidence_kind=ek + ) + + +# 1. minting assigns stable, incrementing MCAP ids. +def test_mint_stable_mcap_id(): + reg = CapabilityRegistry() + a = mint_capability(reg, CapabilityCandidate(raw_term="Patch Management")) + b = mint_capability(reg, CapabilityCandidate(raw_term="Incident Response")) + assert a.capability_id == "MCAP-00001" + assert b.capability_id == "MCAP-00002" + assert reg.capabilities["MCAP-00001"].name == "Patch Management" + + +# 2. minted capability carries provenance. +def test_mint_records_provenance(): + reg = CapabilityRegistry() + cap = mint_capability(reg, CapabilityCandidate(raw_term="Secure Development")) + assert cap.provenance.author == "system" + assert "Secure Development" in cap.provenance.basis + + +# 3. all seven relationship types exist. +def test_relation_types_complete(): + assert {t.value for t in RelationType} == { + "equivalent", "supports", "requires", "confirms", "broader_than", "narrower_than", "related_to", + } + + +# 4. confidence is COMPUTED from relation_type + evidence + policy_version, not stored. +def test_confidence_computed_and_policy_referenced(): + rel = _rel(RelationType.SUPPORTS, EvidenceKind.CERTIFICATION) + a = evaluate_relation(rel, DEFAULT_POLICY) + assert a.status == AssertionStatus.INFERRED and a.confidence == Confidence.LOW + assert a.policy_version == "capability-policy-v0" + # the registry itself stores NO confidence/coverage + assert "confidence" not in CapabilityRegistry.model_fields + assert "coverage" not in CapabilityRegistry.model_fields + + +# 4b. a DIFFERENT policy version yields a different result for the SAME relation +# -> "why did you say X last year" needs the policy-as-of-then. +def test_policy_versioning_changes_outcome(): + rel = _rel(RelationType.SUPPORTS, EvidenceKind.CERTIFICATION) + v0b = PolicyVersion( + policy_version="capability-policy-v0b", + rules=[PolicyRule( + relationship_type=RelationType.SUPPORTS, evidence_kind=EvidenceKind.CERTIFICATION, + status=AssertionStatus.INFERRED, confidence=Confidence.MEDIUM, + )], + ) + assert evaluate_relation(rel, DEFAULT_POLICY).confidence == Confidence.LOW + assert evaluate_relation(rel, v0b).confidence == Confidence.MEDIUM + assert evaluate_relation(rel, v0b).policy_version == "capability-policy-v0b" + + +# 5 + 8. HARD RULE: a certification alone can NEVER be CONFIRMED. +def test_certification_never_confirmed(): + for rt in (RelationType.SUPPORTS, RelationType.EQUIVALENT, RelationType.BROADER_THAN, RelationType.RELATED_TO): + a = evaluate_relation(_rel(rt, EvidenceKind.CERTIFICATION), DEFAULT_POLICY) + assert a.status != AssertionStatus.CONFIRMED + # only a concrete artifact via a CONFIRMS relation reaches CONFIRMED + assert evaluate_relation(_rel(RelationType.CONFIRMS, EvidenceKind.ARTIFACT)).status == AssertionStatus.CONFIRMED + # the shipped policy structurally satisfies the hard rule + assert_no_certification_confirms(DEFAULT_POLICY) + + +# 8b. a policy that maps a certification to CONFIRMED is rejected. +def test_bad_policy_rejected(): + bad = PolicyVersion( + policy_version="bad", + rules=[PolicyRule( + relationship_type=RelationType.EQUIVALENT, evidence_kind=EvidenceKind.CERTIFICATION, + status=AssertionStatus.CONFIRMED, confidence=Confidence.HIGH, + )], + ) + with pytest.raises(ValueError): + assert_no_certification_confirms(bad) + + +# 6. provenance on a curated relation atom. +def test_relation_carries_provenance(): + rel = CapabilityRelation( + relation_id="r1", source="certification:ISO27001", target_capability_id="MCAP-00001", + relationship_type=RelationType.SUPPORTS, evidence_kind=EvidenceKind.CERTIFICATION, + ) + assert rel.relationship_type == RelationType.SUPPORTS + assert hasattr(rel, "provenance") + + +# 9. merge keeps a redirect: resolve(old) follows it to the new capability. +def test_merge_keeps_redirect(): + reg = CapabilityRegistry() + old = mint_capability(reg, CapabilityCandidate(raw_term="Update Management")) + new = mint_capability(reg, CapabilityCandidate(raw_term="Software Update Management")) + event = merge_capabilities(reg, old.capability_id, new.capability_id) + assert event.event_type == LifecycleEventType.MERGE + assert reg.capabilities[old.capability_id].redirect_to == new.capability_id + # resolve follows the redirect to the canonical capability + assert resolve(reg, old.capability_id).capability_id == new.capability_id + assert reg.lifecycle_events[-1].from_ids == [old.capability_id] + + +# 9b. deprecate without a redirect resolves to None (no canonical target). +def test_deprecate_without_redirect_resolves_none(): + reg = CapabilityRegistry() + cap = mint_capability(reg, CapabilityCandidate(raw_term="Legacy Capability")) + deprecate_capability(reg, cap.capability_id) + assert resolve(reg, cap.capability_id) is None + assert reg.capabilities[cap.capability_id].state.value == "deprecated" + + +# requires = an obligation NEEDS a capability (relevance), not possession -> unknown. +def test_requires_is_relevance_not_possession(): + a = evaluate_relation(_rel(RelationType.REQUIRES, EvidenceKind.NONE), DEFAULT_POLICY) + assert a.status == AssertionStatus.UNKNOWN