from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import Mock, patch
import pytest
from core.models import RequestContext
from providers import unified_config
from providers.unified_config import OgMemConfig
from server.memory_service import MemoryService
from session.topic_buffer import SlotContent
@pytest.fixture
def ctx() -> RequestContext:
    """Provide the request context shared by the tests in this module."""
    identity = {
        "account_id": "acct-test",
        "user_id": "user-test",
        "agent_id": "agent-test",
        "session_id": "sess-compact",
        "trace_id": "trace-1",
    }
    return RequestContext(**identity)
@pytest.fixture
def service() -> MemoryService:
    """Provide a MemoryService built from a default OgMemConfig."""
    default_config = OgMemConfig()
    return MemoryService(config=default_config)
def _seed_buffer(service: MemoryService, ctx: RequestContext, session_id: str = "sess-compact"):
    """Append a user / assistant / user exchange and return the session buffer.

    The three messages are added through the service's session manager in
    order, then the buffer for ``session_id`` is fetched and returned.
    """
    manager = service.get_session_manager()
    transcript = (
        ("user", "first user fact"),
        ("assistant", "assistant filler"),
        ("user", "second user fact"),
    )
    for role, content in transcript:
        manager.add_message(session_id, role, content, ctx)
    return manager.get_or_create(session_id)
def test_prepare_compaction_uses_incremental_non_assistant_messages(
    service: MemoryService,
    ctx: RequestContext,
):
    """Check what prepare_compaction forwards to commit_session.

    With the watermark at 1, the test expects a single user message
    ("second user fact") to be committed, together with the existing summary,
    the message timestamp and the tool statistics text.
    """
    buf = _seed_buffer(service, ctx)
    buf.extraction_watermark = 1
    buf.extraction_summary = "existing-summary"
    search_web_stats = {
        "call_count": 2,
        "success_count": 2,
        "fail_count": 0,
        "total_duration_ms": 14.0,
        "total_prompt_tokens": 0,
        "total_completion_tokens": 0,
    }
    buf.tool_usage_stats = {"search_web": search_web_stats}
    buf.messages[2].created_at = "2026-04-19T01:02:03+00:00"

    commit_outcome = {
        "candidates_extracted": 1,
        "plans": [{"action": "create", "target_uri": "ctx://acct/users/u/memories/case/task-1"}],
    }
    write_api = Mock()
    write_api.commit_session.return_value = commit_outcome
    service._write_api = write_api

    result = service.prepare_compaction({"sessionId": "sess-compact", "_ctx": ctx})

    assert result["buffer"] is buf
    write_api.commit_session.assert_called_once()

    # Inspect the keyword arguments of the single commit_session call.
    commit_kwargs = write_api.commit_session.call_args.kwargs
    sent_messages = commit_kwargs["messages"]
    assert len(sent_messages) == 1
    only_message = sent_messages[0]
    assert only_message["role"] == "user"
    assert only_message["content"] == "second user fact"
    assert "id" in only_message
    assert commit_kwargs["session_summary"] == "existing-summary"
    assert commit_kwargs["session_time"].isoformat() == "2026-04-19T01:02:03+00:00"
    assert "search_web" in commit_kwargs["tool_stats_text"]

    assert buf.extraction_watermark == 3
    assert "task-1" in buf.extraction_summary
def test_prepare_compaction_invalidates_cached_identity_when_profile_is_written(
    service: MemoryService,
    ctx: RequestContext,
):
    """A merge plan on the profile URI should clear the cached identity slot."""
    profile_uri = "ctx://acct-test/users/user-test/memories/profile"
    buf = _seed_buffer(service, ctx)

    # Pre-populate the identity slot so the test can observe its removal.
    topic = service.get_session_manager().get_topic_buffer("sess-compact")
    stale_slot = SlotContent(content="stale identity", uris=[profile_uri], tokens=4)
    topic.set_cached_slot("identity", stale_slot)

    write_api = Mock()
    write_api.commit_session.return_value = {
        "candidates_extracted": 1,
        "candidates_filtered": 0,
        "writes_completed": 1,
        "writes_skipped": 0,
        "writes_failed": 0,
        "plans": [{"action": "merge", "target_uri": profile_uri}],
    }
    service._write_api = write_api

    service.prepare_compaction({"sessionId": "sess-compact", "_ctx": ctx})

    assert buf.extraction_watermark == 3
    assert topic.get_cached_slot("identity") is None
