"""
Process Registry -- In-memory registry for managed background processes.
Tracks processes spawned via terminal(background=true), providing:
- Output buffering (rolling 200KB window)
- Status polling and log retrieval
- Blocking wait with interrupt support
- Process killing
- Crash recovery via JSON checkpoint file
- Session-scoped tracking for gateway reset protection
Background processes execute THROUGH the environment interface -- nothing
runs on the host machine unless TERMINAL_ENV=local. For Docker, Singularity,
Modal, Daytona, and SSH backends, the command runs inside the sandbox.
Usage:
from tools.process_registry import process_registry
# Spawn a background process (called from terminal_tool)
session = process_registry.spawn_via_env(env, "pytest -v", task_id="task_123")
# Poll for status
result = process_registry.poll(session.id)
# Block until done
result = process_registry.wait(session.id, timeout=300)
# Kill it
process_registry.kill_process(session.id)
"""
import json
import logging
import os
import platform
import shlex
import signal
import subprocess
import threading
import time
import uuid
_IS_WINDOWS = platform.system() == "Windows"
from tools.environments.local import _find_shell, _resolve_safe_cwd, _sanitize_subprocess_env
from hermes_cli._subprocess_compat import windows_hide_flags
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from hermes_cli.config import get_hermes_home
# Module-level logger used for registry diagnostics.
logger = logging.getLogger(__name__)
# Checkpoint file (JSON) stored under the Hermes home directory.
CHECKPOINT_PATH = get_hermes_home() / "processes.json"
# Default cap on the number of characters kept in a session's output buffer.
MAX_OUTPUT_CHARS = 200_000
# Finished sessions whose age (measured from started_at) exceeds this many
# seconds are dropped by ProcessRegistry._prune_if_needed.
FINISHED_TTL_SECONDS = 1800
# Once running + finished sessions reach this count, the oldest finished
# session is evicted before a new one is registered.
MAX_PROCESSES = 64
# Per-session watch-pattern rate limit: minimum spacing between notifications.
WATCH_MIN_INTERVAL_SECONDS = 15
# Consecutive rate-limited windows after which watch patterns are disabled.
WATCH_STRIKE_LIMIT = 3
# Global watch breaker: max admitted watch_match events per rolling window ...
WATCH_GLOBAL_MAX_PER_WINDOW = 15
# ... the length of that window in seconds ...
WATCH_GLOBAL_WINDOW_SECONDS = 10
# ... and how long further watch_match events are suppressed once it trips.
WATCH_GLOBAL_COOLDOWN_SECONDS = 30
def format_uptime_short(seconds: int) -> str:
    """Render a duration as a compact human-readable string.

    Negative inputs are clamped to zero and fractional seconds are truncated.
    The result uses at most two units: ``"42s"``, ``"3m 7s"`` or ``"2h 15m"``.
    """
    total = int(seconds)
    if total < 0:
        total = 0
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        return f"{hours}h {minutes}m"
    if minutes:
        return f"{minutes}m {secs}s"
    return f"{secs}s"
@dataclass
class ProcessSession:
    """A tracked background process with output buffering."""
    # --- Identity -----------------------------------------------------------
    id: str                                     # registry-assigned id ("proc_<hex>")
    command: str                                # shell command that was launched
    task_id: str = ""                           # task the process belongs to (used for filtering)
    session_key: str = ""                       # gateway session key (used by has_active_for_session)
    # --- Runtime handles ----------------------------------------------------
    pid: Optional[int] = None                   # host PID, or in-sandbox PID when pid_scope == "sandbox"
    process: Optional[subprocess.Popen] = None  # Popen handle for local pipe-mode processes
    env_ref: Any = None                         # environment backend for sandbox-spawned processes
    cwd: Optional[str] = None                   # working directory recorded at spawn time
    started_at: float = 0.0                     # time.time() at spawn
    # --- Lifecycle / output -------------------------------------------------
    exited: bool = False                        # True once the process is known to have ended
    exit_code: Optional[int] = None             # exit status; None when it could not be determined
    output_buffer: str = ""                     # buffered output, trimmed to max_output_chars
    max_output_chars: int = MAX_OUTPUT_CHARS    # cap applied to output_buffer
    detached: bool = False                      # True for sessions recovered after a restart (no live handle)
    pid_scope: str = "host"                     # "host" or "sandbox"
    # --- Notification routing (copied verbatim into queued events) -----------
    watcher_platform: str = ""
    watcher_chat_id: str = ""
    watcher_user_id: str = ""
    watcher_user_name: str = ""
    watcher_thread_id: str = ""
    watcher_message_id: str = ""
    watcher_interval: int = 0                   # NOTE(review): not read in this part of the file -- confirm usage
    notify_on_complete: bool = False            # enqueue a "completion" event when the process finishes
    # --- Watch-pattern state (see ProcessRegistry._check_watch_patterns) -----
    watch_patterns: List[str] = field(default_factory=list)           # substrings to look for in new output
    _watch_hits: int = field(default=0, repr=False)                   # number of emitted watch notifications
    _watch_suppressed: int = field(default=0, repr=False)             # matched lines dropped since last emit
    _watch_disabled: bool = field(default=False, repr=False)          # set after too many rate-limit strikes
    _watch_last_emit_at: float = field(default=0.0, repr=False)       # timestamp of the last emitted match
    _watch_cooldown_until: float = field(default=0.0, repr=False)     # matches before this time are suppressed
    _watch_strike_candidate: bool = field(default=False, repr=False)  # a strike was already counted this window
    _watch_consecutive_strikes: int = field(default=0, repr=False)    # consecutive rate-limited windows
    # --- Internals ----------------------------------------------------------
    _lock: threading.Lock = field(default_factory=threading.Lock)     # guards output_buffer and watch state
    _reader_thread: Optional[threading.Thread] = field(default=None, repr=False)  # reader/poller thread
    _pty: Any = field(default=None, repr=False)                       # PTY process handle when spawned with use_pty
