"""Integration test for the codex_app_server runtime path through AIAgent.
Verifies that:
- api_mode='codex_app_server' is accepted on AIAgent construction
- run_conversation() takes the early-return path and never enters the
chat completions loop
- Projected messages from a fake Codex session land in the messages list
- tool_iterations from the codex session tick the skill nudge counter
- Memory nudge counter ticks once per turn
- The returned dict has the same shape as the chat_completions path
"""
from __future__ import annotations
from unittest.mock import patch
import pytest
import run_agent
from agent.transports.codex_app_server_session import CodexAppServerSession, TurnResult
@pytest.fixture
def fake_session(monkeypatch):
    """Stub out CodexAppServerSession so AIAgent can be driven offline.

    ``run_turn`` is swapped for a function that echoes the user input back
    inside a canned ``TurnResult`` (one assistant tool call, its tool result,
    and a final assistant message, with ``tool_iterations=1``).
    ``ensure_started`` is swapped for a function that only returns the stub
    thread id. Both patches are installed through ``monkeypatch``.
    """
    stub_thread_id = "thread-stub-1"

    def _echo_turn(self, user_input: str, **kwargs):
        # The same echo text is used as the final text and as the content of
        # the last projected assistant message.
        reply = f"echo: {user_input}"
        tool_call = {
            "id": "exec_1",
            "type": "function",
            "function": {"name": "exec_command", "arguments": "{}"},
        }
        transcript = [
            {"role": "assistant", "content": None, "tool_calls": [tool_call]},
            {"role": "tool", "tool_call_id": "exec_1", "content": "ok"},
            {"role": "assistant", "content": reply},
        ]
        return TurnResult(
            final_text=reply,
            projected_messages=transcript,
            tool_iterations=1,
            interrupted=False,
            error=None,
            turn_id="turn-stub-1",
            thread_id=stub_thread_id,
        )

    def _already_started(self):
        return stub_thread_id

    monkeypatch.setattr(
        CodexAppServerSession, "ensure_started", _already_started
    )
    monkeypatch.setattr(CodexAppServerSession, "run_turn", _echo_turn)
def _make_codex_agent():
    """Build an AIAgent with ``api_mode="codex_app_server"``.

    Stub credentials and a ``.invalid`` base URL are supplied, and api_mode is
    passed explicitly. Quiet mode is on, and context files and memory are
    skipped. The intent is that construction never contacts a real provider
    (NOTE(review): that depends on the AIAgent constructor, which is not
    visible from this file).
    """
    ctor_kwargs = {
        "api_key": "stub",
        "base_url": "https://stub.invalid",
        "provider": "openai",
        "api_mode": "codex_app_server",
        "quiet_mode": True,
        "skip_context_files": True,
        "skip_memory": True,
    }
    return run_agent.AIAgent(**ctor_kwargs)
class TestApiModeAccepted:
    """Construction-time check for the codex_app_server api_mode."""

    def test_api_mode_is_codex_app_server(self):
        """The api_mode passed to the constructor is exposed on the agent."""
        assert _make_codex_agent().api_mode == "codex_app_server"