def test_prepare_compaction_raises_when_write_api_unavailable(
    service: MemoryService,
    ctx: RequestContext,
):
    """prepare_compaction should raise RuntimeError when get_write_api returns None."""
    _seed_buffer(service, ctx)
    service.get_write_api = Mock(return_value=None)
    request = {"sessionId": "sess-compact", "_ctx": ctx}

    with pytest.raises(RuntimeError, match="write API unavailable"):
        service.prepare_compaction(request)
def test_prepare_compaction_advances_watermark_when_only_assistant_messages(
    service: MemoryService,
    ctx: RequestContext,
):
    """An assistant-only buffer skips commit_session but still moves the watermark."""
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "assistant", "only assistant", ctx)
    buf = manager.get_or_create("sess-compact")
    write_api = Mock()
    service._write_api = write_api

    result = service.prepare_compaction({"sessionId": "sess-compact", "_ctx": ctx})

    assert result["messages"] == []
    write_api.commit_session.assert_not_called()
    assert buf.extraction_watermark == 1
    assert buf.extraction_summary == ""
def test_prepare_compaction_does_not_advance_watermark_or_issue_token_when_write_fails(
    service: MemoryService,
    ctx: RequestContext,
):
    """A commit result with a failed write should raise and leave buffer state untouched."""
    buf = _seed_buffer(service, ctx)
    buf.extraction_watermark = 1
    buf.extraction_summary = "existing-summary"

    failed_commit = {
        "candidates_extracted": 1,
        "candidates_filtered": 0,
        "writes_completed": 0,
        "writes_skipped": 0,
        "writes_failed": 1,
        "plans": [],
    }
    write_api = Mock()
    write_api.commit_session.return_value = failed_commit
    service._write_api = write_api

    with pytest.raises(RuntimeError, match="compaction extraction write failed"):
        service.prepare_compaction({"sessionId": "sess-compact", "_ctx": ctx})

    # Nothing about the extraction progress or the prepare token may change.
    assert buf.extraction_watermark == 1
    assert buf.extraction_summary == "existing-summary"
    assert buf.compaction_prepare_token == ""
    assert buf.compaction_prepare_token_created_at == ""
def test_prepare_compaction_reuses_existing_prepare_token_and_archive_id(
    service: MemoryService,
    ctx: RequestContext,
):
    """Two consecutive prepare calls should share token and archive id, committing once."""
    _seed_buffer(service, ctx)
    empty_commit = {
        "candidates_extracted": 0,
        "candidates_filtered": 0,
        "writes_completed": 0,
        "writes_skipped": 0,
        "writes_failed": 0,
        "plans": [],
    }
    write_api = Mock()
    write_api.commit_session.return_value = empty_commit
    service._write_api = write_api

    # Each call receives its own freshly built request dict.
    first, second = (
        service.prepare_compaction({"sessionId": "sess-compact", "_ctx": ctx})
        for _ in range(2)
    )

    assert second["prepareToken"] == first["prepareToken"]
    assert second["archive_id"] == first["archive_id"]
    write_api.commit_session.assert_called_once()
def test_after_turn_uses_configured_threshold_before_env(
    ctx: RequestContext,
    monkeypatch,
):
    """The configured threshold (999999) should win over the env value of 1."""
    configured_threshold = 999999
    monkeypatch.setenv("OGMEM_AFTER_TURN_THRESHOLD", "1")
    service = MemoryService(config=OgMemConfig(after_turn_threshold=configured_threshold))
    # Touching the write API would mean the env threshold was applied instead.
    service.get_write_api = Mock(
        side_effect=AssertionError("after_turn should still be accumulating")
    )
    turn = [
        {"role": "user", "content": "x" * 80},
        {"role": "assistant", "content": "ack"},
    ]

    result = service.after_turn({
        "sessionId": "sess-compact",
        "_ctx": ctx,
        "messages": turn,
    })

    assert result["status"] == "accumulating"
    assert result["pending_tokens"] < configured_threshold
def test_compact_returns_required_not_compacted_result_shape_when_commit_has_no_archive(
    service: MemoryService,
    ctx: RequestContext,
):
    """compact() should return the exact not-compacted payload when commit archives nothing."""
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    prepared = {"ctx": ctx, "mgr": manager, "session_id": "sess-compact"}
    service.prepare_compaction = Mock(return_value=prepared)
    manager.commit = Mock(return_value={"ok": True, "archived": False, "reason": "empty_buffer"})

    result = service.compact({"sessionId": "sess-compact", "_ctx": ctx})

    expected = {
        "ok": True,
        "compacted": False,
        "reason": "empty_buffer",
        "result": {
            "summary": "",
            "tokensBefore": 5,
        },
    }
    assert result == expected