class ProcessRegistry:
"""
In-memory registry of running and finished background processes.
Thread-safe. Accessed from:
- Executor threads (terminal_tool, process tool handlers)
- Gateway asyncio loop (watcher tasks, session reset checks)
- Cleanup thread (sandbox reaping coordination)
"""
_SHELL_NOISE_SUBSTRINGS = (
"bash: cannot set terminal process group",
"bash: no job control in this shell",
"no job control in this shell",
"cannot set terminal process group",
"tcsetattr: Inappropriate ioctl for device",
)
def __init__(self):
self._running: Dict[str, ProcessSession] = {}
self._finished: Dict[str, ProcessSession] = {}
self._lock = threading.Lock()
self.pending_watchers: List[Dict[str, Any]] = []
import queue as _queue_mod
self.completion_queue: _queue_mod.Queue = _queue_mod.Queue()
self._completion_consumed: set = set()
self._global_watch_lock = threading.Lock()
self._global_watch_window_start: float = 0.0
self._global_watch_window_hits: int = 0
self._global_watch_tripped_until: float = 0.0
self._global_watch_suppressed_during_trip: int = 0
@staticmethod
def _clean_shell_noise(text: str) -> str:
"""Strip shell startup warnings from the beginning of output."""
lines = text.split("\n")
while lines and any(noise in lines[0] for noise in ProcessRegistry._SHELL_NOISE_SUBSTRINGS):
lines.pop(0)
return "\n".join(lines)
def _check_watch_patterns(self, session: ProcessSession, new_text: str) -> None:
    """Scan new output for watch patterns and queue notifications.

    Called from reader threads with new_text being the freshly-read chunk.

    Per-session rate limit: at most ONE watch-match notification per
    WATCH_MIN_INTERVAL_SECONDS. Any match arriving inside the cooldown
    window is dropped and counts as ONE strike for that window. After
    WATCH_STRIKE_LIMIT consecutive strike windows, watch_patterns is
    disabled for this session and the session is promoted to
    notify_on_complete semantics — one notification when the process
    actually exits, no more mid-process spam.

    Args:
        session: The session whose ``watch_patterns`` are checked.
        new_text: Newly received output; matching is a plain substring
            test performed line by line.
    """
    if not session.watch_patterns or session._watch_disabled:
        return
    if session.exited:
        return
    # Collect every line that contains at least one pattern. Only the first
    # pattern that matched anywhere in this chunk is reported.
    matched_lines = []
    matched_pattern = None
    for line in new_text.splitlines():
        for pat in session.watch_patterns:
            if pat in line:
                matched_lines.append(line.rstrip())
                if matched_pattern is None:
                    matched_pattern = pat
                # One hit per line is enough; move on to the next line.
                break
    if not matched_lines:
        return
    now = time.time()
    should_disable = False
    with session._lock:
        if session._watch_cooldown_until and now < session._watch_cooldown_until:
            # Still inside the cooldown from the last emit: suppress.
            session._watch_suppressed += len(matched_lines)
            if not session._watch_strike_candidate:
                # First suppression in this cooldown window counts as one strike.
                session._watch_strike_candidate = True
                session._watch_consecutive_strikes += 1
                if session._watch_consecutive_strikes >= WATCH_STRIKE_LIMIT:
                    session._watch_disabled = True
                    session.notify_on_complete = True
                    should_disable = True
            return_early = True
        else:
            # Cooldown expired (or this is the first emit). If the previous
            # window saw no suppression, the strike streak is broken.
            if (
                session._watch_cooldown_until
                and not session._watch_strike_candidate
            ):
                session._watch_consecutive_strikes = 0
            session._watch_strike_candidate = False
            session._watch_last_emit_at = now
            session._watch_cooldown_until = now + WATCH_MIN_INTERVAL_SECONDS
            session._watch_hits += 1
            # Report how many lines were dropped since the previous emit.
            suppressed = session._watch_suppressed
            session._watch_suppressed = 0
            return_early = False
    if return_early:
        if should_disable:
            # Tell the consumer once that watching has been switched off.
            self.completion_queue.put({
                "session_id": session.id,
                "session_key": session.session_key,
                "command": session.command,
                "type": "watch_disabled",
                "suppressed": session._watch_suppressed,
                "platform": session.watcher_platform,
                "chat_id": session.watcher_chat_id,
                "user_id": session.watcher_user_id,
                "user_name": session.watcher_user_name,
                "thread_id": session.watcher_thread_id,
                "message_id": session.watcher_message_id,
                "message": (
                    f"Watch patterns disabled for process {session.id} — "
                    f"{WATCH_STRIKE_LIMIT} consecutive rate-limit windows triggered "
                    f"(min spacing {WATCH_MIN_INTERVAL_SECONDS}s). "
                    f"Falling back to notify_on_complete semantics; you'll get "
                    f"exactly one notification when the process exits."
                ),
            })
        return
    # Cap the payload: at most 20 matched lines and 2000 characters.
    output = "\n".join(matched_lines[:20])
    if len(output) > 2000:
        output = output[:2000] + "\n...(truncated)"
    # The cross-session breaker may still drop the event. Note that the
    # per-session cooldown and suppressed counter were already updated above.
    if not self._global_watch_admit(now):
        return
    self.completion_queue.put({
        "session_id": session.id,
        "session_key": session.session_key,
        "command": session.command,
        "type": "watch_match",
        "pattern": matched_pattern,
        "output": output,
        "suppressed": suppressed,
        "platform": session.watcher_platform,
        "chat_id": session.watcher_chat_id,
        "user_id": session.watcher_user_id,
        "user_name": session.watcher_user_name,
        "thread_id": session.watcher_thread_id,
        "message_id": session.watcher_message_id,
    })
def _global_watch_admit(self, now: float) -> bool:
    """Return True if this watch_match event is allowed through the global breaker.

    Semantics:
    - If we're currently in a cooldown period, drop the event and count it.
    - Otherwise, slide the rolling window and check the global cap.
    - If the cap is exceeded, trip the breaker for WATCH_GLOBAL_COOLDOWN_SECONDS
      and emit ONE summary event so the agent/user sees "N notifications were
      suppressed" instead of getting them individually.
    - When the cooldown ends, emit a release summary and reset counters.

    Args:
        now: Timestamp of the candidate event (the caller passes time.time()).

    Returns:
        True when the caller may enqueue its watch_match event, False when
        the event must be dropped.
    """
    with self._global_watch_lock:
        # Phase 1: if a previous trip has expired, release the breaker.
        if self._global_watch_tripped_until and now >= self._global_watch_tripped_until:
            suppressed = self._global_watch_suppressed_during_trip
            self._global_watch_tripped_until = 0.0
            self._global_watch_suppressed_during_trip = 0
            self._global_watch_window_start = now
            self._global_watch_window_hits = 0
            if suppressed > 0:
                release_msg = {
                    "session_id": "",
                    "session_key": "",
                    "command": "",
                    "type": "watch_overflow_released",
                    "suppressed": suppressed,
                    "message": (
                        f"Watch-pattern notifications resumed. "
                        f"{suppressed} match event(s) were suppressed during the flood."
                    ),
                    "platform": "",
                    "chat_id": "",
                    "user_id": "",
                    "user_name": "",
                    "thread_id": "",
                }
            else:
                release_msg = None
        else:
            release_msg = None
        # Phase 2: decide whether the current event is admitted.
        if self._global_watch_tripped_until and now < self._global_watch_tripped_until:
            # Breaker is open: drop and count.
            self._global_watch_suppressed_during_trip += 1
            admit = False
            trip_now = None
        else:
            # Start a fresh window once the current one has aged out.
            if now - self._global_watch_window_start >= WATCH_GLOBAL_WINDOW_SECONDS:
                self._global_watch_window_start = now
                self._global_watch_window_hits = 0
            if self._global_watch_window_hits >= WATCH_GLOBAL_MAX_PER_WINDOW:
                # Cap reached: trip the breaker; this event is the first one dropped.
                self._global_watch_tripped_until = now + WATCH_GLOBAL_COOLDOWN_SECONDS
                self._global_watch_suppressed_during_trip += 1
                trip_now = now
                admit = False
            else:
                self._global_watch_window_hits += 1
                trip_now = None
                admit = True
    # Summary events are enqueued only after the breaker lock is released.
    if release_msg is not None:
        self.completion_queue.put(release_msg)
    if trip_now is not None:
        self.completion_queue.put({
            "session_id": "",
            "session_key": "",
            "command": "",
            "type": "watch_overflow_tripped",
            "message": (
                f"Watch-pattern overflow: >{WATCH_GLOBAL_MAX_PER_WINDOW} "
                f"notifications in {WATCH_GLOBAL_WINDOW_SECONDS}s across all processes. "
                f"Suppressing further watch_match events for "
                f"{WATCH_GLOBAL_COOLDOWN_SECONDS}s."
            ),
            "platform": "",
            "chat_id": "",
            "user_id": "",
            "user_name": "",
            "thread_id": "",
        })
    return admit
