"""Merge policies for determining write actions.
Each policy implements the MergePolicy Protocol and decides whether
to create, merge, append, or skip a candidate memory.
"""
import uuid as uuid_lib
from datetime import UTC, datetime
from commit.routing_key import normalize_routing_key
from core.interfaces import ContextFS, MergePolicy
from core.models import CandidateMemory, RequestContext, WritePlan
from core.uri_resolver import URIResolver
def _merge_provenance_ids(
existing: list[str] | None,
incoming: list[str] | None,
) -> list[str]:
"""Merge two provenance ID lists, preserving order and deduplicating."""
merged = (existing or []) + (incoming or [])
return list(dict.fromkeys(merged))
def _schema_routing_fields(category: str, routing_key: str) -> dict[str, str]:
"""Build URI template fields from the candidate routing key."""
return {"routing_key": routing_key}
class ProfilePolicy(MergePolicy):
"""Merge policy for user profile nodes.
Behavior (CLAUDE.md §5):
- Target URI: ctx://{account}/users/{user}/memories/profile
- Always merge (never create new, never skip)
- New information replaces old (user state changes)
- Conflict resolution: new information wins
Profile is a fixed URI that gets continuously updated.
"""
def __init__(self, fs: ContextFS, uri_resolver: URIResolver):
"""Initialize ProfilePolicy.
Args:
fs: ContextFS for checking existing nodes
uri_resolver: URIResolver for URI construction
"""
self._fs = fs
self._uri_resolver = uri_resolver
def plan(self, candidate: CandidateMemory, ctx: RequestContext) -> WritePlan:
"""Generate WritePlan for profile candidate.
Args:
candidate: CandidateMemory (must be category="profile")
ctx: RequestContext for this operation
Returns:
WritePlan with action="merge" and target URI for profile
"""
target_uri = self._uri_resolver.resolve("profile", {"routing_key": candidate.routing_key}, ctx)
exists = self._fs.exists(target_uri, ctx)
if exists:
action = "merge"
existing_node = self._fs.read_node(target_uri, ctx)
current_version = existing_node.metadata.get("version", 0)
merged_fields = {
"abstract": candidate.abstract,
"overview": candidate.overview,
"content": candidate.content,
"updated_at": datetime.now(UTC).isoformat(),
"expected_version": current_version,
"provenance_ids": _merge_provenance_ids(
existing_node.metadata.get("provenance_ids"),
candidate.provenance_ids,
),
}
else:
action = "create"
merged_fields = {}
return WritePlan(
action=action,
target_uri=target_uri,
merged_fields=merged_fields,
relation_edges=[],
)
class AggregateTopicPolicy(MergePolicy):
"""Merge policy for topic-based memories.
Applies to: preference, entity, pattern
Behavior (CLAUDE.md §5):
- Target URI: ctx://.../{category}/{slug}
- If exists: merge content into overview/content
- If not exists: create new node
- Similar slug handling: edit distance < 2 or semantic > 0.9 → merge
Uses routing_key as slug for URI construction.
"""
def __init__(self, fs: ContextFS, uri_resolver: URIResolver, similarity_threshold: float = 0.9):
"""Initialize AggregateTopicPolicy.
Args:
fs: ContextFS for checking existing nodes
uri_resolver: URIResolver for URI construction
similarity_threshold: Semantic similarity threshold for slug merging
"""
self._fs = fs
self._uri_resolver = uri_resolver
self._similarity_threshold = similarity_threshold
def plan(self, candidate: CandidateMemory, ctx: RequestContext) -> WritePlan:
"""Generate WritePlan for topic-based candidate.
Args:
candidate: CandidateMemory (preference, entity, or pattern)
ctx: RequestContext for this operation
Returns:
WritePlan with action="create" or "merge"
"""
normalized_key = normalize_routing_key(candidate.routing_key, candidate.category)
routing_fields = _schema_routing_fields(candidate.category, normalized_key)
target_uri = self._uri_resolver.resolve(candidate.category, routing_fields, ctx)
if self._fs.exists(target_uri, ctx):
existing_node = self._fs.read_node(target_uri, ctx)
current_version = existing_node.metadata.get("version", 0)
return WritePlan(
action="merge",
target_uri=target_uri,
merged_fields={
"abstract": candidate.abstract,
"existing_overview": existing_node.overview,
"overview_append": candidate.overview,
"existing_content": existing_node.content,
"content_append": candidate.content,
"expected_version": current_version,
"provenance_ids": _merge_provenance_ids(
existing_node.metadata.get("provenance_ids"),
candidate.provenance_ids,
),
},
relation_edges=[],
)
return WritePlan(
action="create",
target_uri=target_uri,
merged_fields={},
relation_edges=[],
)
class AppendOnlyPolicy(MergePolicy):
"""Merge policy for time-series events.
Applies to: event, case
Behavior (CLAUDE.md §5):
- event_id/case_id globally unique (timestamp_uuid)
- Always create, never overwrite history
- Establish SEQUENCE relations for same-session events
Each event is a new node - historical record preserved.
"""
def __init__(self, fs: ContextFS, uri_resolver: URIResolver):
"""Initialize AppendOnlyPolicy.
Args:
fs: ContextFS for checking existing nodes
uri_resolver: URIResolver for URI construction
"""
self._fs = fs
self._uri_resolver = uri_resolver
def plan(self, candidate: CandidateMemory, ctx: RequestContext) -> WritePlan:
"""Generate WritePlan for event/case candidate.
Args:
candidate: CandidateMemory (event or case)
ctx: RequestContext for this operation
Returns:
WritePlan with action="create" and unique event ID
"""
if candidate.category == "event":
timestamp = datetime.now(UTC).strftime("%Y%m%d%H%M%S")
unique_id = uuid_lib.uuid4().hex[:8]
fields = {"routing_key": f"{candidate.routing_key}_{timestamp}_{unique_id}"}
elif candidate.category == "case":
timestamp = datetime.now(UTC).strftime("%Y%m%d%H%M%S")
unique_id = uuid_lib.uuid4().hex[:8]
fields = {"routing_key": f"{candidate.routing_key}_{timestamp}_{unique_id}"}
else:
timestamp = datetime.now(UTC).strftime("%Y%m%d%H%M%S")
unique_id = uuid_lib.uuid4().hex[:8]
fields = {"routing_key": f"{candidate.routing_key}_{timestamp}_{unique_id}"}
target_uri = self._uri_resolver.resolve(candidate.category, fields, ctx)
return WritePlan(
action="create",
target_uri=target_uri,
merged_fields={},
relation_edges=[],
)
class SkillToolPolicy(MergePolicy):
"""Merge policy for agent skills and tools.
Applies to: skill, tool
Behavior (CLAUDE.md §5):
- skill URI: ctx://{account}/agents/{agent}/skills/{routing_key}
- tool URI: ctx://{account}/agents/{agent}/memories/tools/{routing_key}
- Fixed URI, always exists or is created
- Cumulative: best practices, failures, parameters appended
- Usage stats tracked in metadata
Skills and tools accumulate knowledge over time.
"""
def __init__(self, fs: ContextFS, uri_resolver: URIResolver):
"""Initialize SkillToolPolicy.
Args:
fs: ContextFS for checking existing nodes
uri_resolver: URIResolver for URI construction
"""
self._fs = fs
self._uri_resolver = uri_resolver
def plan(self, candidate: CandidateMemory, ctx: RequestContext) -> WritePlan:
"""Generate WritePlan for skill/tool candidate.
Args:
candidate: CandidateMemory (skill or tool)
ctx: RequestContext for this operation
Returns:
WritePlan with action="create" or "merge" (accumulate)
"""
fields = _schema_routing_fields(candidate.category, candidate.routing_key)
target_uri = self._uri_resolver.resolve(candidate.category, fields, ctx)
exists = self._fs.exists(target_uri, ctx)
if exists:
existing_node = self._fs.read_node(target_uri, ctx)
current_version = existing_node.metadata.get("version", 0)
return WritePlan(
action="merge",
target_uri=target_uri,
merged_fields={
"skill_merge": True,
"existing_content": existing_node.content,
"existing_overview": existing_node.overview,
"existing_abstract": existing_node.abstract,
"new_content": candidate.content,
"new_overview": candidate.overview,
"new_abstract": candidate.abstract,
"usage_count": existing_node.metadata.get("usage_count", 0) + 1,
"expected_version": current_version,
"provenance_ids": _merge_provenance_ids(
existing_node.metadata.get("provenance_ids"),
candidate.provenance_ids,
),
},
relation_edges=[],
)
else:
return WritePlan(
action="create",
target_uri=target_uri,
merged_fields={
"usage_count": 1,
},
relation_edges=[],
)