def test_compact_surfaces_failed_archive_commit_and_keeps_buffer(
    service: MemoryService,
    ctx: RequestContext,
):
    """A failing archive write should surface as a failed compact and keep the message."""
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    prepared = {"ctx": ctx, "mgr": manager, "session_id": "sess-compact"}
    service.prepare_compaction = Mock(return_value=prepared)
    archive_failure = {
        "success": False,
        "uri": "",
        "archive_id": "arc-failed",
        "error": "archive store unavailable",
    }
    manager._write_archive = Mock(return_value=archive_failure)

    result = service.compact({"sessionId": "sess-compact", "_ctx": ctx})

    assert result["ok"] is False
    assert result["compacted"] is False
    assert result["status"] == "failed"
    assert "archive store unavailable" in result["error"]

    # The pending message must still be present in the session.
    session = manager.get_session("sess-compact", ctx)
    assert session["message_count"] == 1
def test_after_turn_no_extractable_archive_failure_rewinds_watermark(
    service: MemoryService,
    ctx: RequestContext,
    monkeypatch,
):
    """Assistant-only turn plus failing archive write: expect failure, watermark 0, message kept."""
    service._cfg.after_turn_threshold = 1
    service._write_api = Mock()
    service.drain_outbox_sync = Mock(return_value={"processed": 0, "succeeded": 0, "failed": 0})

    manager = service.get_session_manager()
    archive_failure = {
        "success": False,
        "uri": "",
        "archive_id": "arc-failed",
        "error": "archive store unavailable",
    }
    manager._write_archive = Mock(return_value=archive_failure)
    manager._compress = Mock(return_value=("overview", "abstract"))

    result = service.after_turn({
        "sessionId": "sess-compact",
        "messages": [{"role": "assistant", "content": "assistant only content"}],
        "_ctx": ctx,
    })

    buf = manager.get_or_create("sess-compact")
    assert result["ok"] is False
    assert result["status"] == "failed"
    assert result["error"] == "archive store unavailable"
    assert buf.extraction_watermark == 0
    assert len(buf.messages) == 1
def test_after_turn_background_archive_failure_rewinds_watermark(
    service: MemoryService,
    ctx: RequestContext,
    monkeypatch,
):
    """Background archive failure: after_turn reports processing, watermark stays 0."""
    service._cfg.after_turn_threshold = 1
    service._write_api = Mock()
    extraction_ok = {
        "candidates_extracted": 1,
        "candidates_filtered": 0,
        "writes_completed": 1,
        "writes_skipped": 0,
        "writes_failed": 0,
        "plans": [{"action": "create", "target_uri": "ctx://acct/users/u/memories/case/task-1"}],
    }
    service._chunk_and_extract = Mock(return_value=extraction_ok)
    service._async_drain = Mock()

    manager = service.get_session_manager()
    archive_failure = {
        "success": False,
        "uri": "",
        "archive_id": "arc-failed",
        "error": "archive store unavailable",
    }
    manager._write_archive = Mock(return_value=archive_failure)
    manager._compress = Mock(return_value=("overview", "abstract"))

    class InlineThread:
        """Thread stand-in whose start() runs the target in the calling thread."""

        def __init__(self, target, daemon=None, name=None):
            self.target = target

        def start(self):
            self.target()

    request = {
        "sessionId": "sess-compact",
        "messages": [{"role": "user", "content": "remember this failure path"}],
        "_ctx": ctx,
    }
    with patch("server.memory_service.threading.Thread", InlineThread):
        result = service.after_turn(request)

    buf = manager.get_or_create("sess-compact")
    assert result["ok"] is True
    assert result["status"] == "processing"
    assert buf.extraction_watermark == 0
    assert len(buf.messages) == 1
def test_after_turn_background_invalidates_cached_identity_when_profile_is_written(
    service: MemoryService,
    ctx: RequestContext,
    monkeypatch,
):
    """A profile merge plan from background extraction should clear the identity slot."""
    profile_uri = "ctx://acct-test/users/user-test/memories/profile"
    service._cfg.after_turn_threshold = 1
    service._write_api = Mock()
    service._chunk_and_extract = Mock(return_value={
        "candidates_extracted": 1,
        "candidates_filtered": 0,
        "writes_completed": 1,
        "writes_skipped": 0,
        "writes_failed": 0,
        "plans": [{"action": "merge", "target_uri": profile_uri}],
    })
    service._async_drain = Mock()

    manager = service.get_session_manager()
    manager.commit_snapshot = Mock(return_value={"ok": True, "archived": False})

    # Seed a cached identity slot that references the profile URI.
    topic = manager.get_topic_buffer("sess-compact")
    stale_slot = SlotContent(content="stale identity", uris=[profile_uri], tokens=4)
    topic.set_cached_slot("identity", stale_slot)

    class InlineThread:
        """Thread stand-in whose start() runs the target in the calling thread."""

        def __init__(self, target, daemon=None, name=None):
            self.target = target

        def start(self):
            self.target()

    request = {
        "sessionId": "sess-compact",
        "messages": [{"role": "user", "content": "my profile changed"}],
        "_ctx": ctx,
    }
    with patch("server.memory_service.threading.Thread", InlineThread):
        result = service.after_turn(request)

    assert result["ok"] is True
    assert result["status"] == "processing"
    assert topic.get_cached_slot("identity") is None
