"""Tests for agent/context_compressor.py — compression logic, thresholds, truncation fallback."""
import pytest
from unittest.mock import patch, MagicMock
from agent.context_compressor import ContextCompressor, SUMMARY_PREFIX
@pytest.fixture()
def compressor():
    """Provide a quiet ContextCompressor built while the context-length lookup is stubbed to 100000."""
    settings = {
        "model": "test/model",
        "threshold_percent": 0.85,
        "protect_first_n": 2,
        "protect_last_n": 2,
        "quiet_mode": True,
    }
    with patch(
        "agent.context_compressor.get_model_context_length",
        return_value=100000,
    ):
        return ContextCompressor(**settings)
class TestShouldCompress:
    """Checks of ``should_compress`` around the threshold configured by the ``compressor`` fixture."""

    def test_below_threshold(self, compressor):
        """50000 recorded prompt tokens must not trigger compression."""
        compressor.last_prompt_tokens = 50000
        verdict = compressor.should_compress()
        assert verdict is False

    def test_above_threshold(self, compressor):
        """90000 recorded prompt tokens must trigger compression."""
        compressor.last_prompt_tokens = 90000
        verdict = compressor.should_compress()
        assert verdict is True

    def test_exact_threshold(self, compressor):
        """85000 recorded prompt tokens is expected to count as reaching the threshold."""
        compressor.last_prompt_tokens = 85000
        verdict = compressor.should_compress()
        assert verdict is True

    def test_explicit_tokens(self, compressor):
        """An explicit ``prompt_tokens`` argument is evaluated instead of the recorded value."""
        for tokens, expected in ((90000, True), (50000, False)):
            assert compressor.should_compress(prompt_tokens=tokens) is expected
class TestUpdateFromResponse:
    """``update_from_response`` copies usage counters onto the compressor."""

    def test_updates_fields(self, compressor):
        """Prompt and completion token counts are taken from the usage dict."""
        usage = dict(prompt_tokens=5000, completion_tokens=1000, total_tokens=6000)
        compressor.update_from_response(usage)
        assert compressor.last_prompt_tokens == 5000
        assert compressor.last_completion_tokens == 1000

    def test_missing_fields_default_zero(self, compressor):
        """An empty usage dict leaves the prompt token count at zero."""
        empty_usage = {}
        compressor.update_from_response(empty_usage)
        assert compressor.last_prompt_tokens == 0
class TestCompress:
    """``compress`` behavior in tests that do not patch ``call_llm``."""

    def _make_messages(self, n):
        """Return ``n`` messages alternating user/assistant, starting with user."""
        roles = ("user", "assistant")
        return [{"role": roles[i % 2], "content": f"msg {i}"} for i in range(n)]

    def test_too_few_messages_returns_unchanged(self, compressor):
        """Four messages are returned as-is."""
        history = self._make_messages(4)
        assert compressor.compress(history) == history

    def test_truncation_fallback_no_client(self, compressor):
        """The history is shortened and the fallback flag is set, without aborting."""
        history = [{"role": "system", "content": "System prompt"}]
        history = history + self._make_messages(10)

        compacted = compressor.compress(history)

        assert len(compacted) < len(history)
        assert compacted[0]["role"] == "system"
        assert compressor.compression_count == 1
        assert compressor._last_compress_aborted is False
        assert compressor._last_summary_fallback_used is True

    def test_compression_increments_count(self, compressor):
        """Each ``compress`` call bumps ``compression_count`` by one."""
        history = self._make_messages(10)
        for expected_count in (1, 2):
            compressor.compress(history)
            assert compressor.compression_count == expected_count

    def test_protects_first_and_last(self, compressor):
        """The last message keeps its content; the one before it still contains its original text."""
        history = self._make_messages(10)
        compacted = compressor.compress(history)
        assert compacted[-1]["content"] == history[-1]["content"]
        assert history[-2]["content"] in compacted[-2]["content"]
class TestGenerateSummaryNoneContent:
    """Regression: content=None (from tool-call-only assistant messages) must not crash."""

    def test_none_content_does_not_crash(self):
        """``_generate_summary`` copes with assistant turns whose content is None."""
        choice = MagicMock()
        choice.message.content = "[CONTEXT SUMMARY]: tool calls happened"
        llm_reply = MagicMock()
        llm_reply.choices = [choice]

        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            subject = ContextCompressor(model="test", quiet_mode=True)

        search_call = {"function": {"name": "search"}}
        history = [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": None, "tool_calls": [search_call]},
            {"role": "tool", "content": "result"},
            {"role": "assistant", "content": None},
            {"role": "user", "content": "thanks"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=llm_reply):
            summary = subject._generate_summary(history)

        assert isinstance(summary, str)
        assert summary.startswith(SUMMARY_PREFIX)

    def test_none_content_in_system_message_compress(self):
        """System message with content=None should not crash during compress."""
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            subject = ContextCompressor(
                model="test",
                quiet_mode=True,
                protect_first_n=2,
                protect_last_n=2,
            )

        roles = ("user", "assistant")
        history = [{"role": "system", "content": None}]
        history.extend(
            {"role": roles[i % 2], "content": f"msg {i}"} for i in range(10)
        )

        compacted = subject.compress(history)
        assert len(compacted) < len(history)
class TestNonStringContent:
    """Regression: content as dict (e.g., llama.cpp tool calls) must not crash."""

    @staticmethod
    def _reply(content):
        """Return a mock LLM response whose first choice carries ``content``."""
        choice = MagicMock()
        choice.message.content = content
        reply = MagicMock()
        reply.choices = [choice]
        return reply

    @staticmethod
    def _compressor(**kwargs):
        """Construct a ContextCompressor while the context-length lookup is stubbed."""
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            return ContextCompressor(**kwargs)

    @staticmethod
    def _turns():
        """Return a fresh two-message user/assistant exchange."""
        return [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "ok"},
        ]

    def test_dict_content_coerced_to_string(self):
        """A dict returned as summary content still yields a prefixed string."""
        reply = self._reply({"text": "some summary"})
        subject = self._compressor(model="test", quiet_mode=True)

        with patch("agent.context_compressor.call_llm", return_value=reply):
            summary = subject._generate_summary(self._turns())

        assert isinstance(summary, str)
        assert summary.startswith(SUMMARY_PREFIX)

    def test_none_content_coerced_to_empty(self):
        """A None summary content results in exactly the bare prefix."""
        reply = self._reply(None)
        subject = self._compressor(model="test", quiet_mode=True)

        with patch("agent.context_compressor.call_llm", return_value=reply):
            summary = subject._generate_summary(self._turns())

        assert summary is not None
        assert summary == SUMMARY_PREFIX

    def test_summary_call_does_not_force_temperature(self):
        """The summary request must not pass a ``temperature`` keyword."""
        reply = self._reply("ok")
        subject = self._compressor(model="test", quiet_mode=True)

        with patch("agent.context_compressor.call_llm", return_value=reply) as mock_call:
            subject._generate_summary(self._turns())

        sent_kwargs = mock_call.call_args.kwargs
        assert "temperature" not in sent_kwargs

    def test_summary_prompt_avoids_filter_sensitive_handoff_framing(self):
        """The prompt omits the listed handoff phrases and contains the checkpoint wording."""
        reply = self._reply("ok")
        subject = self._compressor(model="test", quiet_mode=True)

        with patch("agent.context_compressor.call_llm", return_value=reply) as mock_call:
            subject._generate_summary(self._turns())

        prompt = mock_call.call_args.kwargs["messages"][0]["content"]
        banned_phrases = (
            "Your output will be injected",
            "Do NOT respond",
            "DIFFERENT assistant",
            "different assistant",
        )
        for phrase in banned_phrases:
            assert phrase not in prompt
        required_phrases = (
            "Treat the conversation turns below as source material",
            "structured checkpoint summary",
        )
        for phrase in required_phrases:
            assert phrase in prompt

    def test_summary_call_passes_live_main_runtime(self):
        """The compressor's runtime settings are forwarded as ``main_runtime``."""
        reply = self._reply("ok")
        runtime = {
            "model": "gpt-5.4",
            "provider": "openai-codex",
            "base_url": "https://chatgpt.com/backend-api/codex",
            "api_key": "codex-token",
            "api_mode": "codex_responses",
        }
        subject = self._compressor(**runtime, quiet_mode=True)

        with patch("agent.context_compressor.call_llm", return_value=reply) as mock_call:
            subject._generate_summary(self._turns())

        assert mock_call.call_args.kwargs["main_runtime"] == {
            "model": "gpt-5.4",
            "provider": "openai-codex",
            "base_url": "https://chatgpt.com/backend-api/codex",
            "api_key": "codex-token",
            "api_mode": "codex_responses",
        }