class TestRunConversationCodexPath:
    """run_conversation() behavior for an agent in codex_app_server mode.

    Every test replaces ``_spawn_background_review`` on the agent with a mock
    so no review work is started; tests that care about the review inspect
    that mock's recorded call.
    """

    def test_run_conversation_returns_codex_shape(self, fake_session):
        """The result dict carries the status keys and the stub's ids."""
        agent = _make_codex_agent()
        with patch.object(agent, "_spawn_background_review", return_value=None):
            result = agent.run_conversation("hello there")

        # Text and status flags.
        assert result["final_response"] == "echo: hello there"
        assert result["completed"] is True
        assert result["partial"] is False
        assert result["error"] is None
        assert result["api_calls"] == 1

        # Identifiers come straight from the stubbed TurnResult.
        assert result["codex_thread_id"] == "thread-stub-1"
        assert result["codex_turn_id"] == "turn-stub-1"

    def test_projected_messages_are_spliced(self, fake_session):
        """The user message comes first and the stub's final reply is present."""
        agent = _make_codex_agent()
        with patch.object(agent, "_spawn_background_review", return_value=None):
            msgs = agent.run_conversation("hello")["messages"]

        # One user message plus the three projected messages from the stub.
        assert len(msgs) >= 4
        opening = msgs[0]
        assert opening["role"] == "user"
        assert opening["content"] == "hello"
        assert any(
            m.get("role") == "assistant" and m.get("content") == "echo: hello"
            for m in msgs
        ), f"expected final assistant message in {msgs}"

    def test_nudge_counters_tick(self, fake_session):
        """Skill-iteration and user-turn counters both advance by one per turn.

        The stub reports ``tool_iterations=1`` per turn, so after turn N both
        ``_iters_since_skill`` and ``_user_turn_count`` are expected to be N.
        The memory nudge counter is not asserted here; the agent is built with
        ``skip_memory=True``.
        """
        agent = _make_codex_agent()
        agent._user_turn_count = 0
        agent._iters_since_skill = 0

        for turn_number, prompt in enumerate(("first", "second"), start=1):
            with patch.object(
                agent, "_spawn_background_review", return_value=None
            ):
                agent.run_conversation(prompt)
            assert agent._iters_since_skill == turn_number
            assert agent._user_turn_count == turn_number

    def test_user_message_not_duplicated(self, fake_session):
        """Regression guard: the user message appears exactly once."""
        prompt = "ping unique 12345"
        agent = _make_codex_agent()
        with patch.object(agent, "_spawn_background_review", return_value=None):
            result = agent.run_conversation(prompt)

        matching = [
            m for m in result["messages"]
            if m.get("role") == "user" and m.get("content") == prompt
        ]
        user_count = len(matching)
        assert user_count == 1, f"user message appeared {user_count}× in {result['messages']}"

    def test_background_review_NOT_invoked_below_threshold(self, fake_session):
        """One turn with both nudge intervals set to 10 spawns no review."""
        agent = _make_codex_agent()
        agent._iters_since_skill = 0
        agent._skill_nudge_interval = 10
        agent._memory_nudge_interval = 10

        with patch.object(
            agent, "_spawn_background_review", return_value=None
        ) as review_spawn:
            agent.run_conversation("ping")

        review_spawn.assert_not_called()

    def test_background_review_skill_trigger_fires_above_threshold(
        self, monkeypatch
    ):
        """A turn reporting 10 tool iterations trips a skill interval of 10.

        The review mock must be called with a list ``messages_snapshot`` and
        ``review_skills=True``, and ``_iters_since_skill`` must be back at 0
        afterwards.
        """
        from agent.transports.codex_app_server_session import (
            CodexAppServerSession, TurnResult,
        )

        def _busy_turn(self, user_input: str, **kwargs):
            reply = f"echo: {user_input}"
            return TurnResult(
                final_text=reply,
                projected_messages=[{"role": "assistant", "content": reply}],
                tool_iterations=10,
                turn_id="t1",
                thread_id="th1",
            )

        monkeypatch.setattr(
            CodexAppServerSession, "ensure_started", lambda self: "th1"
        )
        monkeypatch.setattr(CodexAppServerSession, "run_turn", _busy_turn)

        agent = _make_codex_agent()
        agent._iters_since_skill = 0
        agent._skill_nudge_interval = 10
        # Work on a copy of the tool-name set and make sure skill_manage is in it.
        tool_names = set(getattr(agent, "valid_tool_names", set()))
        tool_names.add("skill_manage")
        agent.valid_tool_names = tool_names

        with patch.object(
            agent, "_spawn_background_review", return_value=None
        ) as spawn:
            agent.run_conversation("do tool work")

        assert spawn.called, "skill threshold tripped but review didn't fire"
        review_kwargs = spawn.call_args.kwargs
        assert "messages_snapshot" in review_kwargs
        assert isinstance(review_kwargs["messages_snapshot"], list)
        assert review_kwargs["review_skills"] is True
        assert agent._iters_since_skill == 0

    def test_background_review_signature_never_breaks(self, fake_session):
        """The review is invoked with keyword arguments only.

        With the skill interval at 1, the stub's single tool iteration is
        enough to trigger the review on the first turn; the recorded call must
        have no positional arguments and must include ``messages_snapshot``.
        """
        agent = _make_codex_agent()
        agent._iters_since_skill = 0
        agent._skill_nudge_interval = 1
        tool_names = set(getattr(agent, "valid_tool_names", set()))
        tool_names.add("skill_manage")
        agent.valid_tool_names = tool_names

        with patch.object(
            agent, "_spawn_background_review", return_value=None
        ) as spawn:
            agent.run_conversation("first")

        assert spawn.called
        call = spawn.call_args
        assert call.args == (), (
            f"expected no positional args, got {call.args!r} — "
            "would crash _spawn_background_review at runtime"
        )
        assert "messages_snapshot" in call.kwargs

    def test_chat_completions_loop_is_not_entered(self, fake_session):
        """``client.chat.completions.create`` is never called on this path."""
        agent = _make_codex_agent()
        with patch.object(agent, "client") as client_mock:
            with patch.object(
                agent, "_spawn_background_review", return_value=None
            ):
                agent.run_conversation("hi")

        client_mock.chat.completions.create.assert_not_called()