@staticmethod
def _is_host_pid_alive(pid: Optional[int]) -> bool:
"""Best-effort liveness check for host-visible PIDs."""
if not pid:
return False
from gateway.status import _pid_exists
return _pid_exists(pid)
def _refresh_detached_session(self, session: Optional[ProcessSession]) -> Optional[ProcessSession]:
"""Update recovered host-PID sessions when the underlying process has exited."""
if session is None or session.exited or not session.detached or session.pid_scope != "host":
return session
if self._is_host_pid_alive(session.pid):
return session
with session._lock:
if session.exited:
return session
session.exited = True
session.exit_code = None
self._move_to_finished(session)
return session
@staticmethod
def _terminate_host_pid(pid: int) -> None:
    """Ask a host process (and, off Windows, its descendants) to terminate.

    Works from the PID alone, so it can be used when the original process
    handle is no longer available. A process that has already disappeared
    is not an error.
    """
    if _IS_WINDOWS:
        os.kill(pid, signal.SIGTERM)
        return

    import psutil

    try:
        root = psutil.Process(pid)
        # Terminate descendants first, then the root itself.
        for descendant in root.children(recursive=True):
            try:
                descendant.terminate()
            except psutil.NoSuchProcess:
                pass
        root.terminate()
    except psutil.NoSuchProcess:
        return
    except OSError:
        # PermissionError is an OSError subclass; fall back to a plain signal.
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError:
            # ProcessLookupError / PermissionError are OSError subclasses too.
            pass
@staticmethod
def _env_temp_dir(env: Any) -> str:
"""Return the writable sandbox temp dir for env-backed background tasks."""
get_temp_dir = getattr(env, "get_temp_dir", None)
if callable(get_temp_dir):
try:
temp_dir = get_temp_dir()
if isinstance(temp_dir, str) and temp_dir.startswith("/"):
return temp_dir.rstrip("/") or "/"
except Exception as exc:
logger.debug("Could not resolve environment temp dir: %s", exc)
return "/tmp"
def spawn_local(
    self,
    command: str,
    cwd: str = None,
    task_id: str = "",
    session_key: str = "",
    env_vars: dict = None,
    use_pty: bool = False,
) -> ProcessSession:
    """
    Spawn a background process locally.

    Only for TERMINAL_ENV=local. Other backends use spawn_via_env().

    Args:
        command: Shell command line, run through the user's shell.
        cwd: Working directory; defaults to the current directory and is
            passed through _resolve_safe_cwd().
        task_id: Task identifier recorded on the session.
        session_key: Gateway session key recorded on the session.
        env_vars: Extra environment passed to _sanitize_subprocess_env().
        use_pty: If True, use a pseudo-terminal via ptyprocess for interactive
            CLI tools (Codex, Claude Code, Python REPL). Falls back to
            subprocess.Popen if ptyprocess is not installed.

    Returns:
        The registered ProcessSession (already stored in the running table).

    Raises:
        Exception: In pipe mode, any failure after the child was started is
            re-raised once the child has been killed and reaped.
    """
    session = ProcessSession(
        id=f"proc_{uuid.uuid4().hex[:12]}",
        command=command,
        task_id=task_id,
        session_key=session_key,
        cwd=_resolve_safe_cwd(cwd or os.getcwd()),
        started_at=time.time(),
    )
    if use_pty:
        try:
            # winpty on Windows, ptyprocess elsewhere; both expose PtyProcess.
            if _IS_WINDOWS:
                from winpty import PtyProcess as _PtyProcessCls
            else:
                from ptyprocess import PtyProcess as _PtyProcessCls
            user_shell = _find_shell()
            pty_env = _sanitize_subprocess_env(os.environ, env_vars)
            # Keep Python children from block-buffering their output.
            pty_env["PYTHONUNBUFFERED"] = "1"
            pty_proc = _PtyProcessCls.spawn(
                [user_shell, "-lic", f"set +m; {command}"],
                cwd=session.cwd,
                env=pty_env,
                dimensions=(30, 120),
            )
            session.pid = pty_proc.pid
            session._pty = pty_proc
            reader = threading.Thread(
                target=self._pty_reader_loop,
                args=(session,),
                daemon=True,
                name=f"proc-pty-reader-{session.id}",
            )
            session._reader_thread = reader
            reader.start()
            with self._lock:
                self._prune_if_needed()
                self._running[session.id] = session
            self._write_checkpoint()
            return session
        except ImportError:
            logger.warning("ptyprocess not installed, falling back to pipe mode")
        except Exception as e:
            # NOTE(review): this also catches failures that happen AFTER the
            # PTY child was spawned; the pipe-mode fallback below would then
            # start the command a second time -- confirm whether the PTY child
            # should be terminated here.
            logger.warning("PTY spawn failed (%s), falling back to pipe mode", e)
    # Pipe mode: stdout and stderr are merged into one pipe, stdin is closed.
    user_shell = _find_shell()
    bg_env = _sanitize_subprocess_env(os.environ, env_vars)
    bg_env["PYTHONUNBUFFERED"] = "1"
    _popen_kwargs = {"creationflags": windows_hide_flags()} if _IS_WINDOWS else {}
    proc = subprocess.Popen(
        [user_shell, "-lic", f"set +m; {command}"],
        text=True,
        cwd=session.cwd,
        env=bg_env,
        encoding="utf-8",
        errors="replace",
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        stdin=subprocess.DEVNULL,
        # New session on POSIX so the whole process group can be signalled.
        preexec_fn=None if _IS_WINDOWS else os.setsid,
        **_popen_kwargs,
    )
    session.process = proc
    session.pid = proc.pid
    try:
        reader = threading.Thread(
            target=self._reader_loop,
            args=(session,),
            daemon=True,
            name=f"proc-reader-{session.id}",
        )
        session._reader_thread = reader
        reader.start()
        with self._lock:
            self._prune_if_needed()
            self._running[session.id] = session
        self._write_checkpoint()
    except Exception:
        # Registration failed: do not leave an untracked child behind.
        try:
            if not _IS_WINDOWS:
                try:
                    os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
                except (ProcessLookupError, PermissionError, OSError):
                    proc.kill()
            else:
                proc.kill()
        except Exception:
            pass
        # Reap the child (best effort) before propagating the error.
        try:
            proc.wait(timeout=5)
        except Exception:
            pass
        raise
    return session