class TestSummaryFailureCooldown:
    """A failed summary call must suppress an immediate second attempt."""

    def test_summary_failure_enters_cooldown_and_skips_retry(self):
        """Two back-to-back summary requests result in a single ``call_llm`` invocation."""
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            subject = ContextCompressor(model="test", quiet_mode=True)

        history = [
            dict(role="user", content="do something"),
            dict(role="assistant", content="ok"),
        ]

        with patch(
            "agent.context_compressor.call_llm",
            side_effect=Exception("boom"),
        ) as mock_call:
            outcomes = [subject._generate_summary(history) for _ in range(2)]

        assert all(outcome is None for outcome in outcomes)
        assert mock_call.call_count == 1
class TestSummaryFallbackToMainModel:
    """Retry-on-main behavior when the summary model differs from the main model.

    If the summary LLM call fails, the compressor should retry once on the
    main model before giving up — losing N turns of context is almost always
    worse than one extra summary attempt. Covers both the fast-path (explicit
    model-not-found errors) and the unknown-error best-effort retry.
    """

    def _msgs(self):
        """Return a fresh two-message user/assistant exchange."""
        return [
            dict(role="user", content="do something"),
            dict(role="assistant", content="ok"),
        ]

    @staticmethod
    def _ok_reply(text):
        """Return a mock LLM response whose first choice carries ``text``."""
        choice = MagicMock()
        choice.message.content = text
        reply = MagicMock()
        reply.choices = [choice]
        return reply

    @staticmethod
    def _status_error(message, status_code):
        """Return an ``Exception`` carrying a ``status_code`` attribute."""
        error = Exception(message)
        error.status_code = status_code
        return error

    @staticmethod
    def _new_compressor(aux_model=None):
        """Build a compressor on "main-model", optionally with a summary model override."""
        kwargs = {"model": "main-model"}
        if aux_model is not None:
            kwargs["summary_model_override"] = aux_model
        kwargs["quiet_mode"] = True
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            return ContextCompressor(**kwargs)

    def test_model_not_found_404_falls_back_to_main_and_succeeds(self):
        """Classic misconfiguration: ``auxiliary.compression.model`` points at
        a model the main provider doesn't serve → 404 → retry on main."""
        recovery = self._ok_reply("summary via main model")
        failure = self._status_error("404 model_not_found: no such model", 404)
        subject = self._new_compressor("broken-aux-model")

        with patch(
            "agent.context_compressor.call_llm",
            side_effect=[failure, recovery],
        ) as mock_call:
            result = subject._generate_summary(self._msgs())

        assert mock_call.call_count == 2
        attempts = mock_call.call_args_list
        assert attempts[0].kwargs.get("model") == "broken-aux-model"
        assert "model" not in attempts[1].kwargs
        assert result is not None
        assert "summary via main model" in result
        assert subject._last_aux_model_failure_model == "broken-aux-model"
        assert subject._last_aux_model_failure_error is not None
        assert "404" in subject._last_aux_model_failure_error

    def test_unknown_error_falls_back_to_main_and_succeeds(self):
        """Errors that don't match the 404/503/model_not_found fast-path
        (400s, provider-specific 'no route', aggregator rejections) should
        ALSO trigger a best-effort retry on main before entering cooldown."""
        recovery = self._ok_reply("summary via main model")
        failure = self._status_error("400 Bad Request: provider rejected model", 400)
        subject = self._new_compressor("broken-aux-model")

        with patch(
            "agent.context_compressor.call_llm",
            side_effect=[failure, recovery],
        ) as mock_call:
            result = subject._generate_summary(self._msgs())

        assert mock_call.call_count == 2
        attempts = mock_call.call_args_list
        assert attempts[0].kwargs.get("model") == "broken-aux-model"
        assert "model" not in attempts[1].kwargs
        assert result is not None
        assert "summary via main model" in result
        assert subject._last_aux_model_failure_model == "broken-aux-model"
        assert subject._last_aux_model_failure_error is not None
        assert "400" in subject._last_aux_model_failure_error

    def test_no_fallback_when_summary_model_equals_main_model(self):
        """If the aux model IS the main model, there's nowhere to fall back
        to — go straight to cooldown, don't loop retrying the same call."""
        failure = Exception("500 internal error")
        subject = self._new_compressor("main-model")

        with patch(
            "agent.context_compressor.call_llm",
            side_effect=failure,
        ) as mock_call:
            result = subject._generate_summary(self._msgs())

        assert mock_call.call_count == 1
        assert result is None
        assert getattr(subject, "_summary_model_fallen_back", False) is False

    def test_fallback_only_happens_once_per_compressor(self):
        """If the retry-on-main ALSO fails, don't loop forever — enter
        cooldown like the normal failure path."""
        failures = [
            Exception("400 aux model rejected"),
            Exception("500 main model also exploded"),
        ]
        subject = self._new_compressor("broken-aux-model")

        with patch(
            "agent.context_compressor.call_llm",
            side_effect=failures,
        ) as mock_call:
            result = subject._generate_summary(self._msgs())

        assert mock_call.call_count == 2
        assert result is None
        assert subject._summary_model_fallen_back is True

    def test_json_decode_error_falls_back_to_main_and_succeeds(self):
        """JSONDecodeError from the OpenAI SDK's ``response.json()`` (raised
        when a misconfigured proxy returns HTML/plain-text with
        ``Content-Type: application/json``) should trigger the same
        retry-on-main path as 404/timeout. Issue #22244."""
        import json as _json

        recovery = self._ok_reply("summary via main model")
        failure = _json.JSONDecodeError(
            "Expecting value", "<!DOCTYPE html><html>...</html>", 0
        )
        subject = self._new_compressor("aux-via-broken-proxy")

        with patch(
            "agent.context_compressor.call_llm",
            side_effect=[failure, recovery],
        ) as mock_call:
            result = subject._generate_summary(self._msgs())

        assert mock_call.call_count == 2
        attempts = mock_call.call_args_list
        assert attempts[0].kwargs.get("model") == "aux-via-broken-proxy"
        assert "model" not in attempts[1].kwargs
        assert result is not None
        assert "summary via main model" in result
        assert subject._last_aux_model_failure_model == "aux-via-broken-proxy"
        assert subject._last_aux_model_failure_error is not None
        # The recorded error text is expected to be capped in length.
        assert len(subject._last_aux_model_failure_error) <= 220

    def test_json_decode_error_substring_match_in_wrapped_exception(self):
        """When the OpenAI SDK wraps the raw JSONDecodeError inside its own
        ``APIResponseValidationError`` (or similar), ``isinstance`` no longer
        matches but the substring "expecting value" still appears in
        ``str(e)``. We detect this case by string match and fall back the
        same way."""
        recovery = self._ok_reply("summary via main model")
        failure = Exception("Expecting value: line 1 column 1 (char 0)")
        subject = self._new_compressor("aux-model")

        with patch(
            "agent.context_compressor.call_llm",
            side_effect=[failure, recovery],
        ) as mock_call:
            result = subject._generate_summary(self._msgs())

        assert mock_call.call_count == 2
        assert result is not None
        assert "summary via main model" in result

    def test_json_decode_error_on_main_uses_short_cooldown(self):
        """When already on the main model (no separate summary_model, or
        fallback already happened), a JSONDecodeError should set the short
        30s cooldown, not the default 60s — provider bodies tend to
        recover quickly when an upstream proxy comes back online."""
        import json as _json

        failure = _json.JSONDecodeError("Expecting value", "<html/>", 0)
        subject = self._new_compressor()

        llm_patch = patch("agent.context_compressor.call_llm", side_effect=failure)
        # Freeze the clock at 1000.0 so the cooldown deadline is exact.
        clock_patch = patch(
            "agent.context_compressor.time.monotonic", return_value=1000.0
        )
        with llm_patch, clock_patch:
            result = subject._generate_summary(self._msgs())

        assert result is None
        assert subject._summary_failure_cooldown_until == 1030.0
