feat(capability): Master Capability Registry v0 (Phase 2C, Compliance Execution domain)

Third instance of the identity-machine pattern (after Master Controls and Master
Obligations). New compliance/capability/ package: MasterCapability with stable MCAP
ids, CapabilityCandidate minting, seven typed relation types, a VERSIONED derivation
policy, and identity lifecycle (merge/split/deprecate/redirect with provenance).

Stored: identities, sources, relationship types, policy versions, lifecycle events,
provenance. Derived (never stored): confidence/status via evaluate_relation under a
policy version. Hard rule (structurally guarded): a certification alone can never
yield CONFIRMED — only CONFIRMS + concrete artifact (or expert) does.

Built from the Reasoning session per user directive but this IS the Compliance
Execution model (Execution owns Capability) — handed off via the board. Metadata-first:
CapabilityRelation is registry metadata, NOT a new meta-model class (freeze v1.0
untouched). No Company-Gap, no real ISO/cert mappings, no UI/RAG, no generic
canonicalization engine. 11 tests; mypy --strict clean; LOC ok.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-26 21:35:12 +02:00
parent 7eb7f61483
commit 6ccc6c87c1
5 changed files with 629 additions and 0 deletions
@@ -0,0 +1,70 @@
"""Master Capability Registry v0 (Phase 2C) — Compliance Execution domain.
Registry + minting layer for Master Capabilities — the third instance of the
identity-machine pattern (Master Controls, Master Obligations, Master Capabilities).
STORED: identities, sources, relationship types, policy versions, lifecycle events,
provenance. DERIVED (never stored): confidence, coverage, gap.
v0 scope: types + minting + typed relations + versioned policy + identity lifecycle.
NOT here: Company-Gap, real ISO/cert mappings, certification derivations, UI, RAG,
new meta-model class, generic canonicalization engine.
"""
from __future__ import annotations
from .engine import (
CapabilityRegistry,
deprecate_capability,
evaluate_relation,
merge_capabilities,
mint_capability,
resolve,
split_capability,
)
from .policy import DEFAULT_POLICY, assert_no_certification_confirms
from .schemas import (
AssertionStatus,
CapabilityCandidate,
CapabilityRelation,
Confidence,
DerivedAssessment,
EvidenceKind,
IdentityLifecycleEvent,
LifecycleEventType,
LifecycleState,
MasterCapability,
PolicyRule,
PolicyVersion,
Provenance,
RelationType,
)
__all__ = [
# engine
"CapabilityRegistry",
"mint_capability",
"evaluate_relation",
"resolve",
"deprecate_capability",
"merge_capabilities",
"split_capability",
# policy
"DEFAULT_POLICY",
"assert_no_certification_confirms",
# schemas
"MasterCapability",
"CapabilityCandidate",
"CapabilityRelation",
"RelationType",
"EvidenceKind",
"AssertionStatus",
"Confidence",
"PolicyRule",
"PolicyVersion",
"IdentityLifecycleEvent",
"LifecycleEventType",
"LifecycleState",
"Provenance",
"DerivedAssessment",
]
@@ -0,0 +1,191 @@
"""Master Capability Registry v0 — minting, derivation, identity lifecycle.
STORED on the registry: identities, sources, relation types, policy versions,
lifecycle events, provenance. DERIVED (never stored): confidence/status, via
`evaluate_relation` under a versioned policy.
Python 3.9 compatible (no `|` unions).
"""
from __future__ import annotations
from typing import Dict, List, Optional, Set
from pydantic import BaseModel, Field
from .policy import DEFAULT_POLICY
from .schemas import (
AssertionStatus,
CapabilityCandidate,
CapabilityRelation,
Confidence,
DerivedAssessment,
IdentityLifecycleEvent,
LifecycleEventType,
LifecycleState,
MasterCapability,
PolicyVersion,
Provenance,
)
class CapabilityRegistry(BaseModel):
# NOTE: no confidence/coverage field anywhere — those are DERIVED, never stored.
capabilities: Dict[str, MasterCapability] = Field(default_factory=dict)
relations: List[CapabilityRelation] = Field(default_factory=list)
lifecycle_events: List[IdentityLifecycleEvent] = Field(default_factory=list)
policy: PolicyVersion = Field(default_factory=lambda: DEFAULT_POLICY)
next_serial: int = 1
def _mcap_id(serial: int) -> str:
return "MCAP-%05d" % serial
def _next_event_id(registry: "CapabilityRegistry") -> str:
return "evt-%d" % (len(registry.lifecycle_events) + 1)
def mint_capability(
registry: CapabilityRegistry,
candidate: CapabilityCandidate,
provenance: Optional[Provenance] = None,
name: str = "",
definition: str = "",
category: str = "",
domains: Optional[List[str]] = None,
) -> MasterCapability:
"""Assign the next stable MCAP id to a candidate and register it (with provenance)."""
cap_id = _mcap_id(registry.next_serial)
cap = MasterCapability(
capability_id=cap_id,
name=name or candidate.normalized or candidate.raw_term,
definition=definition,
category=category,
domains=domains or [],
provenance=provenance
or Provenance(author="system", basis="minted from candidate '%s'" % candidate.raw_term),
)
registry.capabilities[cap_id] = cap
registry.next_serial += 1
return cap
def evaluate_relation(
relation: CapabilityRelation, policy: Optional[PolicyVersion] = None
) -> DerivedAssessment:
"""Derive (status, confidence) from (relationship_type, evidence_kind) under a
versioned policy. Deterministic; result is returned, never stored."""
pol = policy if policy is not None else DEFAULT_POLICY
status = AssertionStatus.UNKNOWN
confidence = Confidence.LOW
found = False
for rule in pol.rules:
if (
rule.relationship_type == relation.relationship_type
and rule.evidence_kind == relation.evidence_kind
):
status, confidence, found = rule.status, rule.confidence, True
break
expl = "%s + %s under %s -> %s/%s%s" % (
relation.relationship_type.value,
relation.evidence_kind.value,
pol.policy_version,
status.value,
confidence.value,
"" if found else " (no rule)",
)
return DerivedAssessment(
target_capability_id=relation.target_capability_id,
status=status,
confidence=confidence,
policy_version=pol.policy_version,
explanation=expl,
)
def resolve(
registry: CapabilityRegistry, capability_id: str, _seen: Optional[Set[str]] = None
) -> Optional[MasterCapability]:
"""Follow redirects (from merge/deprecate) to the current canonical capability."""
seen = _seen if _seen is not None else set()
if capability_id in seen:
return None # redirect cycle guard
seen.add(capability_id)
cap = registry.capabilities.get(capability_id)
if cap is None:
return None
if cap.redirect_to:
return resolve(registry, cap.redirect_to, seen)
# terminal: only an ACTIVE capability resolves; a deprecated dead-end -> None
return cap if cap.state == LifecycleState.ACTIVE else None
def deprecate_capability(
registry: CapabilityRegistry,
capability_id: str,
redirect_to: Optional[str] = None,
provenance: Optional[Provenance] = None,
) -> IdentityLifecycleEvent:
cap = registry.capabilities.get(capability_id)
if cap is None:
raise KeyError(capability_id)
cap.state = LifecycleState.DEPRECATED
cap.redirect_to = redirect_to
event = IdentityLifecycleEvent(
event_id=_next_event_id(registry),
event_type=LifecycleEventType.REDIRECT if redirect_to else LifecycleEventType.DEPRECATE,
from_ids=[capability_id],
to_ids=[redirect_to] if redirect_to else [],
provenance=provenance or Provenance(author="system", basis="deprecate %s" % capability_id),
)
registry.lifecycle_events.append(event)
return event
def merge_capabilities(
registry: CapabilityRegistry,
from_id: str,
into_id: str,
provenance: Optional[Provenance] = None,
) -> IdentityLifecycleEvent:
"""Merge `from_id` into `into_id`: deprecate `from_id` with a redirect to `into_id`."""
if from_id not in registry.capabilities or into_id not in registry.capabilities:
raise KeyError("%s or %s" % (from_id, into_id))
frm = registry.capabilities[from_id]
frm.state = LifecycleState.DEPRECATED
frm.redirect_to = into_id
event = IdentityLifecycleEvent(
event_id=_next_event_id(registry),
event_type=LifecycleEventType.MERGE,
from_ids=[from_id],
to_ids=[into_id],
provenance=provenance or Provenance(author="system", basis="merge %s -> %s" % (from_id, into_id)),
)
registry.lifecycle_events.append(event)
return event
def split_capability(
registry: CapabilityRegistry,
from_id: str,
into_ids: List[str],
primary: Optional[str] = None,
provenance: Optional[Provenance] = None,
) -> IdentityLifecycleEvent:
"""Split `from_id` into several capabilities. The old id deprecates; it redirects
to `primary` only if one is given (else it resolves to None — split is ambiguous)."""
if from_id not in registry.capabilities:
raise KeyError(from_id)
frm = registry.capabilities[from_id]
frm.state = LifecycleState.DEPRECATED
frm.redirect_to = primary
event = IdentityLifecycleEvent(
event_id=_next_event_id(registry),
event_type=LifecycleEventType.SPLIT,
from_ids=[from_id],
to_ids=list(into_ids),
provenance=provenance or Provenance(author="system", basis="split %s" % from_id),
)
registry.lifecycle_events.append(event)
return event
@@ -0,0 +1,65 @@
"""Derivation policy v0 for the Master Capability Registry.
Confidence + status are DERIVED from (relationship_type, evidence_kind) under a
versioned policy — never stored. HARD RULE baked in and structurally guarded: a
CERTIFICATION is a claim, never proof — no certification-backed rule may yield
CONFIRMED. CONFIRMED requires a CONFIRMS relation backed by a concrete ARTIFACT
(or an EXPERT assertion).
Python 3.9 compatible (no `|` unions).
"""
from __future__ import annotations
from .schemas import (
AssertionStatus,
Confidence,
EvidenceKind,
PolicyRule,
PolicyVersion,
RelationType,
)
def _rule(rt: RelationType, ek: EvidenceKind, st: AssertionStatus, cf: Confidence) -> PolicyRule:
return PolicyRule(relationship_type=rt, evidence_kind=ek, status=st, confidence=cf)
# (relationship_type, evidence_kind) -> (status, confidence)
_V0_RULES = [
# concrete artifact / expert confirming the capability -> CONFIRMED
_rule(RelationType.CONFIRMS, EvidenceKind.ARTIFACT, AssertionStatus.CONFIRMED, Confidence.HIGH),
_rule(RelationType.CONFIRMS, EvidenceKind.EXPERT, AssertionStatus.CONFIRMED, Confidence.MEDIUM),
# equivalent capability — certificate or artifact behind it -> INFERRED (never confirmed)
_rule(RelationType.EQUIVALENT, EvidenceKind.CERTIFICATION, AssertionStatus.INFERRED, Confidence.HIGH),
_rule(RelationType.EQUIVALENT, EvidenceKind.ARTIFACT, AssertionStatus.INFERRED, Confidence.HIGH),
# supports — weaker
_rule(RelationType.SUPPORTS, EvidenceKind.CERTIFICATION, AssertionStatus.INFERRED, Confidence.LOW),
_rule(RelationType.SUPPORTS, EvidenceKind.ARTIFACT, AssertionStatus.INFERRED, Confidence.MEDIUM),
# requires = an obligation NEEDS the capability (relevance, not possession)
_rule(RelationType.REQUIRES, EvidenceKind.NONE, AssertionStatus.UNKNOWN, Confidence.LOW),
# broader/narrower certificate -> weak inference
_rule(RelationType.BROADER_THAN, EvidenceKind.CERTIFICATION, AssertionStatus.INFERRED, Confidence.LOW),
_rule(RelationType.NARROWER_THAN, EvidenceKind.CERTIFICATION, AssertionStatus.INFERRED, Confidence.LOW),
_rule(RelationType.RELATED_TO, EvidenceKind.CERTIFICATION, AssertionStatus.UNKNOWN, Confidence.LOW),
]
DEFAULT_POLICY = PolicyVersion(
policy_version="capability-policy-v0",
description="v0: certification never yields CONFIRMED; only CONFIRMS + ARTIFACT/EXPERT does.",
rules=_V0_RULES,
)
def assert_no_certification_confirms(policy: PolicyVersion) -> None:
"""Structural guard for the hard rule: no CERTIFICATION-backed rule is CONFIRMED."""
for r in policy.rules:
if r.evidence_kind == EvidenceKind.CERTIFICATION and r.status == AssertionStatus.CONFIRMED:
raise ValueError(
"policy %s violates hard rule: certification -> confirmed (%s)"
% (policy.policy_version, r.relationship_type.value)
)
# fail fast at import: the shipped default must satisfy the hard rule
assert_no_certification_confirms(DEFAULT_POLICY)
@@ -0,0 +1,150 @@
"""Master Capability Registry v0 — Compliance Execution domain (Phase 2C).
Built from the Reasoning session per user directive, but this IS the Compliance
Execution model (Execution owns Capability). Third real instance of the
identity-machine pattern (after Master Controls and Master Obligations):
Candidate -> Normalization -> Dedup -> Stable Identity (MCAP) -> Typed Relations
KEY SENTENCE (stored vs derived):
STORED : identities, sources, relationship types, policy versions, lifecycle
events, provenance.
DERIVED : confidence, coverage and gap statements — computed on demand, NEVER
stored (see policy.py / engine.evaluate_relation).
These are APPLICATION/registry types, NOT compliance-meta-model classes. In
particular `CapabilityRelation` is relation METADATA inside the registry — it does
NOT introduce a new meta-model class. Whether a reified relation must enter the
frozen meta-model is a Meta-Model-Owner decision (architecture freeze v1.0),
deferred until a demonstrable failure case exists.
Self-contained (no Reasoning import — Reasoning consumes Capability, not the other
way round). Python 3.9 compatible (no `|` unions).
"""
from __future__ import annotations
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel, Field
class Confidence(str, Enum):
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class AssertionStatus(str, Enum):
"""How well-established a capability claim is. A numeric score is presentation;
THIS type is the truth (derived from relationship type + evidence + policy)."""
DECLARED = "declared"
INFERRED = "inferred"
CONFIRMED = "confirmed"
UNKNOWN = "unknown"
class RelationType(str, Enum):
EQUIVALENT = "equivalent"
SUPPORTS = "supports"
REQUIRES = "requires"
CONFIRMS = "confirms"
BROADER_THAN = "broader_than"
NARROWER_THAN = "narrower_than"
RELATED_TO = "related_to"
class EvidenceKind(str, Enum):
CERTIFICATION = "certification" # a held certificate — a CLAIM, never proof
ARTIFACT = "artifact" # concrete doc/config/test/log
EXPERT = "expert" # human expert assertion
NONE = "none"
class LifecycleState(str, Enum):
ACTIVE = "active"
DEPRECATED = "deprecated"
class LifecycleEventType(str, Enum):
MERGE = "merge"
SPLIT = "split"
DEPRECATE = "deprecate"
REDIRECT = "redirect"
class Provenance(BaseModel):
"""Every CURATED atom carries its own provenance (who / when / on what basis)."""
author: str = ""
asserted_at: Optional[str] = None # ISO timestamp passed in; never generated here
basis: str = ""
# ── stored: identity ──────────────────────────────────────────────────────
class MasterCapability(BaseModel):
capability_id: str # stable MCAP-xxxxx
name: str = ""
definition: str = ""
category: str = ""
domains: List[str] = Field(default_factory=list)
typical_evidence: List[str] = Field(default_factory=list)
version: int = 1
state: LifecycleState = LifecycleState.ACTIVE
redirect_to: Optional[str] = None # set on merge/deprecate
provenance: Provenance = Field(default_factory=Provenance)
class CapabilityCandidate(BaseModel):
raw_term: str # e.g. "Patch Management"
source: str = "" # e.g. "CRA:Annex I (2)(d)"
normalized: str = ""
# ── stored: typed relation metadata (NOT a meta-model class) ──────────────
class CapabilityRelation(BaseModel):
relation_id: str
source: str # external term/obligation/certification id, e.g. "certification:ISO27001"
target_capability_id: str # MCAP-...
relationship_type: RelationType
evidence_kind: EvidenceKind = EvidenceKind.NONE
provenance: Provenance = Field(default_factory=Provenance)
# ── stored: versioned derivation policy ───────────────────────────────────
class PolicyRule(BaseModel):
relationship_type: RelationType
evidence_kind: EvidenceKind
status: AssertionStatus
confidence: Confidence
class PolicyVersion(BaseModel):
"""A versioned derivation policy. `policy_version` is recorded with every
assessment so "why did you say X last year" is answerable with the policy
as-of-then. Without this, `derived` and `auditable/reproducible` contradict."""
policy_version: str
description: str = ""
rules: List[PolicyRule] = Field(default_factory=list)
# ── stored: identity lifecycle ────────────────────────────────────────────
class IdentityLifecycleEvent(BaseModel):
event_id: str
event_type: LifecycleEventType
from_ids: List[str] = Field(default_factory=list)
to_ids: List[str] = Field(default_factory=list)
at: Optional[str] = None
provenance: Provenance = Field(default_factory=Provenance)
# ── DERIVED — never stored ────────────────────────────────────────────────
class DerivedAssessment(BaseModel):
target_capability_id: str
status: AssertionStatus
confidence: Confidence
policy_version: str
explanation: str = ""
+153
View File
@@ -0,0 +1,153 @@
"""Tests for Master Capability Registry v0 (Phase 2C, Compliance Execution domain).
Acceptance: a registry mints stable MCAP ids, stores typed relations + versioned
policy + lifecycle events + provenance, and DERIVES confidence/status on demand
(never stored). Hard rule: a certification alone can never yield CONFIRMED.
No real ISO/cert mappings here — only synthetic relations (mappings are not part of
v0; they are Execution data, injected later).
"""
from __future__ import annotations
import pytest
from compliance.capability import (
DEFAULT_POLICY,
AssertionStatus,
CapabilityCandidate,
CapabilityRegistry,
CapabilityRelation,
Confidence,
EvidenceKind,
LifecycleEventType,
PolicyRule,
PolicyVersion,
RelationType,
assert_no_certification_confirms,
deprecate_capability,
evaluate_relation,
merge_capabilities,
mint_capability,
resolve,
)
def _rel(rt, ek, target="MCAP-00001", src="x"):
return CapabilityRelation(
relation_id="r", source=src, target_capability_id=target, relationship_type=rt, evidence_kind=ek
)
# 1. minting assigns stable, incrementing MCAP ids.
def test_mint_stable_mcap_id():
reg = CapabilityRegistry()
a = mint_capability(reg, CapabilityCandidate(raw_term="Patch Management"))
b = mint_capability(reg, CapabilityCandidate(raw_term="Incident Response"))
assert a.capability_id == "MCAP-00001"
assert b.capability_id == "MCAP-00002"
assert reg.capabilities["MCAP-00001"].name == "Patch Management"
# 2. minted capability carries provenance.
def test_mint_records_provenance():
reg = CapabilityRegistry()
cap = mint_capability(reg, CapabilityCandidate(raw_term="Secure Development"))
assert cap.provenance.author == "system"
assert "Secure Development" in cap.provenance.basis
# 3. all seven relationship types exist.
def test_relation_types_complete():
assert {t.value for t in RelationType} == {
"equivalent", "supports", "requires", "confirms", "broader_than", "narrower_than", "related_to",
}
# 4. confidence is COMPUTED from relation_type + evidence + policy_version, not stored.
def test_confidence_computed_and_policy_referenced():
rel = _rel(RelationType.SUPPORTS, EvidenceKind.CERTIFICATION)
a = evaluate_relation(rel, DEFAULT_POLICY)
assert a.status == AssertionStatus.INFERRED and a.confidence == Confidence.LOW
assert a.policy_version == "capability-policy-v0"
# the registry itself stores NO confidence/coverage
assert "confidence" not in CapabilityRegistry.model_fields
assert "coverage" not in CapabilityRegistry.model_fields
# 4b. a DIFFERENT policy version yields a different result for the SAME relation
# -> "why did you say X last year" needs the policy-as-of-then.
def test_policy_versioning_changes_outcome():
rel = _rel(RelationType.SUPPORTS, EvidenceKind.CERTIFICATION)
v0b = PolicyVersion(
policy_version="capability-policy-v0b",
rules=[PolicyRule(
relationship_type=RelationType.SUPPORTS, evidence_kind=EvidenceKind.CERTIFICATION,
status=AssertionStatus.INFERRED, confidence=Confidence.MEDIUM,
)],
)
assert evaluate_relation(rel, DEFAULT_POLICY).confidence == Confidence.LOW
assert evaluate_relation(rel, v0b).confidence == Confidence.MEDIUM
assert evaluate_relation(rel, v0b).policy_version == "capability-policy-v0b"
# 5 + 8. HARD RULE: a certification alone can NEVER be CONFIRMED.
def test_certification_never_confirmed():
for rt in (RelationType.SUPPORTS, RelationType.EQUIVALENT, RelationType.BROADER_THAN, RelationType.RELATED_TO):
a = evaluate_relation(_rel(rt, EvidenceKind.CERTIFICATION), DEFAULT_POLICY)
assert a.status != AssertionStatus.CONFIRMED
# only a concrete artifact via a CONFIRMS relation reaches CONFIRMED
assert evaluate_relation(_rel(RelationType.CONFIRMS, EvidenceKind.ARTIFACT)).status == AssertionStatus.CONFIRMED
# the shipped policy structurally satisfies the hard rule
assert_no_certification_confirms(DEFAULT_POLICY)
# 8b. a policy that maps a certification to CONFIRMED is rejected.
def test_bad_policy_rejected():
bad = PolicyVersion(
policy_version="bad",
rules=[PolicyRule(
relationship_type=RelationType.EQUIVALENT, evidence_kind=EvidenceKind.CERTIFICATION,
status=AssertionStatus.CONFIRMED, confidence=Confidence.HIGH,
)],
)
with pytest.raises(ValueError):
assert_no_certification_confirms(bad)
# 6. provenance on a curated relation atom.
def test_relation_carries_provenance():
rel = CapabilityRelation(
relation_id="r1", source="certification:ISO27001", target_capability_id="MCAP-00001",
relationship_type=RelationType.SUPPORTS, evidence_kind=EvidenceKind.CERTIFICATION,
)
assert rel.relationship_type == RelationType.SUPPORTS
assert hasattr(rel, "provenance")
# 9. merge keeps a redirect: resolve(old) follows it to the new capability.
def test_merge_keeps_redirect():
reg = CapabilityRegistry()
old = mint_capability(reg, CapabilityCandidate(raw_term="Update Management"))
new = mint_capability(reg, CapabilityCandidate(raw_term="Software Update Management"))
event = merge_capabilities(reg, old.capability_id, new.capability_id)
assert event.event_type == LifecycleEventType.MERGE
assert reg.capabilities[old.capability_id].redirect_to == new.capability_id
# resolve follows the redirect to the canonical capability
assert resolve(reg, old.capability_id).capability_id == new.capability_id
assert reg.lifecycle_events[-1].from_ids == [old.capability_id]
# 9b. deprecate without a redirect resolves to None (no canonical target).
def test_deprecate_without_redirect_resolves_none():
reg = CapabilityRegistry()
cap = mint_capability(reg, CapabilityCandidate(raw_term="Legacy Capability"))
deprecate_capability(reg, cap.capability_id)
assert resolve(reg, cap.capability_id) is None
assert reg.capabilities[cap.capability_id].state.value == "deprecated"
# requires = an obligation NEEDS a capability (relevance), not possession -> unknown.
def test_requires_is_relevance_not_possession():
a = evaluate_relation(_rel(RelationType.REQUIRES, EvidenceKind.NONE), DEFAULT_POLICY)
assert a.status == AssertionStatus.UNKNOWN