6ccc6c87c1
Third instance of the identity-machine pattern (after Master Controls and Master Obligations). New compliance/capability/ package: MasterCapability with stable MCAP ids, CapabilityCandidate minting, seven typed relation types, a VERSIONED derivation policy, and identity lifecycle (merge/split/deprecate/redirect with provenance). Stored: identities, sources, relationship types, policy versions, lifecycle events, provenance. Derived (never stored): confidence/status via evaluate_relation under a policy version. Hard rule (structurally guarded): a certification alone can never yield CONFIRMED — only CONFIRMS + concrete artifact (or expert) does. Built from the Reasoning session per user directive but this IS the Compliance Execution model (Execution owns Capability) — handed off via the board. Metadata-first: CapabilityRelation is registry metadata, NOT a new meta-model class (freeze v1.0 untouched). No Company-Gap, no real ISO/cert mappings, no UI/RAG, no generic canonicalization engine. 11 tests; mypy --strict clean; LOC ok. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
192 lines
6.5 KiB
Python
192 lines
6.5 KiB
Python
"""Master Capability Registry v0 — minting, derivation, identity lifecycle.
|
|
|
|
STORED on the registry: identities, sources, relation types, policy versions,
|
|
lifecycle events, provenance. DERIVED (never stored): confidence/status, via
|
|
`evaluate_relation` under a versioned policy.
|
|
|
|
Python 3.9 compatible (no `|` unions).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Dict, List, Optional, Set
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
from .policy import DEFAULT_POLICY
|
|
from .schemas import (
|
|
AssertionStatus,
|
|
CapabilityCandidate,
|
|
CapabilityRelation,
|
|
Confidence,
|
|
DerivedAssessment,
|
|
IdentityLifecycleEvent,
|
|
LifecycleEventType,
|
|
LifecycleState,
|
|
MasterCapability,
|
|
PolicyVersion,
|
|
Provenance,
|
|
)
|
|
|
|
|
|
class CapabilityRegistry(BaseModel):
|
|
# NOTE: no confidence/coverage field anywhere — those are DERIVED, never stored.
|
|
capabilities: Dict[str, MasterCapability] = Field(default_factory=dict)
|
|
relations: List[CapabilityRelation] = Field(default_factory=list)
|
|
lifecycle_events: List[IdentityLifecycleEvent] = Field(default_factory=list)
|
|
policy: PolicyVersion = Field(default_factory=lambda: DEFAULT_POLICY)
|
|
next_serial: int = 1
|
|
|
|
|
|
def _mcap_id(serial: int) -> str:
|
|
return "MCAP-%05d" % serial
|
|
|
|
|
|
def _next_event_id(registry: "CapabilityRegistry") -> str:
|
|
return "evt-%d" % (len(registry.lifecycle_events) + 1)
|
|
|
|
|
|
def mint_capability(
|
|
registry: CapabilityRegistry,
|
|
candidate: CapabilityCandidate,
|
|
provenance: Optional[Provenance] = None,
|
|
name: str = "",
|
|
definition: str = "",
|
|
category: str = "",
|
|
domains: Optional[List[str]] = None,
|
|
) -> MasterCapability:
|
|
"""Assign the next stable MCAP id to a candidate and register it (with provenance)."""
|
|
cap_id = _mcap_id(registry.next_serial)
|
|
cap = MasterCapability(
|
|
capability_id=cap_id,
|
|
name=name or candidate.normalized or candidate.raw_term,
|
|
definition=definition,
|
|
category=category,
|
|
domains=domains or [],
|
|
provenance=provenance
|
|
or Provenance(author="system", basis="minted from candidate '%s'" % candidate.raw_term),
|
|
)
|
|
registry.capabilities[cap_id] = cap
|
|
registry.next_serial += 1
|
|
return cap
|
|
|
|
|
|
def evaluate_relation(
|
|
relation: CapabilityRelation, policy: Optional[PolicyVersion] = None
|
|
) -> DerivedAssessment:
|
|
"""Derive (status, confidence) from (relationship_type, evidence_kind) under a
|
|
versioned policy. Deterministic; result is returned, never stored."""
|
|
pol = policy if policy is not None else DEFAULT_POLICY
|
|
status = AssertionStatus.UNKNOWN
|
|
confidence = Confidence.LOW
|
|
found = False
|
|
for rule in pol.rules:
|
|
if (
|
|
rule.relationship_type == relation.relationship_type
|
|
and rule.evidence_kind == relation.evidence_kind
|
|
):
|
|
status, confidence, found = rule.status, rule.confidence, True
|
|
break
|
|
expl = "%s + %s under %s -> %s/%s%s" % (
|
|
relation.relationship_type.value,
|
|
relation.evidence_kind.value,
|
|
pol.policy_version,
|
|
status.value,
|
|
confidence.value,
|
|
"" if found else " (no rule)",
|
|
)
|
|
return DerivedAssessment(
|
|
target_capability_id=relation.target_capability_id,
|
|
status=status,
|
|
confidence=confidence,
|
|
policy_version=pol.policy_version,
|
|
explanation=expl,
|
|
)
|
|
|
|
|
|
def resolve(
|
|
registry: CapabilityRegistry, capability_id: str, _seen: Optional[Set[str]] = None
|
|
) -> Optional[MasterCapability]:
|
|
"""Follow redirects (from merge/deprecate) to the current canonical capability."""
|
|
seen = _seen if _seen is not None else set()
|
|
if capability_id in seen:
|
|
return None # redirect cycle guard
|
|
seen.add(capability_id)
|
|
cap = registry.capabilities.get(capability_id)
|
|
if cap is None:
|
|
return None
|
|
if cap.redirect_to:
|
|
return resolve(registry, cap.redirect_to, seen)
|
|
# terminal: only an ACTIVE capability resolves; a deprecated dead-end -> None
|
|
return cap if cap.state == LifecycleState.ACTIVE else None
|
|
|
|
|
|
def deprecate_capability(
|
|
registry: CapabilityRegistry,
|
|
capability_id: str,
|
|
redirect_to: Optional[str] = None,
|
|
provenance: Optional[Provenance] = None,
|
|
) -> IdentityLifecycleEvent:
|
|
cap = registry.capabilities.get(capability_id)
|
|
if cap is None:
|
|
raise KeyError(capability_id)
|
|
cap.state = LifecycleState.DEPRECATED
|
|
cap.redirect_to = redirect_to
|
|
event = IdentityLifecycleEvent(
|
|
event_id=_next_event_id(registry),
|
|
event_type=LifecycleEventType.REDIRECT if redirect_to else LifecycleEventType.DEPRECATE,
|
|
from_ids=[capability_id],
|
|
to_ids=[redirect_to] if redirect_to else [],
|
|
provenance=provenance or Provenance(author="system", basis="deprecate %s" % capability_id),
|
|
)
|
|
registry.lifecycle_events.append(event)
|
|
return event
|
|
|
|
|
|
def merge_capabilities(
|
|
registry: CapabilityRegistry,
|
|
from_id: str,
|
|
into_id: str,
|
|
provenance: Optional[Provenance] = None,
|
|
) -> IdentityLifecycleEvent:
|
|
"""Merge `from_id` into `into_id`: deprecate `from_id` with a redirect to `into_id`."""
|
|
if from_id not in registry.capabilities or into_id not in registry.capabilities:
|
|
raise KeyError("%s or %s" % (from_id, into_id))
|
|
frm = registry.capabilities[from_id]
|
|
frm.state = LifecycleState.DEPRECATED
|
|
frm.redirect_to = into_id
|
|
event = IdentityLifecycleEvent(
|
|
event_id=_next_event_id(registry),
|
|
event_type=LifecycleEventType.MERGE,
|
|
from_ids=[from_id],
|
|
to_ids=[into_id],
|
|
provenance=provenance or Provenance(author="system", basis="merge %s -> %s" % (from_id, into_id)),
|
|
)
|
|
registry.lifecycle_events.append(event)
|
|
return event
|
|
|
|
|
|
def split_capability(
|
|
registry: CapabilityRegistry,
|
|
from_id: str,
|
|
into_ids: List[str],
|
|
primary: Optional[str] = None,
|
|
provenance: Optional[Provenance] = None,
|
|
) -> IdentityLifecycleEvent:
|
|
"""Split `from_id` into several capabilities. The old id deprecates; it redirects
|
|
to `primary` only if one is given (else it resolves to None — split is ambiguous)."""
|
|
if from_id not in registry.capabilities:
|
|
raise KeyError(from_id)
|
|
frm = registry.capabilities[from_id]
|
|
frm.state = LifecycleState.DEPRECATED
|
|
frm.redirect_to = primary
|
|
event = IdentityLifecycleEvent(
|
|
event_id=_next_event_id(registry),
|
|
event_type=LifecycleEventType.SPLIT,
|
|
from_ids=[from_id],
|
|
to_ids=list(into_ids),
|
|
provenance=provenance or Provenance(author="system", basis="split %s" % from_id),
|
|
)
|
|
registry.lifecycle_events.append(event)
|
|
return event
|