def spawn_via_env(
    self,
    env: Any,
    command: str,
    cwd: str = None,
    task_id: str = "",
    session_key: str = "",
    timeout: int = 10,
) -> ProcessSession:
    """
    Spawn a background process through a non-local environment backend.

    For Docker/Singularity/Modal/Daytona/SSH: runs the command inside the sandbox
    using the environment's execute() interface. We wrap the command to
    capture the in-sandbox PID and redirect output to a log file inside
    the sandbox, then poll the log via subsequent execute() calls.

    This is less capable than local spawn (no live stdout pipe, no stdin),
    but it ensures the command runs in the correct sandbox context.

    Args:
        env: Environment backend exposing ``execute(command, timeout=...)``.
        command: Shell command to run inside the sandbox.
        cwd: Working directory recorded on the session.
        task_id: Task identifier recorded on the session.
        session_key: Gateway session key recorded on the session.
        timeout: Seconds allowed for the launch call itself.

    Returns:
        The tracked ProcessSession. When the launch call raised, the
        session is returned already exited with ``exit_code == -1`` and
        the error text in ``output_buffer``.
    """
    session = ProcessSession(
        id=f"proc_{uuid.uuid4().hex[:12]}",
        command=command,
        task_id=task_id,
        session_key=session_key,
        cwd=cwd,
        started_at=time.time(),
        env_ref=env,
        pid_scope="sandbox",
    )
    # Per-session bookkeeping files inside the sandbox temp directory.
    temp_dir = self._env_temp_dir(env)
    log_path = f"{temp_dir}/hermes_bg_{session.id}.log"
    pid_path = f"{temp_dir}/hermes_bg_{session.id}.pid"
    exit_path = f"{temp_dir}/hermes_bg_{session.id}.exit"
    quoted_command = shlex.quote(command)
    quoted_temp_dir = shlex.quote(temp_dir)
    quoted_log_path = shlex.quote(log_path)
    quoted_pid_path = shlex.quote(pid_path)
    quoted_exit_path = shlex.quote(exit_path)
    # The backgrounded subshell runs the command with output redirected to
    # the log file and then records the exit status; the outer shell stores
    # the subshell PID and echoes it back so it can be parsed below.
    bg_command = (
        f"mkdir -p {quoted_temp_dir} && "
        f"( nohup bash -lc {quoted_command} > {quoted_log_path} 2>&1; "
        f"rc=$?; printf '%s\\n' \"$rc\" > {quoted_exit_path} ) & "
        f"echo $! > {quoted_pid_path} && cat {quoted_pid_path}"
    )
    try:
        result = env.execute(bg_command, timeout=timeout)
        output = result.get("output", "").strip()
        # The first all-digit line of the output is taken as the PID.
        for line in output.splitlines():
            line = line.strip()
            if line.isdigit():
                session.pid = int(line)
                break
    except Exception as e:
        session.exited = True
        session.exit_code = -1
        session.output_buffer = f"Failed to start: {e}"
    if not session.exited:
        reader = threading.Thread(
            target=self._env_poller_loop,
            args=(session, env, log_path, pid_path, exit_path),
            daemon=True,
            name=f"proc-poller-{session.id}",
        )
        session._reader_thread = reader
        reader.start()
    with self._lock:
        self._prune_if_needed()
        if session.exited:
            # A session that failed to start has no poller thread to move it
            # later, and pruning only looks at finished sessions. File it as
            # finished right away so it does not linger in the running table.
            self._finished[session.id] = session
        else:
            self._running[session.id] = session
    self._write_checkpoint()
    return session
def _reader_loop(self, session: ProcessSession):
    """Reader-thread body for local pipe-mode processes.

    Pulls stdout in 4096-character reads until EOF, appends each piece to
    the session buffer (trimmed to ``max_output_chars``) and runs the
    watch-pattern check on it. Shell start-up noise is stripped from the
    first piece only. On the way out the process is waited on briefly and
    the session is marked exited and moved to the finished table.
    """
    strip_pending = True
    try:
        while True:
            piece = session.process.stdout.read(4096)
            if not piece:
                # Empty read means EOF on the pipe.
                break
            if strip_pending:
                strip_pending = False
                piece = self._clean_shell_noise(piece)
            with session._lock:
                combined = session.output_buffer + piece
                session.output_buffer = combined
                if len(combined) > session.max_output_chars:
                    session.output_buffer = combined[-session.max_output_chars:]
            self._check_watch_patterns(session, piece)
    except Exception as e:
        logger.debug("Process stdout reader ended: %s", e)
    finally:
        # Give the child a moment to be reaped so returncode is populated.
        try:
            session.process.wait(timeout=5)
        except Exception as e:
            logger.debug("Process wait timed out or failed: %s", e)
        session.exited = True
        session.exit_code = session.process.returncode
        self._move_to_finished(session)
def _env_poller_loop(
    self, session: ProcessSession, env: Any, log_path: str, pid_path: str, exit_path: str
):
    """Background thread: poll a sandbox log file for non-local backends.

    Every two seconds the sandbox log is read in full, the session buffer
    is replaced with it (trimmed to ``max_output_chars``) and any newly
    appended text is run through the watch-pattern check. The PID recorded
    in ``pid_path`` is then probed with ``kill -0``; once it is gone, the
    log is read one final time so output written just before exit is not
    lost, the exit status is taken from ``exit_path`` (-1 when unreadable),
    and the session is moved to the finished table.

    Any exception from the environment during a poll cycle ends tracking
    with ``exit_code == -1``.

    Args:
        session: Session being tracked.
        env: Environment backend exposing ``execute(command, timeout=...)``.
        log_path: In-sandbox file receiving the command's output.
        pid_path: In-sandbox file holding the background PID.
        exit_path: In-sandbox file holding the command's exit status.
    """
    quoted_log_path = shlex.quote(log_path)
    quoted_pid_path = shlex.quote(pid_path)
    quoted_exit_path = shlex.quote(exit_path)
    prev_output_len = 0

    def _pull_log() -> None:
        """Fetch the whole log, refresh the buffer and scan the new tail."""
        nonlocal prev_output_len
        result = env.execute(f"cat {quoted_log_path} 2>/dev/null", timeout=10)
        new_output = result.get("output", "")
        if not new_output:
            return
        # Only text beyond what was seen last time counts as new.
        delta = new_output[prev_output_len:] if len(new_output) > prev_output_len else ""
        prev_output_len = len(new_output)
        with session._lock:
            session.output_buffer = new_output
            if len(session.output_buffer) > session.max_output_chars:
                session.output_buffer = session.output_buffer[-session.max_output_chars:]
        if delta:
            self._check_watch_patterns(session, delta)

    while not session.exited:
        time.sleep(2)
        try:
            _pull_log()
            # `kill -0` only tests for existence; the echoed status is "0"
            # while the process is still alive.
            check = env.execute(
                f"kill -0 \"$(cat {quoted_pid_path} 2>/dev/null)\" 2>/dev/null; echo $?",
                timeout=5,
            )
            check_output = check.get("output", "").strip()
            if check_output and check_output.splitlines()[-1].strip() != "0":
                # The process may have written more output after the read
                # above and before it exited; pick that up now. Best effort:
                # a failure here must not overwrite the real exit status.
                try:
                    _pull_log()
                except Exception as exc:
                    logger.debug("Final log read failed for %s: %s", session.id, exc)
                exit_result = env.execute(
                    f"cat {quoted_exit_path} 2>/dev/null",
                    timeout=5,
                )
                exit_str = exit_result.get("output", "").strip()
                try:
                    session.exit_code = int(exit_str.splitlines()[-1].strip())
                except (ValueError, IndexError):
                    session.exit_code = -1
                session.exited = True
                self._move_to_finished(session)
                return
        except Exception:
            session.exited = True
            session.exit_code = -1
            self._move_to_finished(session)
            return
