"""Dangerous command approval -- detection, prompting, and per-session state.
This module is the single source of truth for the dangerous command system:
- Pattern detection (DANGEROUS_PATTERNS, detect_dangerous_command)
- Per-session approval state (thread-safe, keyed by session_key)
- Approval prompting (CLI interactive + gateway async)
- Smart approval via auxiliary LLM (auto-approve low-risk commands)
- Permanent allowlist persistence (config.yaml)
"""
import contextvars
import logging
import os
import re
import sys
import threading
import time
import unicodedata
from typing import Optional
from hermes_cli.config import cfg_get
from utils import env_var_enabled, is_truthy_value
logger = logging.getLogger(__name__)
_approval_session_key: contextvars.ContextVar[str] = contextvars.ContextVar(
"approval_session_key",
default="",
)
def _fire_approval_hook(hook_name: str, **kwargs) -> None:
"""Invoke a plugin lifecycle hook for the approval system.
Lazy-imports the plugin manager to avoid circular imports (approval.py is
imported very early, long before plugins are discovered). Never raises --
plugin errors are logged and swallowed.
Only fires for the two approval-specific hooks in VALID_HOOKS:
pre_approval_request, post_approval_response.
"""
try:
from hermes_cli.plugins import invoke_hook
except Exception:
return
try:
invoke_hook(hook_name, **kwargs)
except Exception as exc:
logger.debug("Approval hook %s dispatch failed: %s", hook_name, exc)
def set_current_session_key(session_key: str) -> contextvars.Token[str]:
"""Bind the active approval session key to the current context."""
return _approval_session_key.set(session_key or "")
def reset_current_session_key(token: contextvars.Token[str]) -> None:
"""Restore the prior approval session key context."""
_approval_session_key.reset(token)
def get_current_session_key(default: str = "default") -> str:
"""Return the active session key, preferring context-local state.
Resolution order:
1. approval-specific contextvars (set by gateway before agent.run)
2. session_context contextvars (set by _set_session_env)
3. os.environ fallback (CLI, cron, tests)
"""
session_key = _approval_session_key.get()
if session_key:
return session_key
from gateway.session_context import get_session_env
return get_session_env("HERMES_SESSION_KEY", default)
def _get_session_platform() -> str:
"""Return the current gateway platform from contextvars/env fallback."""
try:
from gateway.session_context import get_session_env
return get_session_env("HERMES_SESSION_PLATFORM", "") or ""
except Exception:
return os.getenv("HERMES_SESSION_PLATFORM", "") or ""
def _is_gateway_approval_context() -> bool:
"""True when this call is inside a gateway/API session.
Legacy gateway integrations set HERMES_GATEWAY_SESSION in process env.
Newer concurrent gateway paths bind HERMES_SESSION_PLATFORM via
contextvars so approval mode does not depend on process-global flags.
Cron jobs are NEVER gateway-approval contexts even when they originate
from a gateway platform (cron binds HERMES_SESSION_PLATFORM via
contextvars for delivery routing). Cron approvals are governed by
``approvals.cron_mode`` config, not interactive resolve — letting cron
fall through to the gateway branch would submit a pending approval
with no listener and block the job indefinitely.
"""
if env_var_enabled("HERMES_CRON_SESSION"):
return False
if env_var_enabled("HERMES_GATEWAY_SESSION"):
return True
return bool(_get_session_platform())
_SSH_SENSITIVE_PATH = r'(?:~|\$home|\$\{home\})/\.ssh(?:/|$)'
_HERMES_ENV_PATH = (
r'(?:~\/\.hermes/|'
r'(?:\$home|\$\{home\})/\.hermes/|'
r'(?:\$hermes_home|\$\{hermes_home\})/)'
r'\.env\b'
)
_PROJECT_ENV_PATH = r'(?:(?:/|\.{1,2}/)?(?:[^\s/"\'`]+/)*\.env(?:\.[^/\s"\'`]+)*)'
_PROJECT_CONFIG_PATH = r'(?:(?:/|\.{1,2}/)?(?:[^\s/"\'`]+/)*config\.yaml)'
_SHELL_RC_FILES = (
r'(?:~|\$home|\$\{home\})/\.'
r'(?:bashrc|zshrc|profile|bash_profile|zprofile)\b'
)
_CREDENTIAL_FILES = (
r'(?:~|\$home|\$\{home\})/\.'
r'(?:netrc|pgpass|npmrc|pypirc)\b'
)
_MACOS_PRIVATE_SYSTEM_PATH = r'/private/(?:etc|var|tmp|home)/'
_SYSTEM_CONFIG_PATH = (
rf'(?:/etc/|{_MACOS_PRIVATE_SYSTEM_PATH})'
)
_SENSITIVE_WRITE_TARGET = (
rf'(?:{_SYSTEM_CONFIG_PATH}|/dev/sd|'
rf'{_SSH_SENSITIVE_PATH}|'
rf'{_HERMES_ENV_PATH}|'
rf'{_SHELL_RC_FILES}|'
rf'{_CREDENTIAL_FILES})'
)
_PROJECT_SENSITIVE_WRITE_TARGET = rf'(?:{_PROJECT_ENV_PATH}|{_PROJECT_CONFIG_PATH})'
_COMMAND_TAIL = r'(?:\s*(?:&&|\|\||;).*)?$'
_CMDPOS = (
r'(?:^|[;&|\n`]|\$\()'
r'\s*'
r'(?:sudo\s+(?:-[^\s]+\s+)*)?'
r'(?:env\s+(?:\w+=\S*\s+)*)?'
r'(?:(?:exec|nohup|setsid|time)\s+)*'
r'\s*'
)
HARDLINE_PATTERNS = [
(r'\brm\s+(-[^\s]*\s+)*(/|/\*|/ \*)(\s|$)', "recursive delete of root filesystem"),
(r'\brm\s+(-[^\s]*\s+)*(/home|/home/\*|/root|/root/\*|/etc|/etc/\*|/usr|/usr/\*|/var|/var/\*|/bin|/bin/\*|/sbin|/sbin/\*|/boot|/boot/\*|/lib|/lib/\*)(\s|$)', "recursive delete of system directory"),
(r'\brm\s+(-[^\s]*\s+)*(~|\$HOME)(/?|/\*)?(\s|$)', "recursive delete of home directory"),
(r'\bmkfs(\.[a-z0-9]+)?\b', "format filesystem (mkfs)"),
(r'\bdd\b[^\n]*\bof=/dev/(sd|nvme|hd|mmcblk|vd|xvd)[a-z0-9]*', "dd to raw block device"),
(r'>\s*/dev/(sd|nvme|hd|mmcblk|vd|xvd)[a-z0-9]*\b', "redirect to raw block device"),
(r':\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', "fork bomb"),
(r'\bkill\s+(-[^\s]+\s+)*-1\b', "kill all processes"),
(_CMDPOS + r'(shutdown|reboot|halt|poweroff)\b', "system shutdown/reboot"),
(_CMDPOS + r'init\s+[06]\b', "init 0/6 (shutdown/reboot)"),
(_CMDPOS + r'systemctl\s+(poweroff|reboot|halt|kexec)\b', "systemctl poweroff/reboot"),
(_CMDPOS + r'telinit\s+[06]\b', "telinit 0/6 (shutdown/reboot)"),
]
_RE_FLAGS = re.IGNORECASE | re.DOTALL
HARDLINE_PATTERNS_COMPILED = [
(re.compile(pattern, _RE_FLAGS), description)
for pattern, description in HARDLINE_PATTERNS
]
_SUDO_STDIN_RE = re.compile(
r'(?:^|[;&|`\n]|&&|\|\||\$\()\s*sudo\s+-S\b',
re.IGNORECASE)
def _check_sudo_stdin_guard(command: str) -> tuple:
"""Detect ``sudo -S`` (stdin password) without configured SUDO_PASSWORD.
When SUDO_PASSWORD is set, ``_transform_sudo_command`` injects ``-S``
internally — that path is legitimate and handled elsewhere. This guard
only fires when SUDO_PASSWORD is *not* set, meaning the LLM explicitly
wrote ``sudo -S`` to pipe a guessed password.
Returns:
(is_blocked: bool, description: str | None)
"""
if "SUDO_PASSWORD" in os.environ:
return (False, None)
normalized = _normalize_command_for_detection(command).lower()
if _SUDO_STDIN_RE.search(normalized):
return (True, "sudo password guessing via stdin (sudo -S)")
return (False, None)
def detect_hardline_command(command: str) -> tuple:
"""Check if a command matches the unconditional hardline blocklist.
Returns:
(is_hardline, description) or (False, None)
"""
normalized = _normalize_command_for_detection(command).lower()
for pattern_re, description in HARDLINE_PATTERNS_COMPILED:
if pattern_re.search(normalized):
return (True, description)
return (False, None)
def _hardline_block_result(description: str) -> dict:
"""Build the standard block result for a hardline match."""
return {
"approved": False,
"hardline": True,
"message": (
f"BLOCKED (hardline): {description}. "
"This command is on the unconditional blocklist and cannot "
"be executed via the agent — not even with --yolo, /yolo, "
"approvals.mode=off, or cron approve mode. If you genuinely "
"need to run it, run it yourself in a terminal outside the "
"agent."
),
}
def _sudo_stdin_block_result(description: str) -> dict:
"""Build the standard block result for sudo stdin guard."""
return {
"approved": False,
"message": (
f"BLOCKED: {description}. "
"Do not pipe passwords to 'sudo -S' — this is a brute-force "
"attack vector. Set SUDO_PASSWORD in your .env file if the "
"agent needs passwordless sudo, or run the sudo command "
"manually in your own terminal."
),
}
DANGEROUS_PATTERNS = [
(r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"),
(r'\brm\s+-[^\s]*r', "recursive delete"),
(r'\brm\s+--recursive\b', "recursive delete (long flag)"),
(r'\bchmod\s+(-[^\s]*\s+)*(777|666|o\+[rwx]*w|a\+[rwx]*w)\b', "world/other-writable permissions"),
(r'\bchmod\s+--recursive\b.*(777|666|o\+[rwx]*w|a\+[rwx]*w)', "recursive world/other-writable (long flag)"),
(r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"),
(r'\bchown\s+--recursive\b.*root', "recursive chown to root (long flag)"),
(r'\bmkfs\b', "format filesystem"),
(r'\bdd\s+.*if=', "disk copy"),
(r'>\s*/dev/sd', "write to block device"),
(r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"),
(r'\bDELETE\s+FROM\b(?![^\n]*\bWHERE\b)', "SQL DELETE without WHERE"),
(r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"),
(rf'>\s*{_SYSTEM_CONFIG_PATH}', "overwrite system config"),
(r'\bsystemctl\s+(-[^\s]+\s+)*(stop|restart|disable|mask)\b', "stop/restart system service"),
(r'\bkill\s+-9\s+-1\b', "kill all processes"),
(r'\bpkill\s+-9\b', "force kill processes"),
(r'\bkillall\s+(-[^\s]*\s+)*-(9|KILL|SIGKILL)\b', "force kill processes (killall -KILL)"),
(r'\bkillall\s+(-[^\s]*\s+)*-s\s+(KILL|SIGKILL|9)\b', "force kill processes (killall -s KILL)"),
(r'\bkillall\s+(-[^\s]*\s+)*-r\b', "kill processes by regex (killall -r)"),
(r':\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', "fork bomb"),
(r'\b(bash|sh|zsh|ksh)\s+-[^\s]*c(\s+|$)', "shell command via -c/-lc flag"),
(r'\b(python[23]?|perl|ruby|node)\s+-[ec]\s+', "script execution via -e/-c flag"),
(r'\b(curl|wget)\b.*\|\s*(ba)?sh\b', "pipe remote content to shell"),
(r'\b(bash|sh|zsh|ksh)\s+<\s*<?\s*\(\s*(curl|wget)\b', "execute remote script via process substitution"),
(rf'\btee\b.*["\']?{_SENSITIVE_WRITE_TARGET}', "overwrite system file via tee"),
(rf'>>?\s*["\']?{_SENSITIVE_WRITE_TARGET}', "overwrite system file via redirection"),
(rf'\btee\b.*["\']?{_PROJECT_SENSITIVE_WRITE_TARGET}["\']?{_COMMAND_TAIL}', "overwrite project env/config via tee"),
(rf'>>?\s*["\']?{_PROJECT_SENSITIVE_WRITE_TARGET}["\']?{_COMMAND_TAIL}', "overwrite project env/config via redirection"),
(r'\bxargs\s+.*\brm\b', "xargs with rm"),
(r'\bfind\b.*-exec(?:dir)?\s+(/\S*/)?rm\b', "find -exec/-execdir rm"),
(r'\bfind\b.*-delete\b', "find -delete"),
(r'\bhermes\s+gateway\s+(stop|restart)\b', "stop/restart hermes gateway (kills running agents)"),
(r'\bhermes\s+update\b', "hermes update (restarts gateway, kills running agents)"),
(r'gateway\s+run\b.*(&\s*$|&\s*;|\bdisown\b|\bsetsid\b)', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
(r'\bnohup\b.*gateway\s+run\b', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
(r'\b(pkill|killall)\b.*\b(hermes|gateway|cli\.py)\b', "kill hermes/gateway process (self-termination)"),
(r'\bkill\b.*\$\(\s*pgrep\b', "kill process via pgrep expansion (self-termination)"),
(r'\bkill\b.*`\s*pgrep\b', "kill process via backtick pgrep expansion (self-termination)"),
(rf'\b(cp|mv|install)\b.*\s{_SYSTEM_CONFIG_PATH}', "copy/move file into system config path"),
(rf'\b(cp|mv|install)\b.*\s["\']?{_PROJECT_SENSITIVE_WRITE_TARGET}["\']?{_COMMAND_TAIL}', "overwrite project env/config file"),
(rf'\bsed\s+-[^\s]*i.*\s{_SYSTEM_CONFIG_PATH}', "in-place edit of system config"),
(rf'\bsed\s+--in-place\b.*\s{_SYSTEM_CONFIG_PATH}', "in-place edit of system config (long flag)"),
(r'\b(python[23]?|perl|ruby|node)\s+<<', "script execution via heredoc"),
(r'\bgit\s+reset\s+--hard\b', "git reset --hard (destroys uncommitted changes)"),
(r'\bgit\s+push\b.*--force\b', "git force push (rewrites remote history)"),
(r'\bgit\s+push\b.*-f\b', "git force push short flag (rewrites remote history)"),
(r'\bgit\s+clean\s+-[^\s]*f', "git clean with force (deletes untracked files)"),
(r'\bgit\s+branch\s+-D\b', "git branch force delete"),
(r'\bchmod\s+\+x\b.*[;&|]+\s*\./', "chmod +x followed by immediate execution"),
(r'\bsudo\b[^;|&\n]*?\s+(?:-s\b|--stdin\b|-a\b|--askpass\b)',
"sudo with privilege flag (stdin/askpass/shell/list)"),
(r'\bsudo\b[^;|&\n]*?\s+-[a-z]*[sa][a-z]*\b',
"sudo with combined-flag privilege escalation"),
]
DANGEROUS_PATTERNS_COMPILED = [
(re.compile(pattern, _RE_FLAGS), description)
for pattern, description in DANGEROUS_PATTERNS
]
def _legacy_pattern_key(pattern: str) -> str:
"""Reproduce the old regex-derived approval key for backwards compatibility."""
return pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]
_PATTERN_KEY_ALIASES: dict[str, set[str]] = {}
for _pattern, _description in DANGEROUS_PATTERNS:
_legacy_key = _legacy_pattern_key(_pattern)
_canonical_key = _description
_PATTERN_KEY_ALIASES.setdefault(_canonical_key, set()).update({_canonical_key, _legacy_key})
_PATTERN_KEY_ALIASES.setdefault(_legacy_key, set()).update({_legacy_key, _canonical_key})
def _approval_key_aliases(pattern_key: str) -> set[str]:
"""Return all approval keys that should match this pattern.
New approvals use the human-readable description string, but older
command_allowlist entries and session approvals may still contain the
historical regex-derived key.
"""
return _PATTERN_KEY_ALIASES.get(pattern_key, {pattern_key})
def _normalize_command_for_detection(command: str) -> str:
"""Normalize a command string before dangerous-pattern matching.
Strips ANSI escape sequences (full ECMA-48 via tools.ansi_strip),
null bytes, and normalizes Unicode fullwidth characters so that
obfuscation techniques cannot bypass the pattern-based detection.
"""
from tools.ansi_strip import strip_ansi
command = strip_ansi(command)
command = command.replace('\x00', '')
command = unicodedata.normalize('NFKC', command)
return command
def detect_dangerous_command(command: str) -> tuple:
"""Check if a command matches any dangerous patterns.
Returns:
(is_dangerous, pattern_key, description) or (False, None, None)
"""
command_lower = _normalize_command_for_detection(command).lower()
for pattern_re, description in DANGEROUS_PATTERNS_COMPILED:
if pattern_re.search(command_lower):
pattern_key = description
return (True, pattern_key, description)
return (False, None, None)
_lock = threading.Lock()
_pending: dict[str, dict] = {}
_session_approved: dict[str, set] = {}
_session_yolo: set[str] = set()
_permanent_approved: set = set()
class _ApprovalEntry:
"""One pending dangerous-command approval inside a gateway session."""
__slots__ = ("event", "data", "result")
def __init__(self, data: dict):
self.event = threading.Event()
self.data = data
self.result: Optional[str] = None
_gateway_queues: dict[str, list] = {}
_gateway_notify_cbs: dict[str, object] = {}
def register_gateway_notify(session_key: str, cb) -> None:
"""Register a per-session callback for sending approval requests to the user.
The callback signature is ``cb(approval_data: dict) -> None`` where
*approval_data* contains ``command``, ``description``, and
``pattern_keys``. The callback bridges sync→async (runs in the agent
thread, must schedule the actual send on the event loop).
"""
with _lock:
_gateway_notify_cbs[session_key] = cb
def unregister_gateway_notify(session_key: str) -> None:
"""Unregister the per-session gateway approval callback.
Signals ALL blocked threads for this session so they don't hang forever
(e.g. when the agent run finishes or is interrupted).
"""
with _lock:
_gateway_notify_cbs.pop(session_key, None)
entries = _gateway_queues.pop(session_key, [])
for entry in entries:
entry.event.set()
def resolve_gateway_approval(session_key: str, choice: str,
resolve_all: bool = False) -> int:
"""Called by the gateway's /approve or /deny handler to unblock
waiting agent thread(s).
When *resolve_all* is True every pending approval in the session is
resolved at once (``/approve all``). Otherwise only the oldest one
is resolved (FIFO).
Returns the number of approvals resolved (0 means nothing was pending).
"""
with _lock:
queue = _gateway_queues.get(session_key)
if not queue:
return 0
if resolve_all:
targets = list(queue)
queue.clear()
else:
targets = [queue.pop(0)]
if not queue:
_gateway_queues.pop(session_key, None)
for entry in targets:
entry.result = choice
entry.event.set()
return len(targets)
def has_blocking_approval(session_key: str) -> bool:
"""Check if a session has one or more blocking gateway approvals waiting."""
with _lock:
return bool(_gateway_queues.get(session_key))
def submit_pending(session_key: str, approval: dict):
"""Store a pending approval request for a session."""
with _lock:
_pending[session_key] = approval
def approve_session(session_key: str, pattern_key: str):
"""Approve a pattern for this session only."""
with _lock:
_session_approved.setdefault(session_key, set()).add(pattern_key)
def enable_session_yolo(session_key: str) -> None:
"""Enable YOLO bypass for a single session key."""
if not session_key:
return
with _lock:
_session_yolo.add(session_key)
def disable_session_yolo(session_key: str) -> None:
"""Disable YOLO bypass for a single session key."""
if not session_key:
return
with _lock:
_session_yolo.discard(session_key)
def clear_session(session_key: str) -> None:
"""Remove all approval and yolo state for a given session."""
if not session_key:
return
with _lock:
_session_approved.pop(session_key, None)
_session_yolo.discard(session_key)
_pending.pop(session_key, None)
entries = _gateway_queues.pop(session_key, [])
for entry in entries:
entry.result = "deny"
entry.event.set()
def is_session_yolo_enabled(session_key: str) -> bool:
"""Return True when YOLO bypass is enabled for a specific session."""
if not session_key:
return False
with _lock:
return session_key in _session_yolo
def is_current_session_yolo_enabled() -> bool:
"""Return True when the active approval session has YOLO bypass enabled."""
return is_session_yolo_enabled(get_current_session_key(default=""))
def is_approved(session_key: str, pattern_key: str) -> bool:
"""Check if a pattern is approved (session-scoped or permanent).
Accept both the current canonical key and the legacy regex-derived key so
existing command_allowlist entries continue to work after key migrations.
"""
aliases = _approval_key_aliases(pattern_key)
with _lock:
if any(alias in _permanent_approved for alias in aliases):
return True
session_approvals = _session_approved.get(session_key, set())
return any(alias in session_approvals for alias in aliases)
def approve_permanent(pattern_key: str):
"""Add a pattern to the permanent allowlist."""
with _lock:
_permanent_approved.add(pattern_key)
def load_permanent(patterns: set):
"""Bulk-load permanent allowlist entries from config."""
with _lock:
_permanent_approved.update(patterns)
def load_permanent_allowlist() -> set:
"""Load permanently allowed command patterns from config.
Also syncs them into the approval module so is_approved() works for
patterns added via 'always' in a previous session.
"""
try:
from hermes_cli.config import load_config
config = load_config()
patterns = set(config.get("command_allowlist", []) or [])
if patterns:
load_permanent(patterns)
return patterns
except Exception as e:
logger.warning("Failed to load permanent allowlist: %s", e)
return set()
def save_permanent_allowlist(patterns: set):
"""Save permanently allowed command patterns to config."""
try:
from hermes_cli.config import load_config, save_config
config = load_config()
config["command_allowlist"] = list(patterns)
save_config(config)
except Exception as e:
logger.warning("Could not save allowlist: %s", e)
def prompt_dangerous_approval(command: str, description: str,
timeout_seconds: int | None = None,
allow_permanent: bool = True,
approval_callback=None) -> str:
"""Prompt the user to approve a dangerous command (CLI only).
Args:
allow_permanent: When False, hide the [a]lways option (used when
tirith warnings are present, since broad permanent allowlisting
is inappropriate for content-level security findings).
approval_callback: Optional callback registered by the CLI for
prompt_toolkit integration. Signature:
(command, description, *, allow_permanent=True) -> str.
Returns: 'once', 'session', 'always', or 'deny'
"""
if timeout_seconds is None:
timeout_seconds = _get_approval_timeout()
if approval_callback is not None:
try:
return approval_callback(command, description,
allow_permanent=allow_permanent)
except Exception as e:
logger.error("Approval callback failed: %s", e, exc_info=True)
return "deny"
try:
from prompt_toolkit.application.current import get_app_or_none
if get_app_or_none() is not None:
logger.warning(
"Dangerous-command approval requested on a thread with no "
"approval callback while prompt_toolkit is active; denying "
"to avoid stdin deadlock. command=%r description=%r",
command, description,
)
return "deny"
except Exception:
pass
os.environ["HERMES_SPINNER_PAUSE"] = "1"
try:
from agent.i18n import t
while True:
print()
print(f" {t('approval.dangerous_header', description=description)}")
print(f" {command}")
print()
if allow_permanent:
print(t("approval.choose_long"))
else:
print(t("approval.choose_short"))
print()
sys.stdout.flush()
result = {"choice": ""}
def get_input():
try:
prompt = t("approval.prompt_long") if allow_permanent else t("approval.prompt_short")
result["choice"] = input(prompt).strip().lower()
except (EOFError, OSError):
result["choice"] = ""
thread = threading.Thread(target=get_input, daemon=True)
thread.start()
thread.join(timeout=timeout_seconds)
if thread.is_alive():
print("\n" + t("approval.timeout"))
return "deny"
choice = result["choice"]
if choice in {'o', 'once'}:
print(t("approval.allowed_once"))
return "once"
elif choice in {'s', 'session'}:
print(t("approval.allowed_session"))
return "session"
elif choice in {'a', 'always'}:
if not allow_permanent:
print(t("approval.allowed_session"))
return "session"
print(t("approval.allowed_always"))
return "always"
else:
print(t("approval.denied"))
return "deny"
except (EOFError, KeyboardInterrupt):
print("\n" + t("approval.cancelled"))
return "deny"
finally:
if "HERMES_SPINNER_PAUSE" in os.environ:
del os.environ["HERMES_SPINNER_PAUSE"]
print()
sys.stdout.flush()
def _normalize_approval_mode(mode) -> str:
"""Normalize approval mode values loaded from YAML/config.
YAML 1.1 treats bare words like `off` as booleans, so a config entry like
`approvals:\n mode: off` is parsed as False unless quoted. Treat that as the
intended string mode instead of falling back to manual approvals.
"""
if isinstance(mode, bool):
return "off" if mode is False else "manual"
if isinstance(mode, str):
normalized = mode.strip().lower()
return normalized or "manual"
return "manual"
def _get_approval_config() -> dict:
"""Read the approvals config block. Returns a dict with 'mode', 'timeout', etc."""
try:
from hermes_cli.config import load_config
config = load_config()
return config.get("approvals", {}) or {}
except Exception as e:
logger.warning("Failed to load approval config: %s", e)
return {}
def _get_approval_mode() -> str:
"""Read the approval mode from config. Returns 'manual', 'smart', or 'off'."""
mode = _get_approval_config().get("mode", "manual")
return _normalize_approval_mode(mode)
def _get_approval_timeout() -> int:
"""Read the approval timeout from config. Defaults to 60 seconds."""
try:
return int(_get_approval_config().get("timeout", 60))
except (ValueError, TypeError):
return 60
def _get_cron_approval_mode() -> str:
"""Read the cron approval mode from config. Returns 'deny' or 'approve'."""
try:
from hermes_cli.config import load_config
config = load_config()
mode = str(cfg_get(config, "approvals", "cron_mode", default="deny")).lower().strip()
if mode in {"approve", "off", "allow", "yes"}:
return "approve"
return "deny"
except Exception:
return "deny"
def _smart_approve(command: str, description: str) -> str:
"""Use the auxiliary LLM to assess risk and decide approval.
Returns 'approve' if the LLM determines the command is safe,
'deny' if genuinely dangerous, or 'escalate' if uncertain.
Inspired by OpenAI Codex's Smart Approvals guardian subagent
(openai/codex#13860).
"""
try:
from agent.auxiliary_client import call_llm
prompt = f"""You are a security reviewer for an AI coding agent. A terminal command was flagged by pattern matching as potentially dangerous.
Command: {command}
Flagged reason: {description}
Assess the ACTUAL risk of this command. Many flagged commands are false positives — for example, `python -c "print('hello')"` is flagged as "script execution via -c flag" but is completely harmless.
Rules:
- APPROVE if the command is clearly safe (benign script execution, safe file operations, development tools, package installs, git operations, etc.)
- DENY if the command could genuinely damage the system (recursive delete of important paths, overwriting system files, fork bombs, wiping disks, dropping databases, etc.)
- ESCALATE if you're uncertain
Respond with exactly one word: APPROVE, DENY, or ESCALATE"""
response = call_llm(
task="approval",
messages=[{"role": "user", "content": prompt}],
temperature=0,
max_tokens=16,
)
answer = (response.choices[0].message.content or "").strip().upper()
if "APPROVE" in answer:
return "approve"
elif "DENY" in answer:
return "deny"
else:
return "escalate"
except Exception as e:
logger.debug("Smart approvals: LLM call failed (%s), escalating", e)
return "escalate"
def check_dangerous_command(command: str, env_type: str,
approval_callback=None) -> dict:
"""Check if a command is dangerous and handle approval.
This is the main entry point called by terminal_tool before executing
any command. It orchestrates detection, session checks, and prompting.
Args:
command: The shell command to check.
env_type: Terminal backend type ('local', 'ssh', 'docker', etc.).
approval_callback: Optional CLI callback for interactive prompts.
Returns:
{"approved": True/False, "message": str or None, ...}
"""
if env_type in {"docker", "singularity", "modal", "daytona", "vercel_sandbox"}:
return {"approved": True, "message": None}
is_hardline, hardline_desc = detect_hardline_command(command)
if is_hardline:
logger.warning("Hardline block: %s (command: %s)", hardline_desc, command[:200])
return _hardline_block_result(hardline_desc)
if is_truthy_value(os.getenv("HERMES_YOLO_MODE")) or is_current_session_yolo_enabled():
return {"approved": True, "message": None}
is_dangerous, pattern_key, description = detect_dangerous_command(command)
if not is_dangerous:
return {"approved": True, "message": None}
session_key = get_current_session_key()
if is_approved(session_key, pattern_key):
return {"approved": True, "message": None}
is_cli = env_var_enabled("HERMES_INTERACTIVE")
is_gateway = _is_gateway_approval_context()
if not is_cli and not is_gateway:
if env_var_enabled("HERMES_CRON_SESSION"):
if _get_cron_approval_mode() == "deny":
return {
"approved": False,
"message": (
f"BLOCKED: Command flagged as dangerous ({description}) "
"but cron jobs run without a user present to approve it. "
"Find an alternative approach that avoids this command. "
"To allow dangerous commands in cron jobs, set "
"approvals.cron_mode: approve in config.yaml."
),
}
return {"approved": True, "message": None}
if is_gateway or env_var_enabled("HERMES_EXEC_ASK"):
submit_pending(session_key, {
"command": command,
"pattern_key": pattern_key,
"description": description,
})
return {
"approved": False,
"pattern_key": pattern_key,
"status": "approval_required",
"command": command,
"description": description,
"message": (
f"⚠️ This command is potentially dangerous ({description}). "
f"Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
),
}
choice = prompt_dangerous_approval(command, description,
approval_callback=approval_callback)
if choice == "deny":
return {
"approved": False,
"message": f"BLOCKED: User denied this potentially dangerous command (matched '{description}' pattern). Do NOT retry this command - the user has explicitly rejected it.",
"pattern_key": pattern_key,
"description": description,
}
if choice == "session":
approve_session(session_key, pattern_key)
elif choice == "always":
approve_session(session_key, pattern_key)
approve_permanent(pattern_key)
save_permanent_allowlist(_permanent_approved)
return {"approved": True, "message": None}
def _format_tirith_description(tirith_result: dict) -> str:
"""Build a human-readable description from tirith findings.
Includes severity, title, and description for each finding so users
can make an informed approval decision.
"""
findings = tirith_result.get("findings") or []
if not findings:
summary = tirith_result.get("summary") or "security issue detected"
return f"Security scan: {summary}"
parts = []
for f in findings:
severity = f.get("severity", "")
title = f.get("title", "")
desc = f.get("description", "")
if title and desc:
parts.append(f"[{severity}] {title}: {desc}" if severity else f"{title}: {desc}")
elif title:
parts.append(f"[{severity}] {title}" if severity else title)
if not parts:
summary = tirith_result.get("summary") or "security issue detected"
return f"Security scan: {summary}"
return "Security scan — " + "; ".join(parts)
def check_all_command_guards(command: str, env_type: str,
approval_callback=None) -> dict:
"""Run all pre-exec security checks and return a single approval decision.
Gathers findings from tirith and dangerous-command detection, then
presents them as a single combined approval request. This prevents
a gateway force=True replay from bypassing one check when only the
other was shown to the user.
"""
if env_type in {"docker", "singularity", "modal", "daytona", "vercel_sandbox"}:
return {"approved": True, "message": None}
is_hardline, hardline_desc = detect_hardline_command(command)
if is_hardline:
logger.warning("Hardline block: %s (command: %s)", hardline_desc, command[:200])
return _hardline_block_result(hardline_desc)
is_sudo_guess, sudo_guess_desc = _check_sudo_stdin_guard(command)
if is_sudo_guess:
logger.warning("Sudo stdin guard block: %s (command: %s)",
sudo_guess_desc, command[:200])
return _sudo_stdin_block_result(sudo_guess_desc)
approval_mode = _get_approval_mode()
if is_truthy_value(os.getenv("HERMES_YOLO_MODE")) or is_current_session_yolo_enabled() or approval_mode == "off":
return {"approved": True, "message": None}
is_cli = env_var_enabled("HERMES_INTERACTIVE")
is_gateway = _is_gateway_approval_context()
is_ask = env_var_enabled("HERMES_EXEC_ASK")
if not is_cli and not is_gateway and not is_ask:
if env_var_enabled("HERMES_CRON_SESSION"):
if _get_cron_approval_mode() == "deny":
is_dangerous, _pk, description = detect_dangerous_command(command)
if is_dangerous:
return {
"approved": False,
"message": (
f"BLOCKED: Command flagged as dangerous ({description}) "
"but cron jobs run without a user present to approve it. "
"Find an alternative approach that avoids this command. "
"To allow dangerous commands in cron jobs, set "
"approvals.cron_mode: approve in config.yaml."
),
}
return {"approved": True, "message": None}
tirith_result = {"action": "allow", "findings": [], "summary": ""}
try:
from tools.tirith_security import check_command_security
tirith_result = check_command_security(command)
except ImportError:
pass
is_dangerous, pattern_key, description = detect_dangerous_command(command)
warnings = []
session_key = get_current_session_key()
if tirith_result["action"] in {"block", "warn"}:
findings = tirith_result.get("findings") or []
rule_id = findings[0].get("rule_id", "unknown") if findings else "unknown"
tirith_key = f"tirith:{rule_id}"
tirith_desc = _format_tirith_description(tirith_result)
if not is_approved(session_key, tirith_key):
warnings.append((tirith_key, tirith_desc, True))
if is_dangerous:
if not is_approved(session_key, pattern_key):
warnings.append((pattern_key, description, False))
if not warnings:
return {"approved": True, "message": None}
if approval_mode == "smart":
combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
verdict = _smart_approve(command, combined_desc_for_llm)
if verdict == "approve":
for key, _, _ in warnings:
approve_session(session_key, key)
logger.debug("Smart approval: auto-approved '%s' (%s)",
command[:60], combined_desc_for_llm)
return {"approved": True, "message": None,
"smart_approved": True,
"description": combined_desc_for_llm}
elif verdict == "deny":
combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
return {
"approved": False,
"message": f"BLOCKED by smart approval: {combined_desc_for_llm}. "
"The command was assessed as genuinely dangerous. Do NOT retry.",
"smart_denied": True,
}
combined_desc = "; ".join(desc for _, desc, _ in warnings)
primary_key = warnings[0][0]
all_keys = [key for key, _, _ in warnings]
has_tirith = any(is_t for _, _, is_t in warnings)
if is_gateway or is_ask:
notify_cb = None
with _lock:
notify_cb = _gateway_notify_cbs.get(session_key)
if notify_cb is not None:
approval_data = {
"command": command,
"pattern_key": primary_key,
"pattern_keys": all_keys,
"description": combined_desc,
}
entry = _ApprovalEntry(approval_data)
with _lock:
_gateway_queues.setdefault(session_key, []).append(entry)
_fire_approval_hook(
"pre_approval_request",
command=command,
description=combined_desc,
pattern_key=primary_key,
pattern_keys=list(all_keys),
session_key=session_key,
surface="gateway",
)
try:
notify_cb(approval_data)
except Exception as exc:
logger.warning("Gateway approval notify failed: %s", exc)
with _lock:
queue = _gateway_queues.get(session_key, [])
if entry in queue:
queue.remove(entry)
if not queue:
_gateway_queues.pop(session_key, None)
return {
"approved": False,
"message": "BLOCKED: Failed to send approval request to user. Do NOT retry.",
"pattern_key": primary_key,
"description": combined_desc,
}
timeout = _get_approval_config().get("gateway_timeout", 300)
try:
timeout = int(timeout)
except (ValueError, TypeError):
timeout = 300
try:
from tools.environments.base import touch_activity_if_due
except Exception:
touch_activity_if_due = None
_now = time.monotonic()
_deadline = _now + max(timeout, 0)
_activity_state = {"last_touch": _now, "start": _now}
resolved = False
while True:
_remaining = _deadline - time.monotonic()
if _remaining <= 0:
break
if entry.event.wait(timeout=min(1.0, _remaining)):
resolved = True
break
if touch_activity_if_due is not None:
touch_activity_if_due(
_activity_state, "waiting for user approval"
)
with _lock:
queue = _gateway_queues.get(session_key, [])
if entry in queue:
queue.remove(entry)
if not queue:
_gateway_queues.pop(session_key, None)
choice = entry.result
_outcome = (
"timeout" if not resolved
else (choice if choice else "timeout")
)
_fire_approval_hook(
"post_approval_response",
command=command,
description=combined_desc,
pattern_key=primary_key,
pattern_keys=list(all_keys),
session_key=session_key,
surface="gateway",
choice=_outcome,
)
if not resolved or choice is None or choice == "deny":
reason = "timed out" if not resolved else "denied by user"
return {
"approved": False,
"message": f"BLOCKED: Command {reason}. Do NOT retry this command.",
"pattern_key": primary_key,
"description": combined_desc,
}
for key, _, is_tirith in warnings:
if choice == "session" or (choice == "always" and is_tirith):
approve_session(session_key, key)
elif choice == "always":
approve_session(session_key, key)
approve_permanent(key)
save_permanent_allowlist(_permanent_approved)
return {"approved": True, "message": None,
"user_approved": True, "description": combined_desc}
submit_pending(session_key, {
"command": command,
"pattern_key": primary_key,
"pattern_keys": all_keys,
"description": combined_desc,
})
return {
"approved": False,
"pattern_key": primary_key,
"status": "pending_approval",
"approval_pending": True,
"command": command,
"description": combined_desc,
"message": (
f"⚠️ {combined_desc}. Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
),
}
_fire_approval_hook(
"pre_approval_request",
command=command,
description=combined_desc,
pattern_key=primary_key,
pattern_keys=list(all_keys),
session_key=session_key,
surface="cli",
)
choice = prompt_dangerous_approval(command, combined_desc,
allow_permanent=not has_tirith,
approval_callback=approval_callback)
_fire_approval_hook(
"post_approval_response",
command=command,
description=combined_desc,
pattern_key=primary_key,
pattern_keys=list(all_keys),
session_key=session_key,
surface="cli",
choice=choice,
)
if choice == "deny":
return {
"approved": False,
"message": "BLOCKED: User denied. Do NOT retry.",
"pattern_key": primary_key,
"description": combined_desc,
}
for key, _, is_tirith in warnings:
if choice == "session" or (choice == "always" and is_tirith):
approve_session(session_key, key)
elif choice == "always":
approve_session(session_key, key)
approve_permanent(key)
save_permanent_allowlist(_permanent_approved)
return {"approved": True, "message": None,
"user_approved": True, "description": combined_desc}
load_permanent_allowlist()