def test_after_turn_background_write_failure_rewinds_watermark_and_skips_archive_commit(
    service: MemoryService,
    ctx: RequestContext,
    monkeypatch,
):
    """A failed extraction write should leave the watermark at 0 and never call commit_snapshot."""
    service._cfg.after_turn_threshold = 1
    service._write_api = Mock()
    extraction_failed = {
        "candidates_extracted": 1,
        "candidates_filtered": 0,
        "writes_completed": 0,
        "writes_skipped": 0,
        "writes_failed": 1,
        "plans": [],
    }
    service._chunk_and_extract = Mock(return_value=extraction_failed)
    service._async_drain = Mock()

    manager = service.get_session_manager()
    manager._write_archive = Mock(return_value={
        "success": True,
        "uri": "ctx://acct-test/sessions/sess-compact/history/arc-ok",
        "archive_id": "arc-ok",
    })
    manager._compress = Mock(return_value=("overview", "abstract"))
    manager.commit_snapshot = Mock(return_value={"ok": True, "archived": True})

    class InlineThread:
        """Thread stand-in whose start() runs the target in the calling thread."""

        def __init__(self, target, daemon=None, name=None):
            self.target = target

        def start(self):
            self.target()

    request = {
        "sessionId": "sess-compact",
        "messages": [{"role": "user", "content": "trigger write failure"}],
        "_ctx": ctx,
    }
    with patch("server.memory_service.threading.Thread", InlineThread):
        result = service.after_turn(request)

    buf = manager.get_or_create("sess-compact")
    assert result["ok"] is True
    assert result["status"] == "processing"
    assert buf.extraction_watermark == 0
    assert len(buf.messages) == 1
    manager.commit_snapshot.assert_not_called()
def test_after_turn_sync_fallback_archive_failure_rewinds_watermark_and_fails(
    service: MemoryService,
    ctx: RequestContext,
    monkeypatch,
):
    """When Thread construction raises and the archive write fails, after_turn should fail."""
    service._cfg.after_turn_threshold = 1
    service._write_api = Mock()
    service._chunk_and_extract = Mock(return_value={
        "candidates_extracted": 1,
        "candidates_filtered": 0,
        "writes_completed": 1,
        "writes_skipped": 0,
        "writes_failed": 0,
    })
    service._async_drain = Mock()

    manager = service.get_session_manager()
    archive_failure = {
        "success": False,
        "uri": "",
        "archive_id": "arc-failed",
        "error": "archive store unavailable",
    }
    manager._write_archive = Mock(return_value=archive_failure)
    manager._compress = Mock(return_value=("overview", "abstract"))

    class FailingThread:
        """Thread stand-in that cannot be constructed."""

        def __init__(self, *args, **kwargs):
            raise RuntimeError("thread unavailable")

    request = {
        "sessionId": "sess-compact",
        "messages": [{"role": "user", "content": "remember sync fallback failure"}],
        "_ctx": ctx,
    }
    with patch("server.memory_service.threading.Thread", FailingThread):
        result = service.after_turn(request)

    buf = manager.get_or_create("sess-compact")
    assert result["ok"] is False
    assert result["status"] == "failed"
    assert result["error"] == "archive store unavailable"
    assert buf.extraction_watermark == 0
    assert len(buf.messages) == 1
def test_compact_rejects_prepared_flag_without_prepare_token(
    service: MemoryService,
    ctx: RequestContext,
):
    """compact() with prepared=True but no token should raise without running prepare."""
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    service.prepare_compaction = Mock(
        side_effect=AssertionError("prepare_compaction should not run")
    )
    request = {"sessionId": "sess-compact", "_ctx": ctx, "prepared": True}

    with pytest.raises(ValueError, match="prepare token required"):
        service.compact(request)