class TestStreamingClosedFallback:
    """httpcore / httpx streaming premature-close errors must be classified the
    same as timeouts so the compressor retries on the main model instead of
    entering a 60-second cooldown. Issue #18458.

    ``_is_connection_error`` is patched here because the test venv may not
    have ``openai`` installed (the real function does ``from openai import ...``
    inside its body). We test the *wiring* — that `_generate_summary` calls
    ``_is_connection_error`` and acts on its result — not the classifier itself
    (that's covered in ``test_auxiliary_client.py::TestIsConnectionError``).
    """

    def _msgs(self):
        """Return a fresh two-message user/assistant exchange."""
        return [
            dict(role="user", content="do something"),
            dict(role="assistant", content="ok"),
        ]

    @staticmethod
    def _ok_reply(text):
        """Return a mock LLM response whose first choice carries ``text``."""
        choice = MagicMock()
        choice.message.content = text
        reply = MagicMock()
        reply.choices = [choice]
        return reply

    @staticmethod
    def _new_compressor(aux_model=None):
        """Build a compressor on "main-model", optionally with a summary model override."""
        kwargs = {"model": "main-model"}
        if aux_model is not None:
            kwargs["summary_model_override"] = aux_model
        kwargs["quiet_mode"] = True
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            return ContextCompressor(**kwargs)

    def test_incomplete_chunked_read_falls_back_to_main(self):
        """``httpcore.RemoteProtocolError: incomplete chunked read`` triggers
        the retry-on-main path when ``_is_connection_error`` returns True."""
        recovery = self._ok_reply("summary via main model")
        failure = Exception("RemoteProtocolError: incomplete chunked read")
        subject = self._new_compressor("aux-stream-model")

        llm_patch = patch(
            "agent.context_compressor.call_llm",
            side_effect=[failure, recovery],
        )
        classifier_patch = patch(
            "agent.context_compressor._is_connection_error",
            return_value=True,
        )
        with llm_patch as mock_call, classifier_patch:
            result = subject._generate_summary(self._msgs())

        assert mock_call.call_count == 2
        attempts = mock_call.call_args_list
        assert attempts[0].kwargs.get("model") == "aux-stream-model"
        assert "model" not in attempts[1].kwargs
        assert result is not None
        assert "summary via main model" in result

    def test_peer_closed_connection_falls_back_to_main(self):
        """``peer closed connection`` triggers the retry-on-main path."""
        recovery = self._ok_reply("summary ok")
        failure = Exception(
            "peer closed connection without sending complete message body"
        )
        subject = self._new_compressor("aux-model")

        llm_patch = patch(
            "agent.context_compressor.call_llm",
            side_effect=[failure, recovery],
        )
        classifier_patch = patch(
            "agent.context_compressor._is_connection_error",
            return_value=True,
        )
        with llm_patch as mock_call, classifier_patch:
            result = subject._generate_summary(self._msgs())

        assert mock_call.call_count == 2
        assert result is not None

    def test_streaming_closed_on_main_uses_short_cooldown(self):
        """When already on the main model, a streaming-closed error should use
        the 30s cooldown, not the default 60s — these errors are transient."""
        failure = Exception("RemoteProtocolError: response ended prematurely")
        subject = self._new_compressor()

        llm_patch = patch("agent.context_compressor.call_llm", side_effect=failure)
        classifier_patch = patch(
            "agent.context_compressor._is_connection_error",
            return_value=True,
        )
        # Freeze the clock at 1000.0 so the cooldown deadline is exact.
        clock_patch = patch(
            "agent.context_compressor.time.monotonic", return_value=1000.0
        )
        with llm_patch, classifier_patch, clock_patch:
            result = subject._generate_summary(self._msgs())

        assert result is None
        assert subject._summary_failure_cooldown_until == 1030.0

    def test_non_streaming_unknown_error_still_uses_long_cooldown(self):
        """Unclassified errors should retain the 60s default cooldown to
        prevent hammering a broken provider."""
        failure = Exception("Internal Server Error: something unexpected happened")
        subject = self._new_compressor()

        llm_patch = patch("agent.context_compressor.call_llm", side_effect=failure)
        classifier_patch = patch(
            "agent.context_compressor._is_connection_error",
            return_value=False,
        )
        # Freeze the clock at 1000.0 so the cooldown deadline is exact.
        clock_patch = patch(
            "agent.context_compressor.time.monotonic", return_value=1000.0
        )
        with llm_patch, classifier_patch, clock_patch:
            result = subject._generate_summary(self._msgs())

        assert result is None
        assert subject._summary_failure_cooldown_until == 1060.0
class TestAuxModelFallbackSurfacedToCallers:
    """When summary_model fails but retry-on-main succeeds, compress() must
    expose the aux-model failure via _last_aux_model_failure_{model,error}
    so gateway /compress and CLI callers can warn the user about their
    broken auxiliary.compression.model config — silent recovery would hide
    a misconfiguration only the user can fix."""

    def _make_msgs(self):
        """Return a system message followed by "msg 1".."msg 7", alternating from user."""
        history = [{"role": "system", "content": "sys"}]
        history.extend(
            {"role": "user" if i % 2 else "assistant", "content": f"msg {i}"}
            for i in range(1, 8)
        )
        return history

    @staticmethod
    def _ok_reply():
        """Return a mock LLM response carrying "summary via main"."""
        choice = MagicMock()
        choice.message.content = "summary via main"
        reply = MagicMock()
        reply.choices = [choice]
        return reply

    @staticmethod
    def _status_400(message):
        """Return an ``Exception`` with ``status_code`` set to 400."""
        error = Exception(message)
        error.status_code = 400
        return error

    @staticmethod
    def _new_compressor():
        """Build a compressor whose summary model override is "broken-aux-model"."""
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            return ContextCompressor(
                model="main-model",
                summary_model_override="broken-aux-model",
                quiet_mode=True,
                protect_first_n=2,
                protect_last_n=2,
            )

    def test_compress_exposes_aux_failure_fields_after_successful_fallback(self):
        """After a recovered aux failure the failure fields are set and the summary is present."""
        recovery = self._ok_reply()
        failure = self._status_400("400 provider rejected configured model")
        subject = self._new_compressor()

        with patch(
            "agent.context_compressor.call_llm",
            side_effect=[failure, recovery],
        ):
            result = subject.compress(self._make_msgs())

        assert subject._last_summary_fallback_used is False
        assert subject._last_aux_model_failure_model == "broken-aux-model"
        assert subject._last_aux_model_failure_error is not None
        assert "400" in subject._last_aux_model_failure_error

        summary_present = any(
            isinstance(m.get("content"), str) and "summary via main" in m["content"]
            for m in result
        )
        assert summary_present

    def test_compress_clears_aux_failure_fields_at_start_of_next_call(self):
        """A subsequent successful compression must clear the aux-failure
        fields so the warning doesn't persist forever."""
        recovery = self._ok_reply()
        failure = self._status_400("400 aux model busted")
        subject = self._new_compressor()

        with patch(
            "agent.context_compressor.call_llm",
            side_effect=[failure, recovery],
        ):
            subject.compress(self._make_msgs())
        assert subject._last_aux_model_failure_model == "broken-aux-model"

        with patch(
            "agent.context_compressor.call_llm",
            return_value=recovery,
        ):
            subject.compress(self._make_msgs())
        assert subject._last_aux_model_failure_model is None
        assert subject._last_aux_model_failure_error is None
class TestSummaryFailureTrackingForGatewayWarning:
    """Default behavior (compression.abort_on_summary_failure=False):
    summary-generation failure inserts a static fallback placeholder and
    records dropped count + fallback flag so gateway hygiene & /compress
    can surface a visible warning."""

    @staticmethod
    def _history():
        """Return a system message followed by "msg 1".."msg 7", alternating from user."""
        history = [{"role": "system", "content": "sys"}]
        history.extend(
            {"role": "user" if i % 2 else "assistant", "content": f"msg {i}"}
            for i in range(1, 8)
        )
        return history

    @staticmethod
    def _new_compressor():
        """Build a quiet compressor protecting two head and two tail messages."""
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            return ContextCompressor(
                model="test",
                quiet_mode=True,
                protect_first_n=2,
                protect_last_n=2,
            )

    def test_compress_records_fallback_and_dropped_count_on_summary_failure(self):
        """A failed summary sets the fallback bookkeeping and inserts the placeholder text."""
        subject = self._new_compressor()
        history = self._history()

        with patch(
            "agent.context_compressor.call_llm",
            side_effect=Exception("404 model not found"),
        ):
            result = subject.compress(history)

        assert subject._last_summary_fallback_used is True
        assert subject._last_summary_dropped_count > 0
        assert subject._last_summary_error is not None
        assert subject._last_compress_aborted is False

        placeholder_present = any(
            isinstance(m.get("content"), str)
            and "Summary generation was unavailable" in m["content"]
            for m in result
        )
        assert placeholder_present

    def test_compress_clears_fallback_flag_on_subsequent_success(self):
        """A later successful compression resets the fallback flag and dropped count."""
        choice = MagicMock()
        choice.message.content = "summary text"
        good_reply = MagicMock()
        good_reply.choices = [choice]

        subject = self._new_compressor()
        history = self._history()

        with patch("agent.context_compressor.call_llm", side_effect=Exception("boom")):
            subject.compress(history)
        assert subject._last_summary_fallback_used is True

        # Reset the cooldown deadline so the next compress call attempts a summary again.
        subject._summary_failure_cooldown_until = 0.0

        with patch("agent.context_compressor.call_llm", return_value=good_reply):
            subject.compress(history)
        assert subject._last_summary_fallback_used is False
        assert subject._last_summary_dropped_count == 0