def _pty_reader_loop(self, session: ProcessSession):
    """Reader-thread body for PTY-backed processes.

    While the PTY reports itself alive, reads up to 4096 units at a time,
    decodes bytes as UTF-8 (undecodable bytes are replaced), appends to the
    trimmed session buffer and runs the watch-pattern check. Any error
    while reading or processing ends the loop. Afterwards the PTY is
    waited on and the session is marked exited and moved to finished.
    """
    pty = session._pty
    try:
        while pty.isalive():
            try:
                raw = pty.read(4096)
                if not raw:
                    continue
                text = raw if isinstance(raw, str) else raw.decode("utf-8", errors="replace")
                with session._lock:
                    session.output_buffer += text
                    overflow = len(session.output_buffer) > session.max_output_chars
                    if overflow:
                        session.output_buffer = session.output_buffer[-session.max_output_chars:]
                self._check_watch_patterns(session, text)
            except Exception:
                # EOFError (an Exception subclass) and any other failure
                # both stop reading.
                break
    except Exception as e:
        logger.debug("PTY stdout reader ended: %s", e)

    try:
        pty.wait()
    except Exception as e:
        logger.debug("PTY wait timed out or failed: %s", e)

    session.exited = True
    # -1 stands in when the PTY object exposes no exit status.
    session.exit_code = getattr(pty, "exitstatus", -1)
    self._move_to_finished(session)
def _move_to_finished(self, session: ProcessSession):
"""Move a session from running to finished.
Idempotent: if the session was already moved (e.g. kill_process raced
with the reader thread), the second call is a no-op — no duplicate
completion notification is enqueued.
"""
with self._lock:
was_running = self._running.pop(session.id, None) is not None
self._finished[session.id] = session
self._write_checkpoint()
if was_running and session.notify_on_complete:
from tools.ansi_strip import strip_ansi
output_tail = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else ""
self.completion_queue.put({
"type": "completion",
"session_id": session.id,
"command": session.command,
"exit_code": session.exit_code,
"output": output_tail,
})
def is_completion_consumed(self, session_id: str) -> bool:
"""Check if a completion notification was already consumed via wait/poll/log."""
return session_id in self._completion_consumed
def drain_notifications(self) -> "list[tuple[dict, str]]":
"""Pop all pending notification events and return formatted pairs.
Returns a list of (raw_event, formatted_text) tuples.
Skips completion events that were already consumed via wait/poll/log.
"""
results = []
while not self.completion_queue.empty():
try:
evt = self.completion_queue.get_nowait()
except Exception:
break
_evt_sid = evt.get("session_id", "")
if evt.get("type") == "completion" and self.is_completion_consumed(_evt_sid):
continue
text = format_process_notification(evt)
if text:
results.append((evt, text))
return results
def get(self, session_id: str) -> Optional[ProcessSession]:
"""Get a session by ID (running or finished)."""
with self._lock:
session = self._running.get(session_id) or self._finished.get(session_id)
return self._refresh_detached_session(session)
def _reconcile_local_exit(self, session: "ProcessSession") -> None:
    """Reconcile session.exited against the real child process state.

    The reader thread (`_reader_loop`) sets `session.exited = True` only
    in its `finally` block, which runs when `stdout.read()` returns EOF.
    If the direct `Popen` child has exited but a descendant process (e.g.
    a daemon spawned by `hermes update` restarting the gateway) is still
    holding the stdout pipe open, the reader blocks forever and poll()
    keeps returning "running" indefinitely (issue #17327 — 74 polls over
    7 minutes on Feishu).

    This helper closes that window: when `session.exited` is still False
    but the direct child's `Popen.poll()` reports an exit code, drain any
    readable bytes non-blocking and flip `session.exited`. The orphaned
    reader thread remains stuck on its blocking `read()` but is a daemon
    thread and will be reaped with the process.

    Safe no-op on sessions without a local `Popen` (env/PTY), already-
    exited sessions, and detached-recovered sessions.
    """
    if session is None or session.exited:
        return
    proc = getattr(session, "process", None)
    if proc is None:
        return
    try:
        rc = proc.poll()
    except Exception:
        return
    if rc is None:
        # Direct child is still running; nothing to reconcile.
        return
    drained = ""
    stdout = getattr(proc, "stdout", None)
    # The drain relies on fcntl, so it is skipped on Windows.
    if stdout is not None and not _IS_WINDOWS:
        try:
            import fcntl
            fd = stdout.fileno()
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            # Temporarily make the pipe non-blocking so read() cannot hang.
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            try:
                chunk = stdout.read()
                if chunk:
                    drained = chunk if isinstance(chunk, str) else chunk.decode("utf-8", errors="replace")
            except (BlockingIOError, OSError, ValueError):
                # Nothing readable right now, or the stream is unusable.
                pass
            finally:
                # Restore the original flags (best effort).
                try:
                    fcntl.fcntl(fd, fcntl.F_SETFL, flags)
                except Exception:
                    pass
        except Exception as e:
            logger.debug("Non-blocking drain failed for %s: %s", session.id, e)
    with session._lock:
        if drained:
            session.output_buffer += drained
            if len(session.output_buffer) > session.max_output_chars:
                session.output_buffer = session.output_buffer[-session.max_output_chars:]
        session.exited = True
        session.exit_code = rc
    logger.info(
        "Reconciled session %s: direct child exited with code %s but reader "
        "was still blocked (orphaned pipe). Flipped to exited.",
        session.id, rc,
    )
    self._move_to_finished(session)
def poll(self, session_id: str) -> dict:
    """Report the current status of a background process.

    Returns a dict with the session id, command, status ("running" or
    "exited"), pid, uptime in whole seconds and the last 1000 characters
    of output with ANSI sequences removed. Exited sessions also carry
    ``exit_code`` and are marked as consumed; recovered sessions carry
    ``detached`` and an explanatory ``note``. Unknown ids yield a
    ``not_found`` dict.
    """
    from tools.ansi_strip import strip_ansi

    session = self.get(session_id)
    if session is None:
        return {"status": "not_found", "error": f"No process with ID {session_id}"}

    # Catch the case where the child died but the reader is still blocked.
    self._reconcile_local_exit(session)

    with session._lock:
        tail = session.output_buffer[-1000:]
        output_preview = strip_ansi(tail) if session.output_buffer else ""

    status = "exited" if session.exited else "running"
    report = {
        "session_id": session.id,
        "command": session.command,
        "status": status,
        "pid": session.pid,
        "uptime_seconds": int(time.time() - session.started_at),
        "output_preview": output_preview,
    }
    if session.exited:
        report["exit_code"] = session.exit_code
        self._completion_consumed.add(session_id)
    if session.detached:
        report.update(
            detached=True,
            note="Process recovered after restart -- output history unavailable",
        )
    return report
def read_log(self, session_id: str, offset: int = 0, limit: int = 200) -> dict:
    """Return buffered output, paginated by line.

    With ``offset == 0`` and a positive ``limit`` the LAST ``limit`` lines
    are returned; otherwise the slice ``lines[offset:offset + limit]`` is
    used. ANSI sequences are stripped before the output is split into lines.
    Reading the log of an exited session marks its completion as consumed.
    """
    from tools.ansi_strip import strip_ansi

    session = self.get(session_id)
    if session is None:
        return {"status": "not_found", "error": f"No process with ID {session_id}"}

    with session._lock:
        cleaned = strip_ansi(session.output_buffer)
    all_lines = cleaned.splitlines()

    tail_mode = offset == 0 and limit > 0
    window = all_lines[-limit:] if tail_mode else all_lines[offset:offset + limit]

    report = {
        "session_id": session.id,
        "status": "exited" if session.exited else "running",
        "output": "\n".join(window),
        "total_lines": len(all_lines),
        "showing": f"{len(window)} lines",
    }
    if session.exited:
        self._completion_consumed.add(session_id)
    return report