def test_compact_rejects_invalid_prepare_token(
    service: MemoryService,
    ctx: RequestContext,
):
    """compact() with an unknown prepare token should raise without running prepare."""
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    service.prepare_compaction = Mock(
        side_effect=AssertionError("prepare_compaction should not run")
    )
    request = {
        "sessionId": "sess-compact",
        "_ctx": ctx,
        "prepareToken": "bad-token",
    }

    with pytest.raises(ValueError, match="invalid prepare token"):
        service.compact(request)
def test_compact_invalid_prepare_token_does_not_invalidate_valid_token(
    service: MemoryService,
    ctx: RequestContext,
):
    """A rejected bad token must not stop the real token from being accepted afterwards."""
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    write_api = Mock()
    write_api.commit_session.return_value = {"candidates_extracted": 0, "plans": []}
    service._write_api = write_api

    prepared = service.prepare_compaction({"sessionId": "sess-compact", "_ctx": ctx})

    manager.commit = Mock(return_value={"ok": True, "archived": True})
    manager.get_context = Mock(return_value={
        "latest_archive_overview": "summary",
        "estimatedTokens": 9,
    })

    # First attempt: wrong token is rejected.
    bad_request = {
        "sessionId": "sess-compact",
        "_ctx": ctx,
        "prepareToken": "bad-token",
    }
    with pytest.raises(ValueError, match="invalid prepare token"):
        service.compact(bad_request)

    # Second attempt: the token issued by prepare_compaction still works.
    good_request = {
        "sessionId": "sess-compact",
        "_ctx": ctx,
        "prepareToken": prepared["prepareToken"],
    }
    result = service.compact(good_request)

    assert result["compacted"] is True
    assert result["result"]["summary"] == "summary"
def test_compact_accepts_one_time_verified_prepare_token(
    service: MemoryService,
    ctx: RequestContext,
):
    """A prepare token should work once; reusing it should raise 'invalid prepare token'."""
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    write_api = Mock()
    write_api.commit_session.return_value = {"candidates_extracted": 0, "plans": []}
    service._write_api = write_api

    prepared = service.prepare_compaction({"sessionId": "sess-compact", "_ctx": ctx})

    manager.commit = Mock(return_value={"ok": True, "archived": True})
    manager.get_context = Mock(return_value={
        "latest_archive_overview": "summary",
        "estimatedTokens": 9,
    })

    def token_request():
        # Build a fresh payload for every compact() call.
        return {
            "sessionId": "sess-compact",
            "_ctx": ctx,
            "prepareToken": prepared["prepareToken"],
        }

    result = service.compact(token_request())
    assert result["compacted"] is True
    assert result["result"]["summary"] == "summary"

    with pytest.raises(ValueError, match="invalid prepare token"):
        service.compact(token_request())
def test_compact_reuses_prepare_archive_id_for_archive_commit(
    service: MemoryService,
    ctx: RequestContext,
):
    """The archive id returned by prepare should be passed to both commit_session and commit."""
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    write_api = Mock()
    write_api.commit_session.return_value = {"candidates_extracted": 0, "plans": []}
    service._write_api = write_api

    prepared = service.prepare_compaction({"sessionId": "sess-compact", "_ctx": ctx})
    archive_id = prepared["archive_id"]

    manager.commit = Mock(
        return_value={"ok": True, "archived": True, "archive_id": archive_id}
    )
    manager.get_context = Mock(return_value={
        "latest_archive_overview": "summary",
        "estimatedTokens": 9,
    })

    request = {
        "sessionId": "sess-compact",
        "_ctx": ctx,
        "prepareToken": prepared["prepareToken"],
    }
    result = service.compact(request)

    assert result["compacted"] is True
    commit_kwargs = write_api.commit_session.call_args.kwargs
    assert commit_kwargs["archive_id"] == archive_id
    manager.commit.assert_called_once_with("sess-compact", ctx, wait=True, archive_id=archive_id)
def test_compact_invalid_prepare_token_ttl_falls_back_to_default(
    service: MemoryService,
    ctx: RequestContext,
):
    """A non-numeric TTL in config should not stop a freshly issued token from being accepted."""
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    write_api = Mock()
    write_api.commit_session.return_value = {"candidates_extracted": 0, "plans": []}
    service._write_api = write_api

    prepared = service.prepare_compaction({"sessionId": "sess-compact", "_ctx": ctx})

    # Corrupt the TTL only after the token has been issued.
    service._cfg.compact_prepare_token_ttl = "bad"
    manager.commit = Mock(return_value={"ok": True, "archived": True})
    manager.get_context = Mock(return_value={
        "latest_archive_overview": "summary",
        "estimatedTokens": 9,
    })

    request = {
        "sessionId": "sess-compact",
        "_ctx": ctx,
        "prepareToken": prepared["prepareToken"],
    }
    result = service.compact(request)

    assert result["compacted"] is True
    assert result["result"]["summary"] == "summary"
