"""AI 终端工具:在沙箱工作目录中执行 Shell 命令。"""
from __future__ import annotations
import os
import re
import subprocess
from pathlib import Path
from typing import Any
from app.core.config import (
is_development,
terminal_max_output_chars,
terminal_timeout_seconds,
terminal_tool_enabled,
)
from app.core.paths import DATA_DIR, ensure_data_dirs
TERMINAL_WORKSPACE = DATA_DIR / "terminal_workspace"
_BLOCKED_PATTERNS = [
re.compile(r"(?i)\brm\s+(-[^\s]*\s+)*-?r[^\s]*\s+/\s"),
re.compile(r"(?i)\brm\s+(-[^\s]*\s+)*-?r[^\s]*\s+/\s*$"),
re.compile(r"(?i)\bmkfs\b"),
re.compile(r"(?i)\bdd\s+if="),
re.compile(r"(?i)\b>:?\s*/dev/"),
re.compile(r"(?i)\bshutdown\b"),
re.compile(r"(?i)\breboot\b"),
re.compile(r"(?i)\binit\s+0\b"),
re.compile(r"(?i)\bchmod\s+(-[^\s]*\s+)*777\s+/\s"),
]
def _resolve_cwd(relative: str | None) -> Path:
ensure_data_dirs()
TERMINAL_WORKSPACE.mkdir(parents=True, exist_ok=True)
base = TERMINAL_WORKSPACE.resolve()
if not relative or not str(relative).strip():
return base
rel = Path(str(relative).strip().lstrip("/"))
target = (base / rel).resolve()
if base != target and base not in target.parents:
raise ValueError("工作目录必须在终端沙箱内")
target.mkdir(parents=True, exist_ok=True)
return target
def _check_command(command: str) -> None:
cmd = command.strip()
if not cmd:
raise ValueError("命令不能为空")
if len(cmd) > 8000:
raise ValueError("命令过长")
for pat in _BLOCKED_PATTERNS:
if pat.search(cmd):
raise ValueError("该命令因安全策略被拒绝")
def run_terminal_command(
command: str,
*,
cwd: str | None = None,
timeout_seconds: int | None = None,
) -> dict[str, Any]:
if not terminal_tool_enabled():
raise ValueError("终端工具未启用(设置 COMPILOT_TERMINAL_ENABLED=true)")
_check_command(command)
workdir = _resolve_cwd(cwd)
timeout = min(
max(1, int(timeout_seconds or terminal_timeout_seconds())),
terminal_timeout_seconds(),
)
max_out = terminal_max_output_chars()
env = os.environ.copy()
env["HOME"] = str(workdir)
env["TMPDIR"] = str(workdir / ".tmp")
(workdir / ".tmp").mkdir(parents=True, exist_ok=True)
try:
proc = subprocess.run(
command,
shell=True,
cwd=str(workdir),
capture_output=True,
text=True,
timeout=timeout,
env=env,
)
stdout = proc.stdout or ""
stderr = proc.stderr or ""
truncated = False
combined_len = len(stdout) + len(stderr)
if combined_len > max_out:
truncated = True
budget = max_out
if len(stdout) >= budget:
stdout = stdout[:budget] + "\n…(输出已截断)"
stderr = ""
else:
remain = budget - len(stdout)
stderr = stderr[:remain] + ("\n…(输出已截断)" if len(proc.stderr or "") > remain else "")
return {
"ok": proc.returncode == 0,
"exit_code": proc.returncode,
"cwd": str(workdir),
"command": command,
"stdout": stdout,
"stderr": stderr,
"truncated": truncated,
"timeout_seconds": timeout,
}
except subprocess.TimeoutExpired as exc:
out = (exc.stdout or "") if isinstance(exc.stdout, str) else ""
err = (exc.stderr or "") if isinstance(exc.stderr, str) else ""
return {
"ok": False,
"exit_code": -1,
"cwd": str(workdir),
"command": command,
"stdout": out[:max_out],
"stderr": (err + f"\n命令超时(>{timeout}s)").strip(),
"truncated": len(out) > max_out,
"timed_out": True,
"timeout_seconds": timeout,
}