class TestAbortOnSummaryFailure:
    """Opt-in behavior (compression.abort_on_summary_failure=True):
    summary-generation failure ABORTS compression entirely — returns the
    original messages unchanged and sets _last_compress_aborted=True so
    gateway hygiene & /compress can surface a visible warning."""

    def _make_msgs(self):
        """Return a system message followed by "msg 1".."msg 7", alternating from user."""
        history = [{"role": "system", "content": "sys"}]
        history.extend(
            {"role": "user" if i % 2 else "assistant", "content": f"msg {i}"}
            for i in range(1, 8)
        )
        return history

    def _make_compressor(self):
        """Build a quiet compressor with ``abort_on_summary_failure`` enabled."""
        options = dict(
            model="test",
            quiet_mode=True,
            protect_first_n=2,
            protect_last_n=2,
            abort_on_summary_failure=True,
        )
        with patch(
            "agent.context_compressor.get_model_context_length",
            return_value=100000,
        ):
            return ContextCompressor(**options)

    @staticmethod
    def _summary_reply():
        """Return a mock LLM response carrying "summary text"."""
        choice = MagicMock()
        choice.message.content = "summary text"
        reply = MagicMock()
        reply.choices = [choice]
        return reply

    def test_compress_aborts_and_preserves_messages_on_summary_failure(self):
        """On summary failure the input is returned unchanged and no placeholder is inserted."""
        subject = self._make_compressor()
        history = self._make_msgs()

        with patch(
            "agent.context_compressor.call_llm",
            side_effect=Exception("404 model not found"),
        ):
            result = subject.compress(history)

        assert subject._last_compress_aborted is True
        assert subject._last_summary_error is not None
        assert subject._last_summary_fallback_used is False
        assert subject._last_summary_dropped_count == 0
        assert result == history

        placeholder_present = any(
            isinstance(m.get("content"), str)
            and "Summary generation was unavailable" in m["content"]
            for m in result
        )
        assert not placeholder_present

    def test_compress_clears_abort_flag_on_subsequent_success(self):
        """A later successful compression resets the abort flag and fallback bookkeeping."""
        good_reply = self._summary_reply()
        subject = self._make_compressor()
        history = self._make_msgs()

        with patch("agent.context_compressor.call_llm", side_effect=Exception("boom")):
            subject.compress(history)
        assert subject._last_compress_aborted is True

        # Reset the cooldown deadline so the next compress call attempts a summary again.
        subject._summary_failure_cooldown_until = 0.0

        with patch("agent.context_compressor.call_llm", return_value=good_reply):
            subject.compress(history)
        assert subject._last_compress_aborted is False
        assert subject._last_summary_fallback_used is False
        assert subject._last_summary_dropped_count == 0

    def test_force_true_bypasses_failure_cooldown(self):
        """Manual /compress passes force=True so it can retry immediately
        after an auto-compress abort instead of waiting out the 30-60s
        cooldown."""
        import time as _time

        good_reply = self._summary_reply()
        subject = self._make_compressor()
        history = self._make_msgs()

        # Put the cooldown deadline well into the future before forcing.
        subject._summary_failure_cooldown_until = _time.monotonic() + 999.0

        with patch("agent.context_compressor.call_llm", return_value=good_reply):
            result = subject.compress(history, force=True)

        assert subject._last_compress_aborted is False
        assert subject._summary_failure_cooldown_until == 0.0
        assert len(result) < len(history)
class TestSummaryPrefixNormalization:
    """``_with_summary_prefix`` normalizes summaries onto ``SUMMARY_PREFIX``."""

    def test_legacy_prefix_is_replaced(self):
        """A "[CONTEXT SUMMARY]:" lead-in is swapped for the current prefix."""
        legacy_text = "[CONTEXT SUMMARY]: did work"
        normalized = ContextCompressor._with_summary_prefix(legacy_text)
        assert normalized == f"{SUMMARY_PREFIX}\ndid work"

    def test_existing_new_prefix_is_not_duplicated(self):
        """Text that already starts with the current prefix comes back unchanged."""
        already_prefixed = f"{SUMMARY_PREFIX}\ndid work"
        normalized = ContextCompressor._with_summary_prefix(already_prefixed)
        assert normalized == f"{SUMMARY_PREFIX}\ndid work"
class TestCompressWithClient:
def test_system_content_list_gets_compression_note_without_crashing(self):
    """A system message whose content is a list of blocks stays a list and gains the compaction note."""
    choice = MagicMock()
    choice.message.content = "summary text"
    llm_reply = MagicMock()
    llm_reply.choices = [choice]

    with patch(
        "agent.context_compressor.get_model_context_length",
        return_value=100000,
    ):
        subject = ContextCompressor(
            model="test",
            quiet_mode=True,
            protect_first_n=2,
            protect_last_n=2,
        )

    history = [
        {"role": "system", "content": [{"type": "text", "text": "system prompt"}]},
    ]
    history.extend(
        {"role": "user" if i % 2 else "assistant", "content": f"msg {i}"}
        for i in range(1, 8)
    )

    with patch("agent.context_compressor.call_llm", return_value=llm_reply):
        result = subject.compress(history)

    system_blocks = result[0]["content"]
    assert isinstance(system_blocks, list)

    note_found = any(
        isinstance(block, dict)
        and "compacted into a handoff summary" in block.get("text", "")
        for block in result[0]["content"]
    )
    assert note_found
def test_summarization_path(self):
    """With ``call_llm`` patched to return a summary, ``compress`` shortens the
    history and the result contains a message starting with ``SUMMARY_PREFIX``."""
    mock_response = MagicMock()
    mock_response.choices = [MagicMock()]
    mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: stuff happened"

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

    msgs = [
        {"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
        for i in range(10)
    ]

    with patch("agent.context_compressor.call_llm", return_value=mock_response):
        result = c.compress(msgs)

    # Only string contents are inspected, so a message with None or list
    # content yields a clean assertion failure instead of an AttributeError.
    assert any(
        isinstance(m.get("content"), str) and m["content"].startswith(SUMMARY_PREFIX)
        for m in result
    )
    assert len(result) < len(msgs)
def test_summarization_does_not_split_tool_call_pairs(self):
    """Every assistant tool call left after compression still has a matching tool result."""
    mock_response = MagicMock()
    mock_response.choices = [MagicMock()]
    mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: compressed middle"

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(
            model="test",
            quiet_mode=True,
            protect_first_n=3,
            protect_last_n=4,
        )

    msgs = [
        {"role": "user", "content": "Could you address the reviewer comments in PR#71"},
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                {"id": "call_a", "type": "function", "function": {"name": "skill_view", "arguments": "{}"}},
                {"id": "call_b", "type": "function", "function": {"name": "skill_view", "arguments": "{}"}},
            ],
        },
        {"role": "tool", "tool_call_id": "call_a", "content": "output a"},
        {"role": "tool", "tool_call_id": "call_b", "content": "output b"},
        {"role": "user", "content": "later 1"},
        {"role": "assistant", "content": "later 2"},
        {"role": "tool", "tool_call_id": "call_x", "content": "later output"},
        {"role": "assistant", "content": "later 3"},
        {"role": "user", "content": "later 4"},
    ]

    with patch("agent.context_compressor.call_llm", return_value=mock_response):
        result = c.compress(msgs)

    # IDs of all tool results that survived compression.
    answered_ids = {
        msg.get("tool_call_id")
        for msg in result
        if msg.get("role") == "tool" and msg.get("tool_call_id")
    }
    # Each surviving assistant tool call must be answered by one of them.
    for msg in result:
        if msg.get("role") == "assistant" and msg.get("tool_calls"):
            for tc in msg["tool_calls"]:
                assert tc["id"] in answered_ids
def test_sanitizer_matches_responses_call_id_when_id_differs(self, compressor):
    """A tool result keyed by ``call_id`` is kept even though the tool call's ``id`` differs."""
    tool_call = {
        "id": "fc_123",
        "call_id": "call_123",
        "response_item_id": "fc_123",
        "type": "function",
        "function": {"name": "search_files", "arguments": "{}"},
    }
    assistant_turn = {"role": "assistant", "content": "", "tool_calls": [tool_call]}
    tool_turn = {"role": "tool", "tool_call_id": "call_123", "content": "result"}

    sanitized = compressor._sanitize_tool_pairs([assistant_turn, tool_turn])

    surviving_ids = [
        m.get("tool_call_id") for m in sanitized if m.get("role") == "tool"
    ]
    assert surviving_ids == ["call_123"]