def test_compact_rejects_expired_prepare_token(
    service: MemoryService,
    ctx: RequestContext,
):
    """A token whose creation time is back-dated to 2000 should be rejected and cleared."""
    service._cfg.compact_prepare_token_ttl = 300
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    write_api = Mock()
    write_api.commit_session.return_value = {"candidates_extracted": 0, "plans": []}
    service._write_api = write_api

    prepared = service.prepare_compaction({"sessionId": "sess-compact", "_ctx": ctx})

    # Back-date the token far beyond the 300 configured above.
    buf = manager.get_or_create("sess-compact")
    buf.compaction_prepare_token_created_at = "2000-01-01T00:00:00+00:00"

    request = {
        "sessionId": "sess-compact",
        "_ctx": ctx,
        "prepareToken": prepared["prepareToken"],
    }
    with pytest.raises(ValueError, match="invalid prepare token"):
        service.compact(request)

    assert buf.compaction_prepare_token == ""
    assert buf.compaction_prepare_token_created_at == ""
def test_compact_trims_summary_and_does_not_fabricate_first_kept_entry_id(
    service: MemoryService,
    ctx: RequestContext,
):
    """With summary_max_chars=12 the last 12 characters are expected and no firstKeptEntryId."""
    service._cfg.summary_max_chars = 12
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    prepared = {"ctx": ctx, "mgr": manager, "session_id": "sess-compact"}
    service.prepare_compaction = Mock(return_value=prepared)
    manager.commit = Mock(
        return_value={"ok": True, "archived": True, "archive_id": "archive-123"}
    )
    manager.get_context = Mock(return_value={
        "latest_archive_overview": "1234567890ABCDEFGHIJ",
        "estimatedTokens": 42,
    })

    result = service.compact({"sessionId": "sess-compact", "_ctx": ctx, "tokenBudget": 128_000})

    assert result["ok"] is True
    assert result["compacted"] is True
    payload = result["result"]
    assert payload["summary"] == "90ABCDEFGHIJ"
    assert payload["tokensAfter"] == 42
    assert "firstKeptEntryId" not in payload
def test_compact_request_summary_max_chars_overrides_config(
    service: MemoryService,
    ctx: RequestContext,
):
    """A request-level summaryMaxChars of 8 should take precedence over the config value 12."""
    service._cfg.summary_max_chars = 12
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    prepared = {"ctx": ctx, "mgr": manager, "session_id": "sess-compact"}
    service.prepare_compaction = Mock(return_value=prepared)
    manager.commit = Mock(return_value={"ok": True, "archived": True})
    manager.get_context = Mock(return_value={
        "latest_archive_overview": "1234567890ABCDEFGHIJ",
        "estimatedTokens": 42,
    })

    request = {
        "sessionId": "sess-compact",
        "_ctx": ctx,
        "summaryMaxChars": 8,
    }
    result = service.compact(request)

    assert result["result"]["summary"] == "CDEFGHIJ"
def test_compact_invalid_request_summary_max_chars_falls_back_to_config(
    service: MemoryService,
    ctx: RequestContext,
):
    """A request-level summaryMaxChars of 0 should be ignored in favour of the config value 12."""
    service._cfg.summary_max_chars = 12
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    prepared = {"ctx": ctx, "mgr": manager, "session_id": "sess-compact"}
    service.prepare_compaction = Mock(return_value=prepared)
    manager.commit = Mock(return_value={"ok": True, "archived": True})
    manager.get_context = Mock(return_value={
        "latest_archive_overview": "1234567890ABCDEFGHIJ",
        "estimatedTokens": 42,
    })

    request = {
        "sessionId": "sess-compact",
        "_ctx": ctx,
        "summaryMaxChars": 0,
    }
    result = service.compact(request)

    assert result["result"]["summary"] == "90ABCDEFGHIJ"
