"""Archive builder for constructing ContextNode from candidates.
Takes CandidateMemory and WritePlan to produce ContextNode ready for writing.
"""
from datetime import UTC, datetime
from core.enums import ContextType
from core.interfaces import LLM
from core.logging_config import get_logger
from core.models import (
CandidateMemory,
ContextNode,
RequestContext,
WritePlan,
)
logger = get_logger(__name__)
class ArchiveBuilder:
"""Builds ContextNode from candidate and write plan.
The builder handles:
- Constructing proper URI from WritePlan
- Merging content for merge actions (with LLM semantic merge)
- Setting appropriate metadata
- Building relations
"""
def __init__(self, llm: LLM):
"""Initialize ArchiveBuilder.
Args:
llm: LLM instance for semantic merge of skill content.
Skill content will be semantically merged (deduplicated, refined).
"""
self._llm = llm
def build(
self,
candidate: CandidateMemory,
plan: WritePlan,
ctx: RequestContext
) -> ContextNode:
"""Build ContextNode from candidate and write plan.
Args:
candidate: CandidateMemory with extracted content
plan: WritePlan with action and target URI
ctx: RequestContext for this operation
Returns:
ContextNode ready for ContextFS.write_node()
"""
if candidate.category in ("skill", "pattern", "tool"):
context_type = ContextType.SKILL.value
else:
context_type = ContextType.MEMORY.value
if candidate.category == "profile":
level = 3
else:
level = 4
if candidate.owner_scope == "user":
owner_space = ctx.user_space_name()
else:
owner_space = ctx.agent_space_name()
if plan.action == "merge" and plan.merged_fields:
content = self._merge_content(candidate, plan.merged_fields)
else:
content = candidate.content
if plan.action == "merge" and plan.merged_fields:
overview = self._merge_overview(candidate, plan.merged_fields)
else:
overview = candidate.overview
if plan.action == "merge" and plan.merged_fields.get("skill_merge"):
abstract = plan.merged_fields.get("new_abstract", candidate.abstract)
else:
abstract = candidate.abstract
metadata = {
"created_at": datetime.now(UTC).isoformat(),
}
if "expected_version" in plan.merged_fields:
metadata["expected_version"] = plan.merged_fields["expected_version"]
if "usage_count" in plan.merged_fields:
metadata["usage_count"] = plan.merged_fields["usage_count"]
if "usage_count_increment" in plan.merged_fields:
metadata["usage_count_increment"] = plan.merged_fields["usage_count_increment"]
if candidate.when:
metadata["when"] = candidate.when
if candidate.who:
metadata["who"] = candidate.who
if candidate.where:
metadata["where"] = candidate.where
if candidate.routing_key:
metadata["routing_key"] = candidate.routing_key
if candidate.tool_stats:
metadata["tool_stats"] = candidate.tool_stats
prov = plan.merged_fields.get("provenance_ids") or candidate.provenance_ids
if prov:
metadata["provenance_ids"] = prov
if plan.relation_edges:
metadata["_relations"] = plan.relation_edges
return ContextNode(
uri=plan.target_uri,
context_type=context_type,
category=candidate.category,
level=level,
owner_space=owner_space,
abstract=abstract,
overview=overview,
content=content,
metadata=metadata,
)
def _merge_overview(self, candidate: CandidateMemory, merged_fields: dict) -> str:
"""Merge overview for skill_merge or overview_append operations.
Args:
candidate: CandidateMemory with new overview
merged_fields: Fields to merge from existing node
Returns:
Merged overview string
Supports two merge modes:
- skill_merge: uses existing_overview + new_overview (semantic)
- overview_append: uses existing_overview + overview_append (direct)
"""
existing = merged_fields.get("existing_overview", "")
if merged_fields.get("skill_merge"):
new = merged_fields.get("new_overview", candidate.overview)
if not existing:
return new
return f"{existing}\n\n{new}"
if "overview_append" in merged_fields:
new = merged_fields.get("overview_append", candidate.overview)
if not existing:
return new
return f"{existing}\n\n{new}"
return candidate.overview
def _merge_content(self, candidate: CandidateMemory, merged_fields: dict) -> str:
"""Merge candidate content with existing data.
Args:
candidate: CandidateMemory with new content
merged_fields: Fields to merge from existing node
Returns:
Merged content string
Supports three merge modes:
- skill_merge: LLM semantic merge of existing_content + new_content
- content_append: Direct append of existing_content + content_append
- replace: Uses candidate.content directly (for profile, etc.)
"""
if merged_fields.get("skill_merge"):
existing = merged_fields.get("existing_content", "")
new_content = merged_fields.get("new_content", candidate.content)
if not existing:
return new_content
return self._llm_merge_content(
existing=existing,
new=new_content,
)
if "content_append" in merged_fields:
existing = merged_fields.get("existing_content", "")
new = merged_fields.get("content_append", candidate.content)
if not existing:
return new
return self._llm_merge_content(existing=existing, new=new)
if "abstract" in merged_fields:
return candidate.content
return candidate.content
def _llm_merge_content(self, existing: str, new: str) -> str:
"""Use LLM to semantically merge content.
Used for both skill accumulation and AggregateTopicPolicy merge
(preference, entity, pattern) to avoid infinite append buildup.
Args:
existing: Existing content
new: New content to merge in
Returns:
Merged content with duplicates removed.
"""
result = self._llm.complete_json(
prompt=(
"Merge these two memory descriptions into one. "
"Remove duplicates, keep all unique details. "
"If they conflict, prefer the newer content. "
"Return the result as structured Markdown.\n\n"
f"Existing:\n{existing}\n\nNew:\n{new}"
),
schema={
"type": "object",
"properties": {"content": {"type": "string"}},
"required": ["content"],
},
)
return result.get("content") or f"{existing}\n\n---\n\n{new}"