def test_user_role_summary_carries_end_marker(self):
    """A standalone ``role='user'`` summary must close with the end marker.

    When the summary lands as its own user message (e.g. the head ends with
    an assistant/tool turn), its body has to include the explicit
    '--- END OF CONTEXT SUMMARY ---' marker. Without it, weak models read
    the verbatim past user request quoted in '## Active Task' as fresh
    input (#11475, #14521).
    """
    # Canned LLM reply returned by the patched ``call_llm``.
    choice = MagicMock()
    choice.message.content = "summary text"
    llm_reply = MagicMock()
    llm_reply.choices = [choice]

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

    # Eight strictly alternating turns: user, assistant, user, ...
    msgs = [
        {"role": "assistant" if i % 2 else "user", "content": f"msg {i}"}
        for i in range(8)
    ]
    with patch("agent.context_compressor.call_llm", return_value=llm_reply):
        result = c.compress(msgs)

    summaries = (m for m in result if (m.get("content") or "").startswith(SUMMARY_PREFIX))
    summary_msg = next(summaries)
    assert summary_msg["role"] == "user"
    body = summary_msg["content"]
    assert "END OF CONTEXT SUMMARY" in body
    assert body.rstrip().endswith(
        "respond to the message below, not the summary above ---"
    )
def test_summary_role_avoids_consecutive_user_messages(self):
    """Summary role should alternate with the last head message to avoid consecutive same-role messages.

    Eight alternating user/assistant turns are compressed; exactly one
    message starting with ``SUMMARY_PREFIX`` must come back, and its role
    must be ``user``.
    """
    # Canned LLM reply. Only ``call_llm`` is patched below, so no separate
    # client mock is required.
    mock_response = MagicMock()
    mock_response.choices = [MagicMock()]
    mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: stuff happened"

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

    msgs = [
        {"role": "user", "content": "msg 0"},
        {"role": "assistant", "content": "msg 1"},
        {"role": "user", "content": "msg 2"},
        {"role": "assistant", "content": "msg 3"},
        {"role": "user", "content": "msg 4"},
        {"role": "assistant", "content": "msg 5"},
        {"role": "user", "content": "msg 6"},
        {"role": "assistant", "content": "msg 7"},
    ]
    with patch("agent.context_compressor.call_llm", return_value=mock_response):
        result = c.compress(msgs)

    summary_msg = [
        m for m in result if (m.get("content") or "").startswith(SUMMARY_PREFIX)
    ]
    assert len(summary_msg) == 1
    assert summary_msg[0]["role"] == "user"
def test_summary_role_avoids_consecutive_user_when_head_ends_with_user(self):
    """When last head message is 'user', summary must be 'assistant' to avoid two consecutive user messages.

    The transcript opens with a system prompt followed by two user turns;
    exactly one message starting with ``SUMMARY_PREFIX`` must come back, and
    its role must be ``assistant``.
    """
    # Canned LLM reply. Only ``call_llm`` is patched below, so no separate
    # client mock is required.
    mock_response = MagicMock()
    mock_response.choices = [MagicMock()]
    mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: stuff happened"

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

    msgs = [
        {"role": "system", "content": "system prompt"},
        {"role": "user", "content": "msg 1"},
        {"role": "user", "content": "msg 2"},
        {"role": "assistant", "content": "msg 3"},
        {"role": "user", "content": "msg 4"},
        {"role": "assistant", "content": "msg 5"},
        {"role": "user", "content": "msg 6"},
        {"role": "assistant", "content": "msg 7"},
    ]
    with patch("agent.context_compressor.call_llm", return_value=mock_response):
        result = c.compress(msgs)

    summary_msg = [
        m for m in result if (m.get("content") or "").startswith(SUMMARY_PREFIX)
    ]
    assert len(summary_msg) == 1
    assert summary_msg[0]["role"] == "assistant"
def test_summary_role_flips_to_avoid_tail_collision(self):
    """The summary role is flipped when that avoids a tail collision.

    When the summary role collides with the first tail message but the
    flipped role does not collide with the head, the role should be
    flipped. Checked indirectly: no two adjacent user/assistant messages in
    the compressed result may share a role.
    """
    choice = MagicMock()
    choice.message.content = "summary text"
    llm_reply = MagicMock()
    llm_reply.choices = [choice]

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

    tool_call = {"id": "call_1", "type": "function", "function": {"name": "t", "arguments": "{}"}}
    msgs = [
        {"role": "user", "content": "msg 0"},
        {"role": "assistant", "content": "", "tool_calls": [tool_call]},
        {"role": "tool", "tool_call_id": "call_1", "content": "result 1"},
    ]
    # msg 3..7 alternate assistant, user, assistant, user, assistant.
    msgs += [
        {"role": "assistant" if i % 2 else "user", "content": f"msg {i}"}
        for i in range(3, 8)
    ]

    with patch("agent.context_compressor.call_llm", return_value=llm_reply):
        result = c.compress(msgs)

    chat_roles = {"user", "assistant"}
    for i, (earlier, later) in enumerate(zip(result, result[1:]), start=1):
        r1, r2 = earlier.get("role"), later.get("role")
        # Only user/assistant neighbours are subject to the alternation rule.
        if {r1, r2} <= chat_roles:
            assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"
def test_double_collision_merges_summary_into_tail(self):
    """With no collision-free role, the summary merges into the first tail message.

    When neither role avoids a collision with both neighbours, the summary
    should be merged into the first tail message rather than becoming a
    standalone message that breaks role alternation.

    Common scenario: head ends with 'assistant', tail starts with 'user'.
    summary='user' collides with tail, summary='assistant' collides with head.
    """
    choice = MagicMock()
    choice.message.content = "summary text"
    llm_reply = MagicMock()
    llm_reply.choices = [choice]

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=3)

    transcript = [
        ("system", "system prompt"),
        ("user", "msg 1"),
        ("assistant", "msg 2"),
        ("user", "msg 3"),
        ("assistant", "msg 4"),
        ("user", "msg 5"),
        ("user", "msg 6"),
        ("assistant", "msg 7"),
        ("user", "msg 8"),
    ]
    msgs = [{"role": role, "content": text} for role, text in transcript]

    with patch("agent.context_compressor.call_llm", return_value=llm_reply):
        result = c.compress(msgs)

    # No two adjacent user/assistant messages may share a role.
    chat_roles = {"user", "assistant"}
    for i, (earlier, later) in enumerate(zip(result, result[1:]), start=1):
        r1, r2 = earlier.get("role"), later.get("role")
        if {r1, r2} <= chat_roles:
            assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"

    # The message still holding "msg 6" must also carry the summary text.
    holders = [m for m in result if "msg 6" in (m.get("content") or "")]
    assert len(holders) == 1
    assert "summary text" in holders[0]["content"]
def test_double_collision_merges_summary_into_list_tail_content(self):
    """Structured tail content should accept a merged summary without TypeError.

    The message holding "msg 6" uses list-of-blocks content. After
    compression the user message with list content must have the summary
    text in its first block and still contain the original "msg 6" block.
    """
    choice = MagicMock()
    choice.message.content = "summary text"
    llm_reply = MagicMock()
    llm_reply.choices = [choice]

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=3)

    structured_turn = {"role": "user", "content": [{"type": "text", "text": "msg 6"}]}
    msgs = [
        {"role": "system", "content": "system prompt"},
        {"role": "user", "content": "msg 1"},
        {"role": "assistant", "content": "msg 2"},
        {"role": "user", "content": "msg 3"},
        {"role": "assistant", "content": "msg 4"},
        {"role": "user", "content": "msg 5"},
        structured_turn,
        {"role": "assistant", "content": "msg 7"},
        {"role": "user", "content": "msg 8"},
    ]

    with patch("agent.context_compressor.call_llm", return_value=llm_reply):
        result = c.compress(msgs)

    list_content_users = (
        m for m in result
        if m.get("role") == "user" and isinstance(m.get("content"), list)
    )
    merged_tail = next(list_content_users)
    blocks = merged_tail["content"]
    assert isinstance(blocks, list)
    assert "summary text" in blocks[0]["text"]
    assert any(
        isinstance(block, dict) and block.get("text") == "msg 6"
        for block in blocks
    )
def test_double_collision_user_head_assistant_tail(self):
    """Reverse double collision: head ends with 'user', tail starts with 'assistant'.

    summary='assistant' collides with tail, 'user' collides with head → merge.
    """
    choice = MagicMock()
    choice.message.content = "summary text"
    llm_reply = MagicMock()
    llm_reply.choices = [choice]

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=1, protect_last_n=2)

    transcript = [
        ("system", "system prompt"),
        ("user", "msg 1"),
        ("assistant", "msg 2"),
        ("user", "msg 3"),
        ("assistant", "msg 4"),
        ("assistant", "msg 5"),
        ("user", "msg 6"),
        ("assistant", "msg 7"),
    ]
    msgs = [{"role": role, "content": text} for role, text in transcript]

    with patch("agent.context_compressor.call_llm", return_value=llm_reply):
        result = c.compress(msgs)

    # No two adjacent user/assistant messages may share a role.
    chat_roles = {"user", "assistant"}
    for i, (earlier, later) in enumerate(zip(result, result[1:]), start=1):
        r1, r2 = earlier.get("role"), later.get("role")
        if {r1, r2} <= chat_roles:
            assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"

    # The message still holding "msg 5" must also carry the summary text.
    holders = [m for m in result if "msg 5" in (m.get("content") or "")]
    assert len(holders) == 1
    assert "summary text" in holders[0]["content"]