def test_compact_sync_short_term_index_mode_drains_outbox_best_effort(
    service: MemoryService,
    ctx: RequestContext,
):
    """In 'sync' mode the drain is attempted once and its failure must not fail compact()."""
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    prepared = {"ctx": ctx, "mgr": manager, "session_id": "sess-compact"}
    service.prepare_compaction = Mock(return_value=prepared)
    manager.commit = Mock(return_value={"ok": True, "archived": True})
    manager.get_context = Mock(return_value={
        "latest_archive_overview": "summary",
        "estimatedTokens": 42,
    })
    service.drain_outbox_sync = Mock(side_effect=RuntimeError("index unavailable"))

    request = {
        "sessionId": "sess-compact",
        "_ctx": ctx,
        "shortTermIndexMode": "sync",
    }
    result = service.compact(request)

    service.drain_outbox_sync.assert_called_once()
    assert result["ok"] is True
    assert result["compacted"] is True
    assert result["result"]["summary"] == "summary"
@pytest.mark.parametrize("mode", ["async", "off"])
def test_compact_async_and_off_short_term_index_modes_skip_drain(
    service: MemoryService,
    ctx: RequestContext,
    mode: str,
):
    """For the 'async' and 'off' modes compact() should succeed without draining the outbox."""
    manager = service.get_session_manager()
    manager.add_message("sess-compact", "user", "pending user content", ctx)
    prepared = {"ctx": ctx, "mgr": manager, "session_id": "sess-compact"}
    service.prepare_compaction = Mock(return_value=prepared)
    manager.commit = Mock(return_value={"ok": True, "archived": True})
    manager.get_context = Mock(return_value={
        "latest_archive_overview": "summary",
        "estimatedTokens": 42,
    })
    service.drain_outbox_sync = Mock()

    request = {
        "sessionId": "sess-compact",
        "_ctx": ctx,
        "shortTermIndexMode": mode,
    }
    result = service.compact(request)

    service.drain_outbox_sync.assert_not_called()
    assert result["ok"] is True
    assert result["compacted"] is True
def test_dispose_no_pending_messages_removes_session_when_context_build_fails(
    service: MemoryService,
    ctx: RequestContext,
):
    """dispose() on an empty session should drop it even though build_context raises."""
    manager = service.get_session_manager()
    manager.get_or_create("sess-compact", ctx=ctx)
    service.build_context = Mock(side_effect=RuntimeError("bad ctx"))

    result = service.dispose({"sessionId": "sess-compact"})

    expected = {"disposed": True, "flushed": False, "reason": "no_pending_messages"}
    assert result == expected
    assert not manager.has_session("sess-compact")
def test_ogmem_config_uses_default_summary_max_chars():
    """A default-constructed OgMemConfig should expose summary_max_chars == 4000."""
    assert OgMemConfig().summary_max_chars == 4000
def test_ogmem_config_load_reads_summary_max_chars_from_yaml_and_env(
    monkeypatch: pytest.MonkeyPatch,
):
    """The YAML value (123) is expected when present; otherwise the env value (222)."""
    monkeypatch.setenv("OGMEM_SUMMARY_MAX_CHARS", "222")

    # Case 1: the YAML document carries its own value.
    monkeypatch.setattr(
        unified_config,
        "_load_yaml",
        lambda _path: {"memory": {"summary_max_chars": 123}},
    )
    from_yaml = OgMemConfig.load("unused.yaml")
    assert from_yaml.summary_max_chars == 123

    # Case 2: the YAML document has an empty memory section.
    monkeypatch.setattr(unified_config, "_load_yaml", lambda _path: {"memory": {}})
    from_env = OgMemConfig.load("unused.yaml")
    assert from_env.summary_max_chars == 222
class TestIngestWritesToBuffer:
    """Checks that ingest() and ingest_batch() store messages in the session buffer."""

    def test_ingest_writes_user_message_to_buffer(self, service: MemoryService, ctx: RequestContext):
        """A single user message should be reported and stored verbatim."""
        request = {
            "sessionId": "sess-compact",
            "messages": [{"role": "user", "content": "hello from ingest"}],
            "_ctx": ctx,
        }
        result = service.ingest(request)

        assert result["ingested"] is True
        assert result["count"] == 1

        buf = service.get_session_manager().get_or_create("sess-compact")
        assert len(buf.messages) == 1
        stored = buf.messages[0]
        assert stored.role == "user"
        assert stored.content == "hello from ingest"

    def test_ingest_empty_messages_returns_early(self, service: MemoryService, ctx: RequestContext):
        """An empty message list should still be reported as ingested."""
        request = {"sessionId": "sess-compact", "messages": [], "_ctx": ctx}
        result = service.ingest(request)

        assert result["ingested"] is True

    def test_ingest_skips_empty_content(self, service: MemoryService, ctx: RequestContext):
        """A message with empty content should not be counted."""
        request = {
            "sessionId": "sess-compact",
            "messages": [{"role": "user", "content": ""}],
            "_ctx": ctx,
        }
        result = service.ingest(request)

        assert result["ingested"] is True
        assert result["count"] == 0

    def test_ingest_multiple_messages(self, service: MemoryService, ctx: RequestContext):
        """Two messages in one call should both land in the buffer."""
        batch = [
            {"role": "user", "content": "msg1"},
            {"role": "assistant", "content": "msg2"},
        ]
        result = service.ingest({
            "sessionId": "sess-compact",
            "messages": batch,
            "_ctx": ctx,
        })

        assert result["count"] == 2
        buf = service.get_session_manager().get_or_create("sess-compact")
        assert len(buf.messages) == 2

    def test_ingest_batch_uses_same_logic(self, service: MemoryService, ctx: RequestContext):
        """ingest_batch() should report and store a single message like ingest()."""
        request = {
            "sessionId": "sess-compact",
            "messages": [{"role": "user", "content": "batch msg"}],
            "_ctx": ctx,
        }
        result = service.ingest_batch(request)

        assert result["ingested"] is True
        assert result["count"] == 1
        buf = service.get_session_manager().get_or_create("sess-compact")
        assert len(buf.messages) == 1
