"""Context writer for persisting ContextNodes.
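Orchestrates the write process for each candidate: planning → building → writing → outbox.
Extraction is not performed by ContextWriter; it receives CandidateMemory objects from its callers.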
"""
import hashlib
import time
from datetime import datetime, timezone
from typing import Optional
from commit.archive_builder import ArchiveBuilder
from commit.outbox_store import OutboxStore
from commit.policy_router import PolicyRouter
from core.errors import ConcurrentModificationError
from core.interfaces import LLM, ContextFS
from core.logging_config import get_logger
from core.models import CandidateMemory, ContextNode, RequestContext, WriteAction, WritePlan
# Module-level logger, named after this module via get_logger(__name__).
logger = get_logger(__name__)
class ContextWriter:
    """Writes context memories to storage.

    Orchestrates the full write pipeline:
    1. Plan: Determine write action via ``PolicyRouter.plan``
    2. Build: Construct ContextNode via ArchiveBuilder
    3. Write: Persist via ContextFS
    4. Outbox: Register event for async index synchronization

    Successfully executed operations are remembered by fingerprint in an
    in-memory set so that an identical operation (same target URI, action,
    session and content) is not executed twice by the same writer instance.
    """

    # Number of attempts made when a write raises ConcurrentModificationError.
    _MAX_RETRIES = 3
    # Seconds slept before the retry that follows attempt ``i``. The last entry
    # is never used: the final attempt re-raises instead of sleeping.
    _RETRY_DELAYS = (0.1, 0.2, 0.4)

    def __init__(
        self,
        fs: ContextFS,
        llm: LLM,
        policy_router: PolicyRouter | None = None,
        archive_builder: ArchiveBuilder | None = None,
        outbox_store: OutboxStore | None = None,
    ):
        """Initialize ContextWriter.

        Args:
            fs: ContextFS for persisting nodes
            llm: LLM instance, passed to the default ArchiveBuilder
            policy_router: PolicyRouter used to plan each write. Although the
                parameter defaults to None, a router is required.
            archive_builder: ArchiveBuilder for building ContextNode; when
                omitted, ``ArchiveBuilder(llm=llm)`` is created.
            outbox_store: OutboxStore for registering index events (optional)

        Raises:
            ValueError: If ``policy_router`` is None.
        """
        self._fs = fs
        self._llm = llm
        if policy_router is None:
            raise ValueError(
                "ContextWriter requires a PolicyRouter — "
                "pass schema_registry to MemoryWriteAPI"
            )
        self._policy_router = policy_router
        self._archive_builder = archive_builder or ArchiveBuilder(llm=llm)
        self._outbox_store = outbox_store
        # Fingerprints of operations that completed successfully.
        # NOTE(review): this set is never pruned and is shared by the worker
        # threads of write_candidates_parallel without a lock — confirm that
        # both are acceptable for the expected writer lifetime and workload.
        self._executed_ops: set[str] = set()

    def _operation_fingerprint(
        self, plan: WritePlan, ctx: RequestContext, content_hash: str = ""
    ) -> str:
        """Generate operation fingerprint for idempotency tracking.

        Args:
            plan: WritePlan for this operation
            ctx: RequestContext for this operation
            content_hash: Optional hash of the content to distinguish
                same-URI different-content updates

        Returns:
            SHA256 hex digest of ``target_uri:action:session_id:content_hash``
        """
        payload = f"{plan.target_uri}:{plan.action}:{ctx.session_id}:{content_hash}"
        return hashlib.sha256(payload.encode()).hexdigest()

    def write_candidate(
        self,
        candidate: CandidateMemory,
        ctx: RequestContext
    ) -> WritePlan:
        """Write a single candidate memory.

        Agent-scoped candidates are skipped when ``ctx`` carries no agent id.
        Otherwise the write is attempted up to ``_MAX_RETRIES`` times, sleeping
        between attempts whenever a ConcurrentModificationError is raised.

        Args:
            candidate: CandidateMemory to write
            ctx: RequestContext for this operation

        Returns:
            WritePlan showing what action was taken

        Raises:
            ConcurrentModificationError: If every attempt hit a conflict.
            RuntimeError: If no PolicyRouter is configured.
        """
        if candidate.owner_scope == "agent" and not ctx.agent_id:
            logger.info(
                "Skipping agent-scoped candidate without agent context: %s/%s",
                candidate.category,
                candidate.routing_key,
            )
            return WritePlan(
                action="skip",
                target_uri="",
                merged_fields={},
                relation_edges=[],
            )

        # Defensive: __init__ already rejects None, but the attribute could
        # have been cleared afterwards.
        if self._policy_router is None:
            raise RuntimeError(
                "ContextWriter requires a PolicyRouter — "
                "pass schema_registry to MemoryWriteAPI"
            )

        for attempt in range(self._MAX_RETRIES):
            try:
                return self._attempt_write(candidate, ctx)
            except ConcurrentModificationError as e:
                if attempt >= self._MAX_RETRIES - 1:
                    logger.error(
                        "Max retries exceeded for concurrent modification at %s",
                        candidate.routing_key
                    )
                    raise
                delay = self._RETRY_DELAYS[attempt]
                logger.info(
                    "Concurrent modification detected for %s (attempt %d), "
                    "retrying after %.1fs: expected v%d, found v%d",
                    candidate.routing_key, attempt + 1, delay,
                    e.expected_version, e.actual_version
                )
                time.sleep(delay)
        raise RuntimeError("Unreachable: write_candidate loop exited unexpectedly")

    def _attempt_write(
        self,
        candidate: CandidateMemory,
        ctx: RequestContext
    ) -> WritePlan:
        """Run one plan → build → write → outbox attempt for a candidate.

        Exceptions from planning, building and persisting propagate to the
        caller (write_candidate retries on ConcurrentModificationError).
        Outbox registration failures after a non-atomic write are logged as
        warnings and do not fail the write.
        """
        plan = self._policy_router.plan(candidate, ctx)
        if plan.action == "skip":
            return plan

        content_hash = hashlib.sha256(
            (candidate.content or "").encode()
        ).hexdigest()[:16]
        fp = self._operation_fingerprint(plan, ctx, content_hash)
        if fp in self._executed_ops:
            logger.info("Skipping duplicate operation: %s", plan.target_uri)
            return plan

        # The fingerprint is recorded only AFTER the handler succeeds. Recording
        # it first would make a retry (after a conflict or any other failure)
        # look like a duplicate and silently skip the archive/delete.
        if plan.action == WriteAction.ARCHIVE:
            result = self._handle_archive(plan, ctx)
            self._executed_ops.add(fp)
            return result
        if plan.action == WriteAction.DELETE:
            result = self._handle_delete(plan, ctx)
            self._executed_ops.add(fp)
            return result

        node = self._archive_builder.build(candidate, plan, ctx)
        indexable = plan.action in ("create", "merge")
        # Use the combined node+outbox write only when both collaborators
        # expose the methods it needs.
        use_atomic = bool(
            hasattr(self._fs, 'write_node_with_outbox')
            and self._outbox_store
            and hasattr(self._outbox_store, 'build_write_event')
            and indexable
        )
        if use_atomic:
            outbox_event = self._outbox_store.build_write_event(node)
            self._fs.write_node_with_outbox(node, ctx, outbox_event)
        else:
            self._fs.write_node(node, ctx)
        self._executed_ops.add(fp)

        if self._outbox_store and indexable:
            if not use_atomic:
                # Best effort: the node is already persisted at this point.
                try:
                    self._outbox_store.register_write(node, ctx)
                except Exception as e:
                    logger.warning(
                        "Failed to register outbox event for %s: %s",
                        node.uri, e
                    )
            if candidate.category != "profile":
                try:
                    self._outbox_store.register_directory(node, ctx)
                except Exception as e:
                    logger.warning(
                        "Failed to register directory event for %s: %s",
                        node.uri, e
                    )
        return plan

    def write_candidates(
        self,
        candidates: list[CandidateMemory],
        ctx: RequestContext
    ) -> list[WritePlan]:
        """Write multiple candidate memories sequentially.

        A candidate whose write raises is logged and omitted from the result,
        so the returned list may be shorter than ``candidates``.

        Args:
            candidates: List of CandidateMemory to write
            ctx: RequestContext for these operations

        Returns:
            WritePlans of the candidates that were written, in input order
        """
        plans = []
        for candidate in candidates:
            try:
                plan = self.write_candidate(candidate, ctx)
                plans.append(plan)
            except Exception as e:
                logger.error(
                    "Failed to write candidate %s/%s: %s",
                    candidate.category, candidate.routing_key, e,
                    exc_info=True
                )
        return plans

    def write_candidates_parallel(
        self,
        candidates: list[CandidateMemory],
        ctx: RequestContext,
        max_workers: int = 4
    ) -> list[WritePlan]:
        """Write multiple candidates in parallel on a thread pool.

        A candidate whose write raises is logged and omitted from the result,
        so the returned list may be shorter than ``candidates``.

        Args:
            candidates: List of CandidateMemory to write
            ctx: RequestContext for these operations
            max_workers: Maximum parallel write operations

        Returns:
            WritePlans of the candidates that were written, in input order
            (independent of the order in which the workers finish)
        """
        from concurrent.futures import ThreadPoolExecutor

        plans = []
        executor = ThreadPoolExecutor(max_workers=max_workers)
        try:
            submitted = [
                (candidate, executor.submit(self.write_candidate, candidate, ctx))
                for candidate in candidates
            ]
            # Collect in submission order so the result is deterministic.
            for candidate, future in submitted:
                try:
                    plans.append(future.result())
                except Exception as e:
                    logger.error(
                        "Failed to write candidate %s/%s: %s",
                        candidate.category, candidate.routing_key, e,
                        exc_info=True
                    )
        finally:
            # If collection is interrupted, drop work that has not started yet.
            executor.shutdown(wait=True, cancel_futures=True)
        return plans

    def _handle_archive(self, plan: WritePlan, ctx: RequestContext) -> WritePlan:
        """Handle archive operation: soft delete with status update.

        Reads the target node, sets ``status`` to ``"ARCHIVED"`` and
        ``archived_at`` to the current UTC time in its metadata, and writes it
        back. Outbox registration is best effort (failure is only logged).

        Args:
            plan: WritePlan with action="archive"
            ctx: RequestContext for this operation

        Returns:
            The same WritePlan that was passed in

        Raises:
            Exception: Any error from reading or writing the node is logged
                and re-raised.
        """
        try:
            node = self._fs.read_node(plan.target_uri, ctx)
            node.metadata["status"] = "ARCHIVED"
            node.metadata["archived_at"] = datetime.now(timezone.utc).isoformat()
            self._fs.write_node(node, ctx)
            if self._outbox_store:
                try:
                    self._outbox_store.register_archive(plan.target_uri, ctx)
                except Exception as e:
                    logger.warning(
                        "Failed to register archive event for %s: %s",
                        plan.target_uri, e
                    )
            logger.info("Archived node: %s", plan.target_uri)
            return plan
        except Exception as e:
            logger.error("Failed to archive node %s: %s", plan.target_uri, e)
            raise

    def _handle_delete(self, plan: WritePlan, ctx: RequestContext) -> WritePlan:
        """Handle delete operation: permanent removal.

        Deletes the target node via ContextFS. Outbox registration is best
        effort (failure is only logged).

        Args:
            plan: WritePlan with action="delete"
            ctx: RequestContext for this operation

        Returns:
            The same WritePlan that was passed in

        Raises:
            Exception: Any error from deleting the node is logged and
                re-raised.
        """
        try:
            self._fs.delete_node(plan.target_uri, ctx)
            if self._outbox_store:
                try:
                    self._outbox_store.register_delete(plan.target_uri, ctx)
                except Exception as e:
                    logger.warning(
                        "Failed to register delete event for %s: %s",
                        plan.target_uri, e
                    )
            logger.info("Deleted node: %s", plan.target_uri)
            return plan
        except Exception as e:
            logger.error("Failed to delete node %s: %s", plan.target_uri, e)
            raise