def test_no_collision_scenarios_still_work(self):
    """The no-collision case still yields a standalone summary message.

    Verifies that the common no-collision cases (head=assistant/tail=assistant,
    head=user/tail=user) still produce a standalone summary message. This
    test exercises one transcript of eight alternating turns and expects a
    single ``user`` summary.
    """
    choice = MagicMock()
    choice.message.content = "summary text"
    llm_reply = MagicMock()
    llm_reply.choices = [choice]

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

    # Eight strictly alternating turns: user, assistant, user, ...
    msgs = [
        {"role": "assistant" if i % 2 else "user", "content": f"msg {i}"}
        for i in range(8)
    ]
    with patch("agent.context_compressor.call_llm", return_value=llm_reply):
        result = c.compress(msgs)

    summary_msgs = []
    for m in result:
        if (m.get("content") or "").startswith(SUMMARY_PREFIX):
            summary_msgs.append(m)
    assert len(summary_msgs) == 1, "should have a standalone summary message"
    assert summary_msgs[0]["role"] == "user"
def test_summarization_does_not_start_tail_with_tool_outputs(self):
    """Tool results kept after compression must not be orphaned.

    Every ``tool`` message in the compressed result has to reference a
    tool-call id that is still present on some assistant message in that
    same result.
    """
    choice = MagicMock()
    choice.message.content = "[CONTEXT SUMMARY]: compressed middle"
    llm_reply = MagicMock()
    llm_reply.choices = [choice]

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(
            model="test",
            quiet_mode=True,
            protect_first_n=2,
            protect_last_n=3,
        )

    tool_call = {"id": "call_c", "type": "function", "function": {"name": "search_files", "arguments": "{}"}}
    msgs = [
        {"role": "user", "content": "earlier 1"},
        {"role": "assistant", "content": "earlier 2"},
        {"role": "user", "content": "earlier 3"},
        {"role": "assistant", "content": "", "tool_calls": [tool_call]},
        {"role": "tool", "tool_call_id": "call_c", "content": "output c"},
        {"role": "user", "content": "latest user"},
    ]

    with patch("agent.context_compressor.call_llm", return_value=llm_reply):
        result = c.compress(msgs)

    # Collect every tool-call id still announced by an assistant message.
    called_ids = set()
    for msg in result:
        if msg.get("role") == "assistant" and msg.get("tool_calls"):
            called_ids.update(tc["id"] for tc in msg["tool_calls"])

    answered = [
        msg["tool_call_id"]
        for msg in result
        if msg.get("role") == "tool" and msg.get("tool_call_id")
    ]
    for tool_call_id in answered:
        assert tool_call_id in called_ids