class TestReviewForkApiModeDowngrade:
    """Background review fork created by a codex_app_server parent.

    The fork is expected to be constructed with ``api_mode="codex_responses"``
    rather than inheriting ``codex_app_server``. The stated rationale is that
    the fork otherwise cannot dispatch agent-loop tools (memory,
    skill_manage), which is the purpose of the review.
    """

    def test_codex_app_server_parent_downgrades_review_fork(self):
        """Run the real ``_spawn_background_review`` and inspect the fork.

        ``AIAgent.__init__`` is patched with a recorder that stores the
        constructor kwargs and sets up inert attributes on the new instance.
        The test then polls ``captured`` for up to 30 x 0.1s before asserting
        on the recorded ``api_mode``.
        """
        import time
        from unittest.mock import MagicMock, patch as _patch

        agent = _make_codex_agent()
        agent._memory_store = MagicMock()
        agent._memory_enabled = True
        agent._user_profile_enabled = True
        agent._current_main_runtime = lambda: {
            "api_mode": "codex_app_server",
            "base_url": "https://chatgpt.com/backend-api/codex",
            "api_key": "stub-token",
        }

        captured = {}

        def _capture_init(self, **kwargs):
            # Record what the fork was constructed with.
            captured.update(kwargs)
            for field in ("api_mode", "provider", "model"):
                setattr(self, field, kwargs.get(field))

            # Inert state so code touching the fork after construction has
            # the attributes it reads.
            self._memory_write_origin = None
            self._memory_write_context = None
            self._memory_store = None
            self._memory_enabled = False
            self._user_profile_enabled = False
            self._memory_nudge_interval = 0
            self._skill_nudge_interval = 0
            self.suppress_status_output = False
            self._session_messages = []

            # No-op replacements for the two methods stubbed on the fork.
            self.run_conversation = lambda *a, **kw: {
                "final_response": "",
                "messages": [],
            }
            self.close = lambda *a, **kw: None

        with _patch("run_agent.AIAgent.__init__", _capture_init):
            agent._spawn_background_review(
                messages_snapshot=[{"role": "user", "content": "x"}],
                review_memory=True,
                review_skills=False,
            )
            # Poll while the patch is still active: check first, then sleep,
            # at most 30 times.
            polls_left = 30
            while polls_left and "api_mode" not in captured:
                time.sleep(0.1)
                polls_left -= 1

        assert captured.get("api_mode") == "codex_responses", (
            f"review fork should be downgraded to codex_responses when "
            f"parent is codex_app_server; got {captured.get('api_mode')!r}"
        )