def wait(self, session_id: str, timeout: int = None) -> dict:
    """
    Block until the process exits, the wait times out, or the user interrupts.

    Args:
        session_id: The process to wait for.
        timeout: Max seconds to block. A missing/zero value uses the
            TERMINAL_TIMEOUT environment setting (default 180); larger
            values are clamped to that limit.

    Returns:
        dict with status ("exited", "timeout", "interrupted", "not_found")
        and an output snapshot. ``timeout_note`` is present whenever the
        requested timeout was clamped, and always on a timeout result.
    """
    from tools.ansi_strip import strip_ansi
    from tools.interrupt import is_interrupted as _is_interrupted

    try:
        max_timeout = int(os.getenv("TERMINAL_TIMEOUT", "180"))
    except (ValueError, TypeError):
        max_timeout = 180

    timeout_note = None
    effective_timeout = timeout or max_timeout
    if timeout and timeout > max_timeout:
        effective_timeout = max_timeout
        timeout_note = (
            f"Requested wait of {timeout}s was clamped "
            f"to configured limit of {max_timeout}s"
        )

    session = self.get(session_id)
    if session is None:
        return {"status": "not_found", "error": f"No process with ID {session_id}"}

    def _with_clamp_note(payload: dict) -> dict:
        # Attach the clamping note only when clamping actually happened.
        if timeout_note:
            payload["timeout_note"] = timeout_note
        return payload

    deadline = time.monotonic() + effective_timeout
    while time.monotonic() < deadline:
        # Bring the session state up to date before looking at it.
        session = self._refresh_detached_session(session)
        self._reconcile_local_exit(session)

        if session.exited:
            self._completion_consumed.add(session_id)
            return _with_clamp_note({
                "status": "exited",
                "exit_code": session.exit_code,
                "output": strip_ansi(session.output_buffer[-2000:]),
            })

        if _is_interrupted():
            return _with_clamp_note({
                "status": "interrupted",
                "output": strip_ansi(session.output_buffer[-1000:]),
                "note": "User sent a new message -- wait interrupted",
            })

        time.sleep(1)

    timed_out = {
        "status": "timeout",
        "output": strip_ansi(session.output_buffer[-1000:]),
    }
    timed_out["timeout_note"] = (
        timeout_note
        if timeout_note
        else f"Waited {effective_timeout}s, process still running"
    )
    return timed_out
def kill_process(self, session_id: str) -> dict:
    """Kill a background process.

    The termination strategy depends on how the session is backed: PTY,
    local Popen, sandbox environment, or a recovered host PID.

    Returns:
        A dict whose ``status`` is one of ``"killed"``, ``"already_exited"``,
        ``"not_found"`` or ``"error"``. Exceptions raised while killing are
        reported as ``"error"`` rather than propagated.
    """
    session = self.get(session_id)
    if session is None:
        return {"status": "not_found", "error": f"No process with ID {session_id}"}
    if session.exited:
        return {
            "status": "already_exited",
            "exit_code": session.exit_code,
        }
    try:
        if session._pty:
            # PTY-backed: force-terminate, falling back to SIGTERM by PID.
            try:
                session._pty.terminate(force=True)
            except Exception:
                if session.pid:
                    os.kill(session.pid, signal.SIGTERM)
        elif session.process:
            # Local Popen: on POSIX terminate the whole process tree.
            try:
                if _IS_WINDOWS:
                    session.process.terminate()
                else:
                    import psutil
                    try:
                        parent = psutil.Process(session.process.pid)
                        for child in parent.children(recursive=True):
                            try:
                                child.terminate()
                            except psutil.NoSuchProcess:
                                pass
                        parent.terminate()
                    except psutil.NoSuchProcess:
                        pass
            except (ProcessLookupError, PermissionError):
                session.process.kill()
        elif session.env_ref and session.pid:
            # Sandbox-backed: issue the kill inside the environment.
            session.env_ref.execute(f"kill {session.pid} 2>/dev/null", timeout=5)
        elif session.detached and session.pid_scope == "host" and session.pid:
            # Recovered host process: only the PID is available.
            if not self._is_host_pid_alive(session.pid):
                with session._lock:
                    session.exited = True
                    session.exit_code = None
                self._move_to_finished(session)
                return {
                    "status": "already_exited",
                    "exit_code": session.exit_code,
                }
            self._terminate_host_pid(session.pid)
        else:
            return {
                "status": "error",
                "error": (
                    "Recovered process cannot be killed after restart because "
                    "its original runtime handle is no longer available"
                ),
            }
        # -15 is the exit code recorded for a process killed through this method.
        session.exited = True
        session.exit_code = -15
        self._move_to_finished(session)
        # _move_to_finished has already rewritten the checkpoint once.
        self._write_checkpoint()
        return {"status": "killed", "session_id": session.id}
    except Exception as e:
        return {"status": "error", "error": str(e)}
def write_stdin(self, session_id: str, data: str) -> dict:
"""Send raw data to a running process's stdin (no newline appended)."""
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
if session.exited:
return {"status": "already_exited", "error": "Process has already finished"}
if hasattr(session, '_pty') and session._pty:
try:
pty_data = data.encode("utf-8") if isinstance(data, str) else data
session._pty.write(pty_data)
return {"status": "ok", "bytes_written": len(data)}
except Exception as e:
return {"status": "error", "error": str(e)}
if not session.process or not session.process.stdin:
return {"status": "error", "error": "Process stdin not available (non-local backend or stdin closed)"}
try:
session.process.stdin.write(data)
session.process.stdin.flush()
return {"status": "ok", "bytes_written": len(data)}
except Exception as e:
return {"status": "error", "error": str(e)}
def submit_stdin(self, session_id: str, data: str = "") -> dict:
"""Send data + newline to a running process's stdin (like pressing Enter)."""
return self.write_stdin(session_id, data + "\n")
def close_stdin(self, session_id: str) -> dict:
"""Close a running process's stdin / send EOF without killing the process."""
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
if session.exited:
return {"status": "already_exited", "error": "Process has already finished"}
if hasattr(session, '_pty') and session._pty:
try:
session._pty.sendeof()
return {"status": "ok", "message": "EOF sent"}
except Exception as e:
return {"status": "error", "error": str(e)}
if not session.process or not session.process.stdin:
return {"status": "error", "error": "Process stdin not available (non-local backend or stdin closed)"}
try:
session.process.stdin.close()
return {"status": "ok", "message": "stdin closed"}
except Exception as e:
return {"status": "error", "error": str(e)}
def list_sessions(self, task_id: str = None) -> list:
"""List all running and recently-finished processes."""
with self._lock:
all_sessions = list(self._running.values()) + list(self._finished.values())
all_sessions = [self._refresh_detached_session(s) for s in all_sessions]
if task_id:
all_sessions = [s for s in all_sessions if s.task_id == task_id]
result = []
for s in all_sessions:
entry = {
"session_id": s.id,
"command": s.command[:200],
"cwd": s.cwd,
"pid": s.pid,
"started_at": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(s.started_at)),
"uptime_seconds": int(time.time() - s.started_at),
"status": "exited" if s.exited else "running",
"output_preview": s.output_buffer[-200:] if s.output_buffer else "",
}
if s.exited:
entry["exit_code"] = s.exit_code
if s.detached:
entry["detached"] = True
result.append(entry)
return result
def has_active_processes(self, task_id: str) -> bool:
    """Report whether any non-exited running session belongs to *task_id*.

    The running sessions are snapshotted under the lock, each one is
    passed through ``_refresh_detached_session`` with the lock released,
    and the lock is then re-taken to inspect the current ``_running`` map.
    """
    with self._lock:
        snapshot = tuple(self._running.values())

    for candidate in snapshot:
        self._refresh_detached_session(candidate)

    with self._lock:
        for sess in self._running.values():
            if sess.task_id == task_id and not sess.exited:
                return True
        return False
