"""Core domain models for ContextEngine.
Phase 0 frozen — these dataclasses define the shape of all domain objects.
All models use stdlib dataclass for minimal dependencies.
Retrieval-chain models (SeedHit → SearchMemoryResult) added for the
search_memory pipeline; they follow the same conventions.
"""
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any
import hashlib
import uuid
class Role(str, Enum):
"""Minimal RBAC roles for multi-tenant control plane."""
ROOT = "ROOT"
ADMIN = "ADMIN"
MEMBER = "MEMBER"
def user_space_name(user_id: str) -> str:
"""Generate owner_space identifier for a user.
Uses colon format for consistency across the codebase.
Format: "user:{user_id}"
Args:
user_id: User identifier
Returns:
Owner space string in format "user:{user_id}"
"""
return f"user:{user_id}"
def agent_space_name(agent_id: str) -> str:
"""Generate owner_space identifier for an agent.
Uses colon format for consistency across the codebase.
Format: "agent:{agent_id}"
Args:
agent_id: Agent identifier
Returns:
Owner space string in format "agent:{agent_id}"
"""
return f"agent:{agent_id}"
@dataclass(frozen=True)
class RequestContext:
"""Execution context for all operations.
Enforces multi-tenant isolation by carrying account_id and owner_space.
Passed to all ContextFS and RelationStore methods for access control.
"""
account_id: str
user_id: str
agent_id: str
session_id: str
trace_id: str
role: Role | str = Role.MEMBER
visible_owner_spaces: tuple[str, ...] = ()
def user_space_name(self) -> str:
"""Generate owner_space identifier for user scope.
Returns owner_space in format 'user:{user_id}'.
"""
return f"user:{self.user_id}"
def agent_space_name(self) -> str:
"""Generate owner_space identifier for agent scope.
Returns owner_space in format 'agent:{agent_id}'.
"""
return f"agent:{self.agent_id}"
@dataclass
class ContextNode:
"""A node in the context knowledge graph.
Persisted to AGFS as a directory with content.md, .relations.json,
.abstract.md, .overview.md, .meta.json files.
See CLAUDE.md §2 AGFS Directory Spec for file layout.
See CLAUDE.md §1 URI Spec for URI patterns by category.
"""
uri: str
context_type: str
category: str
level: int
owner_space: str
abstract: str
overview: str
content: str
metadata: dict[str, Any] = field(default_factory=dict)
@property
def parent_uri(self) -> str:
"""Get parent URI by removing the last path segment.
For example:
- "ctx://acme/users/alice/memories/preferences/coding_style"
→ "ctx://acme/users/alice/memories/preferences/"
- "ctx://acme/users/alice/memories/profile"
→ "ctx://acme/users/alice/memories/"
- "ctx://acme/users/alice/memories/preferences/"
→ "ctx://acme/users/alice/memories/"
"""
if "/" not in self.uri:
return ""
uri = self.uri
if uri.endswith("/"):
uri = uri.rstrip("/")
if "/" not in uri:
return ""
return uri.rsplit("/", 1)[0] + "/"
@dataclass
class RelationEdge:
"""A directed relationship between two ContextNodes.
Persisted in .relations.json at the source node.
Weight indicates confidence/strength (0.0-1.0).
"""
from_uri: str
to_uri: str
relation_type: str
weight: float
reason: str
@dataclass
class CandidateMemory:
"""Extracted candidate before merge policy decisions.
Produced by CandidateExtractor from message history.
Consumed by MergePolicy to produce WritePlan.
"""
category: str
owner_scope: str
routing_key: str
abstract: str
overview: str
content: str
confidence: float
when: str | None = None
who: str | None = None
where: str | None = None
tool_stats: dict | None = None
evidence_quote: str | None = None
attributed_speaker: str | None = None
attribution_basis: str | None = None
provenance_ids: list[str] = field(default_factory=list)
class WriteAction:
"""Constants for WritePlan action types."""
CREATE = "create"
MERGE = "merge"
ARCHIVE = "archive"
DELETE = "delete"
@dataclass
class WritePlan:
"""Output of MergePolicy — determines write action.
Action values:
- create: New node, does not exist
- merge: Existing node, merge content
- append: New version to existing node (events/cases)
- skip: Duplicate or below confidence threshold
"""
action: str
target_uri: str
merged_fields: dict[str, Any] = field(default_factory=dict)
relation_edges: list[RelationEdge] = field(default_factory=list)
@dataclass
class IndexRecord:
"""A record in the vector index.
Each ContextNode expands to 3 IndexRecords (L0/L1/L2).
The id is stable: sha256(uri + ":" + str(level))[:16].
CRITICAL: filters MUST contain account_id + owner_space.
Missing filters cause silent cross-tenant leakage (no runtime error).
Contract test enforces this invariant.
"""
id: str
uri: str
level: int
text: str
filters: dict[str, Any]
metadata: dict[str, Any] = field(default_factory=dict)
@staticmethod
def generate_id(uri: str, level: int) -> str:
"""Generate stable IndexRecord id.
Stable id enables idempotent upsert to vector index.
"""
return hashlib.sha256(f"{uri}:{level}".encode()).hexdigest()[:16]
@dataclass
class OutboxEvent:
"""Event for async index synchronization.
Written to .outbox/{event_id}.json, processed by OutboxWorker.
Events are durable and survive process restarts (stored in AGFS).
"""
event_id: str
event_type: str
uri: str
payload: dict[str, Any]
status: str
retry_count: int = 0
created_at: str = ""
next_retry_at: str = ""
def __post_init__(self):
if not self.created_at:
self.created_at = datetime.now(timezone.utc).isoformat()
@dataclass
class TypedQuery:
"""A query with type annotations for filtered retrieval.
account_id is injected by service layer, cannot be overridden by caller.
categories is inferred by Planner from query text.
target_uri allows direct lookup skipping vector recall.
intent is the retrieval-intent classification for Layer 3.
"""
text: str
context_type: str
categories: list[str]
target_uri: str | None = None
top_k: int = 10
account_id: str = ""
owner_space: str | list[str] | None = None
intent: str | None = None
class RetrieverMode(str, Enum):
"""Controls depth of scoring: QUICK skips reranker, THINKING uses it."""
THINKING = "thinking"
QUICK = "quick"
@dataclass
class SeedHit:
"""Raw result from VectorIndex.
Carries level and parent_uri needed for hierarchical expansion.
"""
uri: str
score: float
level: int = 2
parent_uri: str | None = None
context_type: str = ""
category: str = ""
owner_space: str = ""
abstract: str = ""
has_overview: bool = False
has_content: bool = False
active_count: int = 0
updated_at: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class SeedResult:
"""Output of SeedRetriever — two streams after level-based split."""
starting_points: list[SeedHit] = field(default_factory=list)
initial_candidates: list[SeedHit] = field(default_factory=list)
query_vector: list[float] = field(default_factory=list)
root_uris: list[str] = field(default_factory=list)
@dataclass
class LeafHit:
"""L2 node produced by HierarchicalSearcher after recursive expansion."""
uri: str
score: float
level: int = 2
depth: int = 0
parent_uri: str | None = None
category: str = ""
owner_space: str = ""
abstract: str = ""
has_overview: bool = False
has_content: bool = False
hierarchy_path: list[str] = field(default_factory=list)
active_count: int = 0
updated_at: str | None = None
@dataclass
class RetrievedBlock:
"""Final output block returned by search_memory or read_memory."""
uri: str
level_hit: str = "L2"
score: float = 0.0
category: str = ""
owner_space: str = ""
abstract: str | None = None
overview: str | None = None
content_excerpt: str | None = None
relations: list[RelationEdge] = field(default_factory=list)
has_overview: bool = False
has_content: bool = False
match_reason: str = ""
@dataclass
class SearchMemoryResult:
"""Top-level result of search_memory — pure structured data."""
request_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
query: str = ""
typed_queries: list[TypedQuery] = field(default_factory=list)
hits: list[RetrievedBlock] = field(default_factory=list)
trace: Any = None
@dataclass
class RetrievalConfig:
"""Tunables shared across all retrieval pipeline stages."""
max_top_k: int = 50
default_top_k: int = 10
global_search_topk: int = 30
max_convergence_rounds: int = 3
score_propagation_alpha: float = 0.5
hotness_alpha: float = 0.2
hotness_half_life_days: float = 7.0
default_score_threshold: float = 0.0
@dataclass
class BudgetTier:
"""A budget tier with priority and degradation support.
Tiers define how token budget is allocated across different memory components:
- priority: 0=highest (must keep), 10=lowest (first to drop)
- min_tokens: Floor — never go below this
- max_tokens: Ceiling — never exceed this
- degradable: Can truncate to min when budget tight
- expandable: Can grow beyond max when budget available
"""
name: str
priority: int
min_tokens: int
max_tokens: int
degradable: bool = True
expandable: bool = False
@dataclass
class TokenBudget:
"""Token budget configuration with priority-tiered allocation.
Tiers by priority:
0. Identity (profile, preferences) — must keep, never degrade
1. Session state (active task, constraints) — must keep, never degrade
5. Archive history — degradable (full → overview → abstract)
8. Working set (search results) — degradable AND expandable
Legacy properties (archive_ratio, session_state_ratio) preserved for backward compatibility.
"""
total: int = 128_000
archive_ratio: float = 0.7
session_state_ratio: float = 0.2
identity_tier: BudgetTier = field(default_factory=lambda: BudgetTier(
name="identity", priority=0, min_tokens=500, max_tokens=5000,
degradable=False, expandable=False,
))
session_state_tier: BudgetTier = field(default_factory=lambda: BudgetTier(
name="session_state", priority=1, min_tokens=1000, max_tokens=10000,
degradable=False, expandable=False,
))
archive_tier: BudgetTier = field(default_factory=lambda: BudgetTier(
name="archive", priority=5, min_tokens=500, max_tokens=40000,
degradable=True, expandable=False,
))
working_set_tier: BudgetTier = field(default_factory=lambda: BudgetTier(
name="working_set", priority=8, min_tokens=0, max_tokens=20000,
degradable=True, expandable=True,
))
@property
def archive_limit(self) -> int:
"""Maximum tokens allowed for archives (legacy compat)."""
return self.allocate().get("archive", int(self.total * self.archive_ratio))
@property
def session_state_limit(self) -> int:
"""Maximum tokens for session state (legacy compat)."""
return self.allocate().get("session_state", int(self.total * self.session_state_ratio))
def allocate(self) -> dict[str, int]:
"""Allocate budget across tiers by priority.
Returns dict of tier_name -> allocated_tokens.
Higher priority tiers get first claim on budget.
"""
tiers = sorted(
[self.identity_tier, self.session_state_tier,
self.archive_tier, self.working_set_tier],
key=lambda t: t.priority,
)
remaining = self.total
result = {}
for tier in tiers:
if not tier.degradable:
allocated = min(tier.min_tokens, remaining)
result[tier.name] = allocated
remaining -= allocated
for tier in tiers:
current = result.get(tier.name, 0)
wanted = tier.max_tokens - current
if wanted > 0 and remaining > 0:
extra = min(wanted, remaining)
result[tier.name] = current + extra
remaining -= extra
if remaining > 0:
for tier in tiers:
if tier.expandable and remaining > 0:
current = result.get(tier.name, 0)
result[tier.name] = current + remaining
remaining = 0
break
return result
@dataclass
class ArchiveRef:
"""Reference to a session archive.
Carries metadata about the archive without loading full content.
Used for assembly decisions and archive index display.
"""
archive_id: str
archive_uri: str
abstract: str
overview: str | None = None
tokens: int = 0
@dataclass
class SessionArchiveMeta:
"""Metadata for a session archive.
Stored as archive summary, used for archive selection.
"""
session_id: str
archive_id: str
created_at: str
message_count: int
overview: str
abstract: str
@dataclass
class ComposedContext:
"""Result of memory assembly operation.
Defines the cognitive contract: what context is available and where it goes.
Semantic slots map to specific prompt positions in the runtime.
"""
identity_context: str = ""
episodic_context: str = ""
session_context: str = ""
task_context: str = ""
retrieved_evidence: str = ""
open_loops: list[str] = field(default_factory=list)
uncertainties: list[str] = field(default_factory=list)
estimated_tokens: int = 0
budget_used_by_slot: dict[str, int] = field(default_factory=dict)
messages: list[dict] = field(default_factory=list)
archive_count: int = 0
archive_included: bool = False
system_prompt_suffix: str = ""
stats: dict = field(default_factory=dict)