class TestErrorHandling:
    """Result shape when the codex turn raises or reports an interruption."""

    def test_session_exception_returns_partial_with_error(self, monkeypatch):
        """An exception from ``run_turn`` yields a partial, errored result.

        The error text must include the exception message, and the final
        response must mention ``codex-runtime auto``.
        """

        def _raise_on_turn(self, user_input, **kwargs):
            raise RuntimeError("subprocess died")

        monkeypatch.setattr(CodexAppServerSession, "run_turn", _raise_on_turn)
        monkeypatch.setattr(
            CodexAppServerSession, "ensure_started", lambda self: "t1"
        )

        agent = _make_codex_agent()
        with patch.object(agent, "_spawn_background_review", return_value=None):
            result = agent.run_conversation("hi")

        assert result["partial"] is True
        assert result["completed"] is False
        assert "subprocess died" in result["error"]
        assert "codex-runtime auto" in result["final_response"]

    def test_interrupted_turn_marked_partial(self, monkeypatch):
        """An interrupted TurnResult is reported as partial with its error."""

        def _interrupted(self, user_input, **kwargs):
            return TurnResult(
                final_text="",
                projected_messages=[],
                tool_iterations=0,
                interrupted=True,
                error="user interrupted",
                turn_id="t",
                thread_id="th",
            )

        monkeypatch.setattr(CodexAppServerSession, "run_turn", _interrupted)
        monkeypatch.setattr(
            CodexAppServerSession, "ensure_started", lambda self: "th"
        )

        agent = _make_codex_agent()
        with patch.object(agent, "_spawn_background_review", return_value=None):
            result = agent.run_conversation("hi")

        assert result["partial"] is True
        assert result["completed"] is False
        assert result["error"] == "user interrupted"
class TestSessionRetirementOnRunAgent:
    """Session lifecycle on the AIAgent side after a codex turn.

    When a turn signals retirement (``should_retire=True``) or raises, the
    agent is expected to close the session once and set ``_codex_session``
    to None; after an ordinary turn the session stays attached.
    """

    def test_should_retire_drops_session(self, monkeypatch):
        """``should_retire=True`` closes the session once and clears it."""
        closed_sessions = []

        def _timed_out_turn(self, user_input, **kwargs):
            return TurnResult(
                final_text="",
                projected_messages=[],
                tool_iterations=0,
                interrupted=True,
                error="turn timed out after 600.0s",
                turn_id="tu1",
                thread_id="th1",
                should_retire=True,
            )

        def _record_close(self):
            closed_sessions.append(self)

        monkeypatch.setattr(CodexAppServerSession, "close", _record_close)
        monkeypatch.setattr(CodexAppServerSession, "run_turn", _timed_out_turn)
        monkeypatch.setattr(
            CodexAppServerSession, "ensure_started", lambda self: "th1"
        )

        agent = _make_codex_agent()
        with patch.object(agent, "_spawn_background_review", return_value=None):
            result = agent.run_conversation("hi")

        assert len(closed_sessions) == 1
        # The attribute must exist and be None, not merely be absent.
        assert getattr(agent, "_codex_session", "MISSING") is None
        assert result["partial"] is True
        assert result["error"] == "turn timed out after 600.0s"

    def test_normal_turn_keeps_session(self, fake_session):
        """After an ordinary turn the session is still attached.

        The fixture's TurnResult does not pass ``should_retire`` (presumably
        it defaults to False — confirm against TurnResult).
        """
        agent = _make_codex_agent()
        with patch.object(agent, "_spawn_background_review", return_value=None):
            agent.run_conversation("hi")

        assert getattr(agent, "_codex_session", None) is not None

    def test_exception_path_also_drops_session(self, monkeypatch):
        """An exception from ``run_turn`` also closes and clears the session."""
        closed_sessions = []

        def _crashing_turn(self, user_input, **kwargs):
            raise RuntimeError("codex segfaulted")

        def _record_close(self):
            closed_sessions.append(self)

        monkeypatch.setattr(CodexAppServerSession, "close", _record_close)
        monkeypatch.setattr(CodexAppServerSession, "run_turn", _crashing_turn)
        monkeypatch.setattr(
            CodexAppServerSession, "ensure_started", lambda self: "th1"
        )

        agent = _make_codex_agent()
        with patch.object(agent, "_spawn_background_review", return_value=None):
            result = agent.run_conversation("hi")

        assert len(closed_sessions) == 1
        assert agent._codex_session is None
        assert result["completed"] is False
        assert "codex segfaulted" in result["error"]