def has_active_for_session(self, session_key: str) -> bool:
    """Report whether any non-exited running session has this gateway session key.

    Running sessions are snapshotted under the lock and each is passed
    through ``_refresh_detached_session`` with the lock released; the
    answer is then computed from the current ``_running`` map under the
    lock.
    """
    with self._lock:
        candidates = [*self._running.values()]

    for candidate in candidates:
        self._refresh_detached_session(candidate)

    with self._lock:
        matching = (
            sess for sess in self._running.values()
            if sess.session_key == session_key
        )
        return any(not sess.exited for sess in matching)
def kill_all(self, task_id: str = None) -> int:
    """Kill every running, non-exited session and return how many were stopped.

    When *task_id* is not ``None`` only sessions with that ``task_id`` are
    targeted. Targets are collected under the lock; ``kill_process`` is
    then called for each one after the lock is released. A session counts
    towards the total when ``kill_process`` reports ``killed`` or
    ``already_exited``.
    """
    with self._lock:
        targets = []
        for sess in self._running.values():
            if task_id is not None and sess.task_id != task_id:
                continue
            if sess.exited:
                continue
            targets.append(sess)

    counted_statuses = {"killed", "already_exited"}
    return sum(
        1
        for sess in targets
        if self.kill_process(sess.id).get("status") in counted_statuses
    )
def _prune_if_needed(self):
    """Drop stale finished sessions. The caller must hold ``_lock``.

    Three passes, in order:

    1. Remove every finished session whose ``started_at`` is more than
       ``FINISHED_TTL_SECONDS`` in the past. Age is measured from the
       start time, so a session that ran longer than the TTL qualifies
       as soon as it sits in ``_finished``.
    2. If the registry still tracks ``MAX_PROCESSES`` or more sessions,
       evict the single finished session with the earliest ``started_at``.
    3. Forget ``_completion_consumed`` IDs that no longer correspond to a
       tracked session.
    """
    now = time.time()

    # Pass 1: TTL expiry. Iterate over a copy of the keys because the
    # mapping is mutated inside the loop.
    for sid in list(self._finished):
        if (now - self._finished[sid].started_at) > FINISHED_TTL_SECONDS:
            del self._finished[sid]
            self._completion_consumed.discard(sid)

    # Pass 2: capacity eviction -- at most one session per call.
    tracked_count = len(self._running) + len(self._finished)
    if tracked_count >= MAX_PROCESSES and self._finished:
        oldest_id, _ = min(
            self._finished.items(),
            key=lambda item: item[1].started_at,
        )
        del self._finished[oldest_id]
        self._completion_consumed.discard(oldest_id)

    # Pass 3: drop consumed-markers for sessions that are gone.
    orphaned = {
        sid for sid in self._completion_consumed
        if sid not in self._running and sid not in self._finished
    }
    if orphaned:
        self._completion_consumed.difference_update(orphaned)
def _write_checkpoint(self):
    """Persist metadata of still-running sessions to ``CHECKPOINT_PATH``.

    One JSON object is written per session in ``_running`` that is not
    marked as exited. The snapshot is taken under the lock and the file
    is written through ``utils.atomic_json_write``. Any failure is logged
    at debug level and swallowed, so checkpointing is best-effort.
    """
    # Session attributes stored verbatim under their own name, in the
    # order they appear in each checkpoint entry (after "session_id").
    persisted_attrs = (
        "command",
        "pid",
        "pid_scope",
        "cwd",
        "started_at",
        "task_id",
        "session_key",
        "watcher_platform",
        "watcher_chat_id",
        "watcher_user_id",
        "watcher_user_name",
        "watcher_thread_id",
        "watcher_message_id",
        "watcher_interval",
        "notify_on_complete",
        "watch_patterns",
    )
    try:
        with self._lock:
            entries = [
                {
                    "session_id": sess.id,
                    **{name: getattr(sess, name) for name in persisted_attrs},
                }
                for sess in self._running.values()
                if not sess.exited
            ]

        # Imported lazily; an import failure is handled like any other
        # checkpoint failure below.
        from utils import atomic_json_write
        atomic_json_write(CHECKPOINT_PATH, entries)
    except Exception as e:
        logger.debug("Failed to write checkpoint file: %s", e, exc_info=True)
def recover_from_checkpoint(self) -> int:
    """Re-adopt processes listed in the checkpoint file that are still alive.

    Intended for gateway startup. Every checkpoint entry with a host-scoped
    PID is probed via ``_is_host_pid_alive``; live ones are re-registered
    in ``_running`` as ``detached`` sessions, and a watcher descriptor is
    queued on ``pending_watchers`` when the entry had a positive
    ``watcher_interval``. The checkpoint is rewritten afterwards.

    The checkpoint is read from disk, so it is treated as untrusted: an
    unreadable file, a top-level value that is not a list, and individual
    entries that are not objects or lack a usable ``session_id`` / integer
    ``pid`` are skipped instead of aborting recovery with an exception.

    Returns:
        The number of processes recovered as detached sessions.
    """
    if not CHECKPOINT_PATH.exists():
        return 0

    try:
        entries = json.loads(CHECKPOINT_PATH.read_text(encoding="utf-8"))
    except Exception:
        logger.debug("Failed to read checkpoint file", exc_info=True)
        return 0

    if not isinstance(entries, list):
        logger.debug(
            "Ignoring checkpoint file with unexpected top-level type: %s",
            type(entries).__name__,
        )
        return 0

    recovered = 0
    for entry in entries:
        if not isinstance(entry, dict):
            continue

        pid = entry.get("pid")
        if not pid:
            continue

        # A null / non-string command must not break the slicing below.
        command = entry.get("command", "unknown")
        if not isinstance(command, str):
            command = "unknown"

        pid_scope = entry.get("pid_scope", "host")
        if pid_scope != "host":
            # PIDs from other scopes are never probed against the host.
            logger.info(
                "Skipping recovery for non-host process: %s (pid=%s, scope=%s)",
                command[:60],
                pid,
                pid_scope,
            )
            continue

        session_id = entry.get("session_id")
        pid_is_int = isinstance(pid, int) and not isinstance(pid, bool)
        if not session_id or not pid_is_int:
            logger.debug(
                "Skipping malformed checkpoint entry (session_id=%r, pid=%r)",
                session_id,
                pid,
            )
            continue

        if not self._is_host_pid_alive(pid):
            continue

        session = ProcessSession(
            id=session_id,
            command=command,
            task_id=entry.get("task_id", ""),
            session_key=entry.get("session_key", ""),
            pid=pid,
            pid_scope=pid_scope,
            cwd=entry.get("cwd"),
            started_at=entry.get("started_at", time.time()),
            detached=True,
            watcher_platform=entry.get("watcher_platform", ""),
            watcher_chat_id=entry.get("watcher_chat_id", ""),
            watcher_user_id=entry.get("watcher_user_id", ""),
            watcher_user_name=entry.get("watcher_user_name", ""),
            watcher_thread_id=entry.get("watcher_thread_id", ""),
            watcher_message_id=entry.get("watcher_message_id", ""),
            # ``or 0`` maps an explicit null to "no watcher" so the
            # comparison below cannot raise.
            watcher_interval=entry.get("watcher_interval") or 0,
            notify_on_complete=entry.get("notify_on_complete", False),
            watch_patterns=entry.get("watch_patterns", []),
        )
        with self._lock:
            self._running[session.id] = session
        recovered += 1
        logger.info("Recovered detached process: %s (pid=%d)", session.command[:60], pid)

        if session.watcher_interval > 0:
            self.pending_watchers.append({
                "session_id": session.id,
                "check_interval": session.watcher_interval,
                "session_key": session.session_key,
                "platform": session.watcher_platform,
                "chat_id": session.watcher_chat_id,
                "user_id": session.watcher_user_id,
                "user_name": session.watcher_user_name,
                "thread_id": session.watcher_thread_id,
                "message_id": session.watcher_message_id,
                "notify_on_complete": session.notify_on_complete,
            })

    # Rewrite the checkpoint so it lists only what is tracked now.
    self._write_checkpoint()
    return recovered