class TestAfterTurnDedup:
    """Checks how after_turn() and commit interact with messages stored by ingest()."""

    def test_after_turn_skips_ingest_written_user_message(self, service: MemoryService, ctx: RequestContext):
        """A user message already ingested must not be duplicated by after_turn()."""
        service.ingest({
            "sessionId": "sess-compact",
            "messages": [{"role": "user", "content": "hello from user"}],
            "_ctx": ctx,
        })
        mgr = service.get_session_manager()
        buf = mgr.get_or_create("sess-compact")
        assert len(buf.messages) == 1

        # The turn repeats the ingested user message and adds the assistant reply.
        result = service.after_turn({
            "sessionId": "sess-compact",
            "messages": [
                {"role": "user", "content": "hello from user"},
                {"role": "assistant", "content": "hi from assistant"},
            ],
            "_ctx": ctx,
        })

        assert result["ok"] is True
        buf = mgr.get_or_create("sess-compact")
        assert len(buf.messages) == 2
        assert buf.messages[0].role == "user"
        assert buf.messages[0].content == "hello from user"
        assert buf.messages[1].role == "assistant"
        assert buf.messages[1].content == "hi from assistant"

    def test_after_turn_no_dedup_when_ingest_not_called(self, service: MemoryService, ctx: RequestContext):
        """Without a prior ingest(), both messages of the turn should be stored."""
        result = service.after_turn({
            "sessionId": "sess-compact",
            "messages": [
                {"role": "user", "content": "hello"},
                {"role": "assistant", "content": "world"},
            ],
            "_ctx": ctx,
        })

        assert result["ok"] is True
        mgr = service.get_session_manager()
        buf = mgr.get_or_create("sess-compact")
        assert len(buf.messages) == 2

    def test_ingest_then_compact_finds_buffer(self, service: MemoryService, ctx: RequestContext):
        """Committing after ingest() should archive instead of reporting empty_buffer.

        The commit is issued directly on the session manager (``mgr.commit``),
        not through ``service.compact``.
        """
        service.ingest({
            "sessionId": "sess-compact",
            "messages": [{"role": "user", "content": "important data"}],
            "_ctx": ctx,
        })
        mgr = service.get_session_manager()
        buf = mgr.get_or_create("sess-compact")
        # Precondition: ingest() actually populated the buffer that commit reads.
        assert len(buf.messages) == 1

        result = mgr.commit("sess-compact", ctx, wait=True)

        assert result["ok"] is True
        assert result["archived"] is True
        assert result.get("reason") != "empty_buffer"
def test_openclaw_plugin_manifest_exposes_compaction_config_schema():
    """The plugin manifest should declare the three compaction options with their defaults."""
    # The manifest is looked up three directory levels above this file's folder.
    plugin_dir = Path(__file__).resolve().parents[3] / "openclaw_context_engine_plugin"
    raw_manifest = (plugin_dir / "openclaw.plugin.json").read_text(encoding="utf-8")
    props = json.loads(raw_manifest)["configSchema"]["properties"]

    takeover = props["compactTakeoverEnabled"]
    assert takeover["type"] == "boolean"
    assert takeover["default"] is True

    summary_limit = props["summaryMaxChars"]
    assert summary_limit["type"] == "integer"
    assert summary_limit["default"] == 4000

    index_mode = props["shortTermIndexMode"]
    assert index_mode["enum"] == ["sync", "async", "off"]
    assert index_mode["default"] == "sync"