class TestSummaryTargetRatio:
    """Verify that summary_target_ratio properly scales budgets with context window."""

    @staticmethod
    def _compressor_for(context_length, **overrides):
        """Build a quiet ``model="test"`` compressor under a patched context length.

        ``get_model_context_length`` is patched only for the duration of the
        constructor call; ``overrides`` are forwarded as keyword arguments.
        """
        with patch("agent.context_compressor.get_model_context_length", return_value=context_length):
            return ContextCompressor(model="test", quiet_mode=True, **overrides)

    def test_tail_budget_scales_with_context(self):
        """Tail token budget should be threshold_tokens * summary_target_ratio."""
        # (context length, expected tail budget) at a 0.40 target ratio.
        cases = ((200_000, 40_000), (1_000_000, 200_000))
        for context_length, expected_budget in cases:
            c = self._compressor_for(context_length, summary_target_ratio=0.40)
            assert c.tail_token_budget == expected_budget

    def test_summary_cap_scales_with_context(self):
        """Max summary tokens should be 5% of context, capped at 12K."""
        # 5% of 200K is 10K (below the cap); 5% of 1M would be 50K (capped).
        cases = ((200_000, 10_000), (1_000_000, 12_000))
        for context_length, expected_cap in cases:
            c = self._compressor_for(context_length)
            assert c.max_summary_tokens == expected_cap

    def test_ratio_clamped(self):
        """Ratio should be clamped to [0.10, 0.80]."""
        too_low = self._compressor_for(100_000, summary_target_ratio=0.05)
        assert too_low.summary_target_ratio == 0.10

        too_high = self._compressor_for(100_000, summary_target_ratio=0.95)
        assert too_high.summary_target_ratio == 0.80

    def test_default_threshold_is_50_percent(self):
        """Default compression threshold should be 50%, with a 64K floor."""
        c = self._compressor_for(100_000)
        assert c.threshold_percent == 0.50
        # 50% of 100K would be 50K; the expected value is the 64K floor.
        assert c.threshold_tokens == 64_000

    def test_threshold_floor_does_not_apply_above_128k(self):
        """On large-context models the 50% percentage is used directly."""
        c = self._compressor_for(200_000)
        assert c.threshold_tokens == 100_000

    def test_default_protect_last_n_is_20(self):
        """Default protect_last_n should be 20."""
        c = self._compressor_for(100_000)
        assert c.protect_last_n == 20

    def test_default_protect_first_n_is_3(self):
        """Default protect_first_n is 3.

        That is system + 3 extra non-system messages = 4 protected messages
        total when a system prompt is present. With the new semantics, the
        constructor default is 3 — the system prompt is always implicitly
        protected ON TOP OF protect_first_n non-system messages.
        """
        c = self._compressor_for(100_000)
        assert c.protect_first_n == 3

    def test_protect_first_n_override(self):
        """protect_first_n=0 should be honoured.

        This is for users who rely on rolling compaction and want NOTHING
        pinned at head except the system prompt (always implicitly protected).
        """
        c = self._compressor_for(100_000, protect_first_n=0)
        assert c.protect_first_n == 0

    def test_protect_first_n_0_preserves_only_system_prompt(self):
        """End-to-end: with protect_first_n=0 only the system prompt is head.

        All user/assistant messages between the system prompt and the
        protected tail become summarization candidates. This is the cleanest
        configuration for long-running rolling-compaction sessions — no
        user/assistant turn gets pinned verbatim forever just because it
        happened to be early in the session.
        """
        c = self._compressor_for(100_000, protect_first_n=0, protect_last_n=2)

        msgs = [{"role": "system", "content": "System prompt"}]
        msgs.extend(
            {"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
            for i in range(8)
        )
        result = c.compress(msgs)

        first, second, last = result[0], result[1], result[-1]
        assert first["role"] == "system"
        assert first["content"].startswith("System prompt")
        # The first user turn must not follow the system prompt verbatim.
        assert second.get("content") != "msg 0"
        assert last["content"] == msgs[-1]["content"]

    def test_protect_first_n_semantics_stable_without_system_prompt(self):
        """Regression: protect_first_n means the same with or without a system prompt.

        The gateway /compress handler strips the system prompt before calling
        compress(). protect_first_n must mean the same thing in both paths —
        "N non-system head messages" — so configuring protect_first_n=0
        preserves NOTHING at the head regardless of whether the system prompt
        is in the messages list.

        Bug this covers: under the old semantics, protect_first_n counted
        literally from messages[0]. In the gateway path (no system prompt)
        that meant protect_first_n=1 would pin the first user turn of the
        session forever — a user-reported complaint that a week-old
        resolved question kept getting reinserted into every compaction
        summary.
        """
        c = self._compressor_for(100_000, protect_first_n=0, protect_last_n=2)

        # Ten alternating turns and no system prompt, as in the gateway path.
        msgs = []
        for i in range(10):
            role = "user" if i % 2 == 0 else "assistant"
            msgs.append({"role": role, "content": f"msg {i}"})

        assert c._protect_head_size(msgs) == 0

        # Raising the setting on the same instance is reflected immediately.
        c.protect_first_n = 3
        assert c._protect_head_size(msgs) == 3
class TestTokenBudgetTailProtection:
    """Tests for token-budget-based tail protection (PR #6240).

    The core change: tail protection is now based on a token budget rather
    than a fixed message count. This prevents large tool outputs from
    blocking compaction.
    """

    @pytest.fixture()
    def budget_compressor(self):
        """Compressor with known token budget for tail protection tests."""
        with patch("agent.context_compressor.get_model_context_length", return_value=200_000):
            return ContextCompressor(
                model="test/model",
                threshold_percent=0.50,
                protect_first_n=2,
                protect_last_n=20,
                quiet_mode=True,
            )

    @staticmethod
    def _cut_and_tail(compressor, messages, head_end):
        """Return ``(cut, tail_size)`` from ``_find_tail_cut_by_tokens``."""
        cut = compressor._find_tail_cut_by_tokens(messages, head_end)
        return cut, len(messages) - cut

    @staticmethod
    def _alternating(texts):
        """Wrap ``texts`` as messages alternating user, assistant, user, ..."""
        return [
            {"role": "user" if i % 2 == 0 else "assistant", "content": text}
            for i, text in enumerate(texts)
        ]

    def test_large_tool_outputs_no_longer_block_compaction(self, budget_compressor):
        """Large tool outputs must not prevent compaction.

        The motivating scenario: 20 messages with large tool outputs should
        NOT prevent compaction. With message-count tail protection they would
        all be protected, leaving nothing to summarize.
        """
        messages = [
            {"role": "user", "content": "Start task"},
            {"role": "assistant", "content": "On it"},
        ]
        # Ten tool round-trips, each returning a 5000-character result.
        for i in range(10):
            messages.extend([
                {
                    "role": "assistant", "content": None,
                    "tool_calls": [{"function": {"name": f"tool_{i}", "arguments": "{}"}}],
                },
                {"role": "tool", "content": "x" * 5000, "tool_call_id": f"call_{i}"},
            ])
        messages.extend([
            {"role": "user", "content": "What's the status?"},
            {"role": "assistant", "content": "Here's what I found..."},
            {"role": "user", "content": "Continue"},
        ])

        _, tail_size = self._cut_and_tail(
            budget_compressor, messages, budget_compressor.protect_first_n
        )
        assert tail_size < 20, f"Tail {tail_size} messages — large tool outputs are blocking compaction"
        assert tail_size >= 3

    def test_min_tail_always_3_messages(self, budget_compressor):
        """Even with a tiny token budget, at least 3 messages are protected."""
        budget_compressor.tail_token_budget = 10
        messages = self._alternating(
            ["hello", "hi", "do something", "working on it", "more work", "done", "thanks"]
        )

        _, tail_size = self._cut_and_tail(budget_compressor, messages, 2)
        assert tail_size >= 3, f"Tail is only {tail_size} messages, min should be 3"

    def test_soft_ceiling_allows_oversized_message(self, budget_compressor):
        """The 1.5x soft ceiling allows an oversized message to be included
        rather than splitting it."""
        budget_compressor.tail_token_budget = 500
        messages = self._alternating(
            ["hello", "hi", "read the file", "a" * 2400, "short", "short reply", "continue"]
        )

        _, tail_size = self._cut_and_tail(budget_compressor, messages, 2)
        assert tail_size >= 3

    def test_small_conversation_still_compresses(self, budget_compressor):
        """A small but compressible conversation should still compress.

        With the new min of 8 messages (head=2 + 3 + 1 guard + 2 middle),
        nine short alternating messages must come back shorter.
        """
        messages = self._alternating([f"Message {i}" for i in range(9)])

        # Bypass the LLM by stubbing the summary generator on this instance.
        with patch.object(budget_compressor, "_generate_summary", return_value="Summary of conversation"):
            result = budget_compressor.compress(messages, current_tokens=90_000)

        assert len(result) < len(messages)

    def test_prune_with_token_budget(self, budget_compressor):
        """_prune_old_tool_results with protect_tail_tokens respects the budget."""
        big_read = {"function": {"name": "read_file", "arguments": '{"path": "big.txt"}'}}
        small_read = {"function": {"name": "read_file", "arguments": '{"path": "small.txt"}'}}
        messages = [
            {"role": "user", "content": "start"},
            {"role": "assistant", "content": None, "tool_calls": [big_read]},
            {"role": "tool", "content": "x" * 10000, "tool_call_id": "c1"},
            {"role": "assistant", "content": None, "tool_calls": [small_read]},
            {"role": "tool", "content": "y" * 10000, "tool_call_id": "c2"},
            {"role": "user", "content": "short recent message"},
            {"role": "assistant", "content": "short reply"},
        ]

        _, pruned = budget_compressor._prune_old_tool_results(
            messages, protect_tail_count=2, protect_tail_tokens=1000,
        )
        assert pruned >= 1

    def test_prune_short_conv_protects_entire_tail(self, budget_compressor):
        """Regression guard for PR #17025.

        When ``len(messages) <= protect_tail_count`` and a token budget is
        also set, every message must be protected. The previous code used
        ``min(protect_tail_count, len(result) - 1)`` which capped the floor
        one below the full length, leaving the oldest message eligible for
        pruning.
        """
        oldest_output = "x" * 5000
        messages = [
            {"role": "tool", "content": oldest_output, "tool_call_id": "c0"},
            {"role": "assistant", "content": "ack"},
            {"role": "user", "content": "recent"},
            {"role": "assistant", "content": "reply"},
        ]

        result, pruned = budget_compressor._prune_old_tool_results(
            messages,
            protect_tail_count=4,
            protect_tail_tokens=1_000_000,
        )
        assert pruned == 0
        assert result[0]["content"] == oldest_output

    def test_prune_without_token_budget_uses_message_count(self, budget_compressor):
        """Without protect_tail_tokens, falls back to message-count behavior."""
        messages = [
            {"role": "user", "content": "start"},
            {
                "role": "assistant", "content": None,
                "tool_calls": [{"function": {"name": "tool", "arguments": "{}"}}],
            },
            {"role": "tool", "content": "x" * 5000, "tool_call_id": "c1"},
            {"role": "user", "content": "recent"},
            {"role": "assistant", "content": "reply"},
        ]

        _, pruned = budget_compressor._prune_old_tool_results(
            messages, protect_tail_count=3,
        )
        # Only the type of the prune count is checked here.
        assert isinstance(pruned, int)

    def test_multimodal_message_accumulates_text_chars_not_block_count(self, budget_compressor):
        """_find_tail_cut_by_tokens must use text char count, not list length,
        for multimodal content. Regression guard for #16087.

        Setup: 6 messages, budget=80 (soft_ceiling=120). The multimodal message
        at index 1 has 500 chars of text → 135 tokens (correct) or 10 tokens (bug).

        Fixed path: walk stops at the multimodal (44+135=179 > 120), cut stays at 2,
        tail = messages[2:] = 4 messages.
        Bug path: walk counts only 10 tokens for the multimodal, exhausts to head_end,
        the head_end safeguard forces cut = n - min_tail = 3, tail = only 3 messages.
        """
        multimodal_content = [
            {"type": "text", "text": "x" * 500},
            {"type": "image_url", "image_url": {"url": "https://example.com/img.jpg"}},
        ]
        messages = [
            {"role": "user", "content": "head1"},
            {"role": "user", "content": multimodal_content},
            {"role": "assistant", "content": "tail1"},
            {"role": "user", "content": "tail2"},
            {"role": "assistant", "content": "tail3"},
            {"role": "user", "content": "tail4"},
        ]
        budget_compressor.tail_token_budget = 80

        cut, tail_size = self._cut_and_tail(budget_compressor, messages, 0)
        assert tail_size >= 4, (
            f"Expected ≥4 messages in tail (got {tail_size}, cut={cut}). "
            "The multimodal message was underestimated — len(list) used instead of text chars."
        )

    def test_plain_string_content_unchanged(self, budget_compressor):
        """Plain string content must still be estimated correctly after the fix."""
        messages = [
            {"role": "user", "content": "head1"},
            {"role": "user", "content": "x" * 500},
            {"role": "assistant", "content": "tail1"},
            {"role": "user", "content": "tail2"},
            {"role": "assistant", "content": "tail3"},
            {"role": "user", "content": "tail4"},
        ]
        budget_compressor.tail_token_budget = 80

        _, tail_size = self._cut_and_tail(budget_compressor, messages, 0)
        assert tail_size >= 4, (
            f"Plain string regression: expected ≥4 messages in tail, got {tail_size}"
        )

    def test_image_only_block_contributes_zero_text_chars(self, budget_compressor):
        """Image-only content blocks (no 'text' key) contribute 0 chars + base overhead."""
        budget_compressor.tail_token_budget = 500
        image_only = [{"type": "image_url", "image_url": {"url": "https://example.com/x.jpg"}}]
        messages = [
            {"role": "user", "content": "a" * 4000},
            {"role": "user", "content": image_only},
            {"role": "assistant", "content": "ok"},
        ]

        cut, _ = self._cut_and_tail(budget_compressor, messages, 0)
        # Only asserts that a valid index comes back (i.e. nothing raised).
        assert isinstance(cut, int)
        assert 0 <= cut <= len(messages)

    def test_mixed_list_with_bare_strings_does_not_crash(self, budget_compressor):
        """Content list may contain bare strings (not dicts) — must not raise AttributeError."""
        budget_compressor.tail_token_budget = 500
        mixed_content = ["Hello, world!", {"type": "text", "text": "extra text"}]
        messages = [
            {"role": "user", "content": mixed_content},
            {"role": "assistant", "content": "ok"},
        ]

        cut, _ = self._cut_and_tail(budget_compressor, messages, 0)
        assert isinstance(cut, int)
        assert 0 <= cut <= len(messages)

    def test_generous_budget_protects_everything_floor_does_not_override(
        self, budget_compressor
    ):
        """A budget that covers the whole transcript must prune nothing —
        ``protect_tail_count`` is a minimum floor, not a ceiling."""
        # Fifty tool round-trips, each with a distinct ~270-character result.
        messages = []
        for i in range(50):
            call_id = f"c{i}"
            messages.extend([
                {
                    "role": "assistant", "content": None,
                    "tool_calls": [{
                        "id": call_id,
                        "type": "function",
                        "function": {"name": "noop", "arguments": "{}"},
                    }],
                },
                {
                    "role": "tool",
                    "tool_call_id": call_id,
                    "content": f"unique-tool-output-{i:03d}-" + ("x" * 250),
                },
            ])

        _, pruned = budget_compressor._prune_old_tool_results(
            messages,
            protect_tail_count=20,
            protect_tail_tokens=10_000_000,
        )
        assert pruned == 0, (
            "budget said protect everything, but the floor still pruned "
            f"{pruned} messages — protect_tail_count is acting as a ceiling, "
            "not a minimum floor"
        )
class TestUpdateModelBudgets:
    """Regression: update_model() must recalculate token budgets."""

    def test_tail_budget_recalculated(self):
        """tail_token_budget must change after switching to a different context length.

        A compressor built for a 200K context is switched to a 32K model;
        both ``tail_token_budget`` and ``max_summary_tokens`` must differ
        from their previous values, and the tail budget must shrink.
        """
        # ``patch`` comes from the module-level import; no local re-import needed.
        with patch("agent.context_compressor.get_model_context_length", return_value=200_000):
            comp = ContextCompressor("model-a", threshold_percent=0.50, quiet_mode=True)

        old_tail = comp.tail_token_budget
        old_max_summary = comp.max_summary_tokens

        comp.update_model("model-b", context_length=32_000)

        assert comp.tail_token_budget != old_tail, "tail_token_budget should change"
        assert comp.tail_token_budget < old_tail, "smaller context → smaller budget"
        assert comp.max_summary_tokens != old_max_summary, "max_summary_tokens should change"

    def test_budgets_proportional(self):
        """Budgets should be proportional to context_length after update."""
        with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
            comp = ContextCompressor("model-a", threshold_percent=0.50, quiet_mode=True)

        comp.update_model("model-b", context_length=10_000)

        assert comp.tail_token_budget == int(comp.threshold_tokens * comp.summary_target_ratio)
        # NOTE(review): the 4000 ceiling here differs from the 12K cap asserted
        # in TestSummaryTargetRatio; at a 10K context the 5% term (500) is the
        # smaller one either way — confirm which ceiling is intended.
        assert comp.max_summary_tokens == min(int(10_000 * 0.05), 4000)
class TestTruncateToolCallArgsJson:
"""Regression tests for #11762.
The previous implementation produced invalid JSON by slicing
``function.arguments`` mid-string, which caused non-retryable 400s from
strict providers (observed on MiniMax) and stuck long sessions in a
re-send loop. The helper here must always emit parseable JSON whose
shape matches the original — shrunken, not corrupted.
"""
def _helper(self):
    """Return the module-private ``_truncate_tool_call_args_json`` function.

    The import happens at call time rather than at module import.
    """
    from agent.context_compressor import _truncate_tool_call_args_json as shrink_fn
    return shrink_fn
def test_shrunken_args_remain_valid_json(self):
    """Shrinking oversized arguments must yield shorter, still-parseable JSON.

    The short ``path`` value must come back untouched, while the long
    ``content`` value must end with the ``...[truncated]`` marker.
    """
    import json as _json

    skill_path = "~/.hermes/skills/shopping/browser-setup-notes.md"
    long_body = "# Shopping Browser Setup Notes\n\n" + "abc " * 400
    original = _json.dumps({"path": skill_path, "content": long_body})
    assert len(original) > 500

    shrink = self._helper()
    shrunk = shrink(original)

    # Must parse: the truncation may not cut the JSON mid-string.
    parsed = _json.loads(shrunk)
    assert parsed["path"] == skill_path
    assert parsed["content"].endswith("...[truncated]")
    assert len(shrunk) < len(original)
def test_non_json_arguments_pass_through(self):
    """Arguments that are not JSON must be returned exactly as given."""
    not_json = "this is not json at all, " * 50
    returned = self._helper()(not_json)
    assert returned == not_json
def test_short_string_leaves_unchanged(self):
    """Short string values must round-trip through the helper unmodified."""
    import json as _json

    original = {"command": "ls -la", "cwd": "/tmp"}
    shrink = self._helper()
    round_tripped = _json.loads(shrink(_json.dumps(original)))
    assert round_tripped == {"command": "ls -la", "cwd": "/tmp"}
def test_nested_structures_are_walked(self):
    """Long strings nested inside lists and dicts must be truncated too.

    Short sibling values (here ``"ok"``) must be left as they are.
    """
    import json as _json

    messages = [
        {"role": "user", "content": "x" * 500},
        {"role": "assistant", "content": "ok"},
    ]
    payload = _json.dumps({"messages": messages, "meta": {"note": "y" * 500}})

    shrink = self._helper()
    parsed = _json.loads(shrink(payload))

    long_turn, short_turn = parsed["messages"][0], parsed["messages"][1]
    assert long_turn["content"].endswith("...[truncated]")
    assert short_turn["content"] == "ok"
    assert parsed["meta"]["note"].endswith("...[truncated]")
def test_non_string_leaves_preserved(self):
    """Numbers, booleans, nulls and number lists must survive shrinking intact.

    Only the long string value is expected to be truncated.
    """
    import json as _json

    arguments = {
        "retries": 3,
        "enabled": True,
        "timeout": None,
        "items": [1, 2, 3],
        "note": "z" * 500,
    }
    shrink = self._helper()
    parsed = _json.loads(shrink(_json.dumps(arguments)))

    assert parsed["retries"] == 3
    assert parsed["enabled"] is True
    assert parsed["timeout"] is None
    assert parsed["items"] == [1, 2, 3]
    assert parsed["note"].endswith("...[truncated]")
def test_scalar_json_string_gets_shrunk(self):
    """A top-level JSON string (not an object) must also be truncated."""
    import json as _json

    payload = _json.dumps("q" * 500)
    shrunk = self._helper()(payload)
    parsed = _json.loads(shrunk)

    assert isinstance(parsed, str)
    assert parsed.endswith("...[truncated]")
def test_unicode_preserved(self):
    """Non-ASCII text must appear literally in the shrunken output."""
    import json as _json

    prefix = "非德满"
    payload = _json.dumps({"content": prefix + ("a" * 500)})
    out = self._helper()(payload)
    # The raw characters themselves must be present in the returned string.
    assert "非德满" in out
def test_pass3_emits_valid_json_for_downstream_provider(self):
    """End-to-end: Pass 3 must never produce the exact failure payload
    that caused the 400 loop (unterminated string, missing brace).

    A ``write_file`` tool call with oversized arguments sits outside the
    protected tail; after ``_prune_old_tool_results`` its arguments must
    still parse as JSON, keep ``path`` intact and end ``content`` with the
    ``...[truncated]`` marker.
    """
    import json as _json

    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        c = ContextCompressor(
            model="test/model",
            threshold_percent=0.85,
            protect_first_n=1,
            protect_last_n=1,
            quiet_mode=True,
        )

    skill_path = "~/.hermes/skills/shopping/browser-setup-notes.md"
    huge_content = "# Shopping Browser Setup Notes\n\n## Overview\n" + "x " * 400
    args_payload = _json.dumps({"path": skill_path, "content": huge_content})
    assert len(args_payload) > 500

    write_call = {
        "id": "call_1",
        "type": "function",
        "function": {"name": "write_file", "arguments": args_payload},
    }
    messages = [
        {"role": "user", "content": "please write two files"},
        {"role": "assistant", "content": None, "tool_calls": [write_call]},
        {"role": "tool", "tool_call_id": "call_1", "content": '{"bytes_written": 727}'},
        {"role": "user", "content": "ok"},
        {"role": "assistant", "content": "done"},
    ]

    result, _ = c._prune_old_tool_results(messages, protect_tail_count=2)

    # Index 1 is the assistant turn carrying the oversized tool call.
    shrunk = result[1]["tool_calls"][0]["function"]["arguments"]
    parsed = _json.loads(shrunk)
    assert parsed["path"] == skill_path
    assert parsed["content"].endswith("...[truncated]")