# Module-level registry instance; per the module docstring, callers obtain it
# with `from tools.process_registry import process_registry`.
process_registry = ProcessRegistry()
def format_process_notification(evt: dict) -> "str | None":
    """Render a process notification event as an ``[IMPORTANT: ...]`` message.

    The event's ``type`` selects the layout:

    * ``watch_disabled`` -- the event's ``message`` wrapped in the marker.
    * ``watch_match``    -- session ID, matched pattern, command and the
      matched output, plus a note when earlier matches were suppressed.
    * anything else (``completion`` by default) -- session ID, exit code,
      command and output.

    Missing fields fall back to placeholders (``unknown``, ``?`` or an
    empty string). Every branch returns a string.
    """
    kind = evt.get("type", "completion")
    session_id = evt.get("session_id", "unknown")
    command = evt.get("command", "unknown")

    if kind == "watch_disabled":
        return f"[IMPORTANT: {evt.get('message', '')}]"

    if kind == "watch_match":
        pattern = evt.get("pattern", "?")
        matched_output = evt.get("output", "")
        suppressed = evt.get("suppressed", 0)
        pieces = [
            f"[IMPORTANT: Background process {session_id} matched "
            f"watch pattern \"{pattern}\".\n"
            f"Command: {command}\n"
            f"Matched output:\n{matched_output}"
        ]
        if suppressed:
            pieces.append(f"\n({suppressed} earlier matches were suppressed by rate limit)")
        pieces.append("]")
        return "".join(pieces)

    # Default layout: completion notification.
    exit_code = evt.get("exit_code", "?")
    output = evt.get("output", "")
    return (
        f"[IMPORTANT: Background process {session_id} completed "
        f"(exit code {exit_code}).\n"
        f"Command: {command}\n"
        f"Output:\n{output}]"
    )
from tools.registry import registry, tool_error
# Schema for the "process" tool; it is passed as `schema=` to
# registry.register() at the bottom of this module. Only "action" is required
# at the schema level; _handle_process additionally rejects a missing
# session_id for every action other than "list".
PROCESS_SCHEMA = {
    "name": "process",
    "description": (
        "Manage background processes started with terminal(background=true). "
        "Actions: 'list' (show all), 'poll' (check status + new output), "
        "'log' (full output with pagination), 'wait' (block until done or timeout), "
        "'kill' (terminate), 'write' (send raw stdin data without newline), "
        "'submit' (send data + Enter, for answering prompts), 'close' (close stdin/send EOF)."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            # The enum lists the same actions that _handle_process dispatches on.
            "action": {
                "type": "string",
                "enum": ["list", "poll", "log", "wait", "kill", "write", "submit", "close"],
                "description": "Action to perform on background processes"
            },
            "session_id": {
                "type": "string",
                "description": "Process session ID (from terminal background output). Required for all actions except 'list'."
            },
            "data": {
                "type": "string",
                "description": "Text to send to process stdin (for 'write' and 'submit' actions)"
            },
            "timeout": {
                "type": "integer",
                "description": "Max seconds to block for 'wait' action. Returns partial output on timeout.",
                "minimum": 1
            },
            "offset": {
                "type": "integer",
                "description": "Line offset for 'log' action (default: last 200 lines)"
            },
            "limit": {
                "type": "integer",
                "description": "Max lines to return for 'log' action",
                "minimum": 1
            }
        },
        "required": ["action"]
    }
}
def _handle_process(args, **kw):
    """Dispatch one ``process`` tool call to the module-level registry.

    Args:
        args: Tool arguments. ``action`` selects the operation;
            ``session_id`` is required for everything except ``list``;
            ``data`` is used by ``write`` / ``submit``, ``timeout`` by
            ``wait``, and ``offset`` / ``limit`` by ``log``.
        **kw: Call context. Only ``task_id`` is read; it scopes ``list``.

    Returns:
        The registry's result serialised as a JSON string, or the value
        produced by ``tool_error`` for a missing ``session_id`` or an
        unknown action.

    Optional arguments given as an explicit ``null`` are treated as
    absent. In particular a null ``data`` sends an empty string to stdin
    rather than the literal text ``"None"``, and a null ``offset`` /
    ``limit`` falls back to the defaults (0 and 200).
    """
    def _arg(key, default):
        # Explicit null and a missing key both yield the default.
        value = args.get(key)
        return default if value is None else value

    task_id = kw.get("task_id")
    # Coerce to str so an oddly typed action reaches the "unknown action"
    # error below instead of raising on the dict lookup.
    action = str(_arg("action", ""))
    session_id = str(_arg("session_id", ""))

    if action == "list":
        return json.dumps(
            {"processes": process_registry.list_sessions(task_id=task_id)},
            ensure_ascii=False,
        )

    # Lazily evaluated so only the selected operation touches the registry.
    operations = {
        "poll": lambda: process_registry.poll(session_id),
        "log": lambda: process_registry.read_log(
            session_id, offset=_arg("offset", 0), limit=_arg("limit", 200)),
        "wait": lambda: process_registry.wait(session_id, timeout=args.get("timeout")),
        "kill": lambda: process_registry.kill_process(session_id),
        "write": lambda: process_registry.write_stdin(session_id, str(_arg("data", ""))),
        "submit": lambda: process_registry.submit_stdin(session_id, str(_arg("data", ""))),
        "close": lambda: process_registry.close_stdin(session_id),
    }

    operation = operations.get(action)
    if operation is None:
        return tool_error(f"Unknown process action: {action}. Use: list, poll, log, wait, kill, write, submit, close")
    if not session_id:
        return tool_error(f"session_id is required for {action}")
    return json.dumps(operation(), ensure_ascii=False)
# Register the "process" tool in the "terminal" toolset, pairing
# PROCESS_SCHEMA with the _handle_process dispatcher. This call runs at
# import time.
registry.register(
    name="process",
    toolset="terminal",
    schema=PROCESS_SCHEMA,
    handler=_handle_process,
    emoji="⚙️",
)