"""Session adapter for codex app-server runtime.
Owns one Codex thread per Hermes session. Drives `turn/start`, consumes
streaming notifications via CodexEventProjector, handles server-initiated
approval requests (apply_patch, exec command), translates cancellation,
and returns a clean turn result that AIAgent.run_conversation() can splice
into its `messages` list.
Lifecycle:
session = CodexAppServerSession(cwd="/home/x/proj")
session.ensure_started() # spawns + handshake + thread/start
result = session.run_turn(user_input="hello") # blocks until turn/completed
# result.final_text → assistant text returned to caller
# result.projected_messages → list of {role, content, ...} for messages list
# result.tool_iterations → how many tool-shaped items completed (skill nudge counter)
# result.interrupted → True if Ctrl+C / interrupt_requested fired mid-turn
session.close() # tears down subprocess
Threading model: the adapter is single-threaded from the caller's perspective.
The underlying CodexAppServerClient owns its own reader threads but exposes
blocking-with-timeout queues that this adapter polls in a loop, so the run_turn
call is synchronous and behaves like AIAgent's existing chat_completions loop.
"""
from __future__ import annotations
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from agent.redact import redact_sensitive_text
from agent.transports.codex_app_server import (
CodexAppServerClient,
CodexAppServerError,
)
from agent.transports.codex_event_projector import CodexEventProjector
logger = logging.getLogger(__name__)
_STDERR_TAIL_LINES = 12
_HERMES_TO_CODEX_PERMISSION_PROFILE = {
"auto": "workspace-write",
"approval-required": "read-only-with-approval",
"unrestricted": "full-access",
"yolo": "full-access",
}
@dataclass
class TurnResult:
"""Result of one user→assistant→tool turn through the codex app-server."""
final_text: str = ""
projected_messages: list[dict] = field(default_factory=list)
tool_iterations: int = 0
interrupted: bool = False
error: Optional[str] = None
turn_id: Optional[str] = None
thread_id: Optional[str] = None
should_retire: bool = False
_TURN_ABORTED_MARKERS = ("<turn_aborted>", "<turn_aborted/>")
_OAUTH_REFRESH_FAILURE_HINTS = (
"invalid_grant",
"invalid grant",
"refresh token",
"refresh_token",
"token refresh",
"token_refresh",
"token has expired",
"expired_token",
"expired token",
"not authenticated",
"unauthenticated",
"unauthorized",
"401 unauthorized",
"re-authenticate",
"reauthenticate",
"please log in",
"please login",
"auth profile",
"no auth profile",
"oauth",
)
def _classify_oauth_failure(*parts: str) -> Optional[str]:
"""Return a user-friendly re-auth hint if any of the provided strings
look like a codex OAuth/token-refresh failure; otherwise None.
Used for both `turn/start` JSON-RPC errors and post-mortem stderr
inspection when the subprocess exits unexpectedly. Conservative on
purpose — we only redirect users to `codex login` when the signal
is strong, so unrelated runtime failures still surface verbatim.
"""
haystack = " ".join(p for p in parts if p).lower()
if not haystack:
return None
for needle in _OAUTH_REFRESH_FAILURE_HINTS:
if needle in haystack:
return (
"Codex authentication failed — your ChatGPT/Codex login "
"looks expired or invalid. Run `codex login` to refresh, "
"then retry. (Fall back to default runtime with "
"`/codex-runtime auto` if the issue persists.)"
)
return None
@dataclass
class _ServerRequestRouting:
"""Default policies for codex-side approval requests when no interactive
callback is wired in. These are only used by tests + cron / non-interactive
contexts; the live CLI path passes an approval_callback that defers to
tools.approval.prompt_dangerous_approval()."""
auto_approve_exec: bool = False
auto_approve_apply_patch: bool = False
class CodexAppServerSession:
"""One Codex thread per Hermes session, lifetime owned by AIAgent.
Not thread-safe — one caller drives it at a time, matching how AIAgent's
run_conversation() loop is structured today. The codex client itself can
handle interleaved reads/writes via its own threads, but the adapter's
state (projector, thread_id, turn counter) is owned by the caller thread.
"""
def __init__(
self,
*,
cwd: Optional[str] = None,
codex_bin: str = "codex",
codex_home: Optional[str] = None,
permission_profile: Optional[str] = None,
approval_callback: Optional[Callable[..., str]] = None,
on_event: Optional[Callable[[dict], None]] = None,
request_routing: Optional[_ServerRequestRouting] = None,
client_factory: Optional[Callable[..., CodexAppServerClient]] = None,
) -> None:
self._cwd = cwd or os.getcwd()
self._codex_bin = codex_bin
self._codex_home = codex_home
self._permission_profile = (
permission_profile or _HERMES_TO_CODEX_PERMISSION_PROFILE.get(
os.environ.get("HERMES_TERMINAL_SECURITY_MODE", "auto"),
"workspace-write",
)
)
self._approval_callback = approval_callback
self._on_event = on_event
self._routing = request_routing or _ServerRequestRouting()
self._client_factory = client_factory or CodexAppServerClient
self._client: Optional[CodexAppServerClient] = None
self._thread_id: Optional[str] = None
self._interrupt_event = threading.Event()
self._pending_file_changes: dict[str, str] = {}
self._closed = False
def ensure_started(self) -> str:
"""Spawn the subprocess, do the initialize handshake, and start a
thread. Returns the codex thread id. Idempotent — repeated calls
return the same thread id."""
if self._thread_id is not None:
return self._thread_id
if self._client is None:
self._client = self._client_factory(
codex_bin=self._codex_bin, codex_home=self._codex_home
)
self._client.initialize(
client_name="hermes",
client_title="Hermes Agent",
client_version=_get_hermes_version(),
)
params: dict[str, Any] = {"cwd": self._cwd}
result = self._client.request("thread/start", params, timeout=15)
thread_obj = result.get("thread") or {}
thread_id = (
thread_obj.get("id")
or thread_obj.get("sessionId")
or result.get("sessionId")
or result.get("threadId")
)
if not thread_id:
raise CodexAppServerError(
code=-32603,
message=(
"codex thread/start returned no thread id "
f"(payload keys: {sorted(result.keys())})"
),
)
self._thread_id = thread_id
logger.info(
"codex app-server thread started: id=%s profile=%s cwd=%s",
self._thread_id[:8],
self._permission_profile,
self._cwd,
)
return self._thread_id
def close(self) -> None:
if self._closed:
return
self._closed = True
if self._client is not None:
try:
self._client.close()
except Exception:
pass
self._client = None
self._thread_id = None
def __enter__(self) -> "CodexAppServerSession":
return self
def __exit__(self, *exc: Any) -> None:
self.close()
def request_interrupt(self) -> None:
"""Idempotent: signal the active turn loop to issue turn/interrupt
and unwind. Called by AIAgent's _interrupt_requested path."""
self._interrupt_event.set()
def _format_error_with_stderr(
self,
prefix: str,
exc: Any = "",
*,
tail_lines: int = _STDERR_TAIL_LINES,
) -> str:
"""Build a user-facing error string for codex failures.
Appends the last few lines of codex's stderr buffer when available,
passed through agent.redact with force=True so secrets in provider
error responses (auth headers, query-string tokens, sk-* keys) never
leak into chat output or trajectories. The codex CLI's own error
text ('Internal error', 'turn/start failed: ...') is otherwise
opaque and forces users to re-run with verbose flags to diagnose
config / provider / auth-bridge problems.
Use this for the generic / catch-all branches. Specific
classifications (OAuth via _classify_oauth_failure, post-tool wedge
watchdog) already produce a clean hint and should be used instead.
"""
exc_str = str(exc) if exc != "" and exc is not None else ""
base = f"{prefix}: {exc_str}" if exc_str else prefix
if self._client is None:
return base
try:
tail = self._client.stderr_tail(tail_lines)
except Exception:
return base
if not tail:
return base
joined = "\n".join(line.rstrip() for line in tail if line)
if not joined.strip():
return base
redacted = redact_sensitive_text(joined, force=True)
return f"{base}\ncodex stderr (last {len(tail)} lines):\n{redacted}"
def run_turn(
self,
user_input: str,
*,
turn_timeout: float = 600.0,
notification_poll_timeout: float = 0.25,
post_tool_quiet_timeout: float = 90.0,
) -> TurnResult:
"""Send a user message and block until turn/completed, while
forwarding server-initiated approval requests and projecting items
into Hermes' messages shape.
post_tool_quiet_timeout: if codex emits a tool completion and then
goes quiet for this many seconds without emitting another item or
`turn/completed`, fast-fail and mark the session for retirement.
Mirrors openclaw beta.8's post-tool completion watchdog (#81697)
so a wedged codex doesn't burn the full turn deadline.
"""
result = TurnResult()
try:
self.ensure_started()
except (CodexAppServerError, TimeoutError) as exc:
result.error = self._format_error_with_stderr(
"codex app-server startup failed", exc
)
result.should_retire = True
return result
assert self._client is not None and self._thread_id is not None
result.thread_id = self._thread_id
self._interrupt_event.clear()
projector = CodexEventProjector()
try:
ts = self._client.request(
"turn/start",
{
"threadId": self._thread_id,
"input": [{"type": "text", "text": user_input}],
},
timeout=10,
)
except CodexAppServerError as exc:
stderr_blob = "\n".join(self._client.stderr_tail(40))
hint = _classify_oauth_failure(exc.message, stderr_blob)
if hint is not None:
result.error = hint
result.should_retire = True
else:
result.error = self._format_error_with_stderr(
"turn/start failed", exc
)
return result
except TimeoutError as exc:
stderr_blob = "\n".join(self._client.stderr_tail(40))
hint = _classify_oauth_failure(stderr_blob)
result.error = hint or self._format_error_with_stderr(
"turn/start timed out", exc
)
result.should_retire = True
return result
result.turn_id = (ts.get("turn") or {}).get("id")
deadline = time.monotonic() + turn_timeout
turn_complete = False
last_tool_completion_at: Optional[float] = None
while time.monotonic() < deadline and not turn_complete:
if self._interrupt_event.is_set():
self._issue_interrupt(result.turn_id)
result.interrupted = True
break
if not self._client.is_alive():
stderr_blob = "\n".join(self._client.stderr_tail(60))
hint = _classify_oauth_failure(stderr_blob)
if hint is not None:
result.error = hint
else:
result.error = self._format_error_with_stderr(
"codex app-server subprocess exited unexpectedly",
tail_lines=20,
)
result.should_retire = True
break
if (
last_tool_completion_at is not None
and (time.monotonic() - last_tool_completion_at)
> post_tool_quiet_timeout
):
self._issue_interrupt(result.turn_id)
result.interrupted = True
result.error = (
f"codex went silent for "
f"{post_tool_quiet_timeout:.0f}s after a tool result; "
f"retiring app-server session."
)
result.should_retire = True
break
sreq = self._client.take_server_request(timeout=0)
if sreq is not None:
for _ in range(8):
pending = self._client.take_notification(timeout=0)
if pending is None:
break
self._track_pending_file_change(pending)
proj = projector.project(pending)
if proj.messages:
result.projected_messages.extend(proj.messages)
if proj.is_tool_iteration:
result.tool_iterations += 1
last_tool_completion_at = time.monotonic()
if proj.final_text is not None:
result.final_text = proj.final_text
if _has_turn_aborted_marker(proj.final_text):
turn_complete = True
result.interrupted = True
result.error = (
result.error
or "codex reported turn_aborted"
)
self._handle_server_request(sreq)
last_tool_completion_at = None
continue
note = self._client.take_notification(
timeout=notification_poll_timeout
)
if note is None:
continue
method = note.get("method", "")
if self._on_event is not None:
try:
self._on_event(note)
except Exception:
logger.debug("on_event callback raised", exc_info=True)
self._track_pending_file_change(note)
projection = projector.project(note)
if projection.messages:
result.projected_messages.extend(projection.messages)
if projection.is_tool_iteration:
result.tool_iterations += 1
last_tool_completion_at = time.monotonic()
else:
if projection.messages or projection.final_text is not None:
last_tool_completion_at = None
if projection.final_text is not None:
result.final_text = projection.final_text
if _has_turn_aborted_marker(projection.final_text):
turn_complete = True
result.interrupted = True
result.error = (
result.error or "codex reported turn_aborted"
)
if method == "turn/completed":
turn_complete = True
turn_status = (
(note.get("params") or {}).get("turn") or {}
).get("status")
if turn_status and turn_status not in {"completed", "interrupted"}:
err_obj = (
(note.get("params") or {}).get("turn") or {}
).get("error")
if err_obj:
err_msg = err_obj.get("message") or str(err_obj)
stderr_blob = "\n".join(
self._client.stderr_tail(40)
)
hint = _classify_oauth_failure(err_msg, stderr_blob)
if hint is not None:
result.error = hint
result.should_retire = True
else:
result.error = self._format_error_with_stderr(
f"turn ended status={turn_status}", err_msg
)
if not turn_complete and not result.interrupted:
self._issue_interrupt(result.turn_id)
result.interrupted = True
if not result.error:
result.error = self._format_error_with_stderr(
f"turn timed out after {turn_timeout}s"
)
result.should_retire = True
return result
def _issue_interrupt(self, turn_id: Optional[str]) -> None:
if self._client is None or self._thread_id is None or turn_id is None:
return
try:
self._client.request(
"turn/interrupt",
{"threadId": self._thread_id, "turnId": turn_id},
timeout=5,
)
except CodexAppServerError as exc:
logger.debug("turn/interrupt non-fatal: %s", exc)
except TimeoutError:
logger.warning("turn/interrupt timed out")
def _handle_server_request(self, req: dict) -> None:
"""Translate a codex server request (approval) into Hermes' approval
flow, then send the response.
Method names verified live against codex 0.130.0 (Apr 2026):
item/commandExecution/requestApproval — exec approvals
item/fileChange/requestApproval — apply_patch approvals
item/permissions/requestApproval — permissions changes
(we decline; user controls
permission profile in
~/.codex/config.toml).
"""
if self._client is None:
return
method = req.get("method", "")
rid = req.get("id")
params = req.get("params") or {}
if method == "item/commandExecution/requestApproval":
decision = self._decide_exec_approval(params)
self._client.respond(rid, {"decision": decision})
elif method == "item/fileChange/requestApproval":
decision = self._decide_apply_patch_approval(params)
self._client.respond(rid, {"decision": decision})
elif method == "item/permissions/requestApproval":
self._client.respond(rid, {"decision": "decline"})
elif method == "mcpServer/elicitation/request":
server_name = params.get("serverName") or ""
if server_name == "hermes-tools":
self._client.respond(
rid,
{"action": "accept", "content": None, "_meta": None},
)
else:
self._client.respond(
rid,
{"action": "decline", "content": None, "_meta": None},
)
else:
logger.warning("Unknown codex server request: %s", method)
self._client.respond_error(
rid, code=-32601, message=f"Unsupported method: {method}"
)
def _decide_exec_approval(self, params: dict) -> str:
if self._routing.auto_approve_exec:
return "accept"
command = params.get("command") or ""
cwd = params.get("cwd") or self._cwd or "<unknown>"
reason = params.get("reason")
description = f"Codex requests exec in {cwd}"
if reason:
description += f" — {reason}"
if self._approval_callback is not None:
try:
choice = self._approval_callback(
command, description, allow_permanent=False
)
return _approval_choice_to_codex_decision(choice)
except Exception:
logger.exception("approval_callback raised on exec request")
return "decline"
return "decline"
def _decide_apply_patch_approval(self, params: dict) -> str:
if self._routing.auto_approve_apply_patch:
return "accept"
if self._approval_callback is not None:
reason = params.get("reason")
grant_root = params.get("grantRoot")
item_id = params.get("itemId") or ""
change_summary = self._lookup_pending_file_change(item_id)
description_parts = []
if reason:
description_parts.append(reason)
if change_summary:
description_parts.append(change_summary)
if grant_root:
description_parts.append(f"grants write to {grant_root}")
description = (
"; ".join(description_parts)
if description_parts
else "Codex requests to apply a patch"
)
command_label = (
f"apply_patch: {change_summary}" if change_summary
else f"apply_patch: {reason}" if reason
else "apply_patch"
)
try:
choice = self._approval_callback(
command_label,
description,
allow_permanent=False,
)
return _approval_choice_to_codex_decision(choice)
except Exception:
logger.exception("approval_callback raised on apply_patch")
return "decline"
return "decline"
def _track_pending_file_change(self, note: dict) -> None:
"""Maintain self._pending_file_changes from item/started + item/completed
notifications. Lets the apply_patch approval prompt show what's
actually changing — codex's approval params don't carry the data."""
method = note.get("method", "")
params = note.get("params") or {}
item = params.get("item") or {}
if item.get("type") != "fileChange":
return
item_id = item.get("id") or ""
if not item_id:
return
if method == "item/started":
changes = item.get("changes") or []
if not changes:
self._pending_file_changes[item_id] = "1 change pending"
return
kinds: dict[str, int] = {}
paths: list[str] = []
for ch in changes:
if not isinstance(ch, dict):
continue
kind = (ch.get("kind") or {}).get("type") or "update"
kinds[kind] = kinds.get(kind, 0) + 1
p = ch.get("path") or ""
if p:
paths.append(p)
counts = ", ".join(f"{n} {k}" for k, n in sorted(kinds.items()))
preview = ", ".join(paths[:3])
if len(paths) > 3:
preview += f", +{len(paths) - 3} more"
self._pending_file_changes[item_id] = (
f"{counts}: {preview}" if preview else counts
)
elif method == "item/completed":
self._pending_file_changes.pop(item_id, None)
def _lookup_pending_file_change(self, item_id: str) -> Optional[str]:
"""Look up an in-progress fileChange item by id and summarize its
changes for the approval prompt. Returns None when we don't have
the item cached (e.g. approval arrived before item/started, or
fileChange item content not tracked yet)."""
if not item_id:
return None
cached = self._pending_file_changes.get(item_id)
if not cached:
return None
return cached
def _approval_choice_to_codex_decision(choice: str) -> str:
"""Map Hermes approval choices onto codex's CommandExecutionApprovalDecision
/ FileChangeApprovalDecision wire values.
Hermes returns 'once', 'session', 'always', or 'deny'.
Codex expects 'accept', 'acceptForSession', 'decline', or 'cancel'
(verified against codex-rs/app-server-protocol/src/protocol/v2/item.rs
on codex 0.130.0).
"""
if choice in {"once",}:
return "accept"
if choice in {"session", "always"}:
return "acceptForSession"
return "decline"
def _has_turn_aborted_marker(text: str) -> bool:
"""Return True if `text` contains any of the raw markers codex uses
to signal a turn was aborted without emitting `turn/completed`.
Codex emits `<turn_aborted>` (and sometimes `<turn_aborted/>`) as raw
text inside agentMessage items when an interrupt or upstream error
tears the turn down before the normal completion path fires. Mirrors
openclaw beta.8's terminal-marker fix so we don't burn the full turn
deadline waiting for a turn/completed that never comes.
"""
if not text:
return False
for marker in _TURN_ABORTED_MARKERS:
if marker in text:
return True
return False
def _get_hermes_version() -> str:
"""Best-effort Hermes version string for codex's userAgent line."""
try:
from importlib.metadata import version
return version("hermes-agent")
except Exception:
return "0.0.0"